Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
646e52ae
TDengine
项目概览
taosdata
/
TDengine
大约 2 年 前同步成功
通知
1193
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
646e52ae
编写于
8月 14, 2023
作者:
S
slzhou
浏览文件
操作
浏览文件
下载
差异文件
Merge branch '3.0' of github.com:taosdata/TDengine into szhou/tag-scan-opt
上级
6688d70b
1990a891
变更
19
隐藏空白更改
内联
并排
Showing
19 changed file
with
461 addition
and
412 deletion
+461
-412
include/libs/wal/wal.h
include/libs/wal/wal.h
+3
-5
source/client/src/clientTmq.c
source/client/src/clientTmq.c
+5
-5
source/client/test/clientTests.cpp
source/client/test/clientTests.cpp
+0
-174
source/dnode/mnode/impl/src/mndSubscribe.c
source/dnode/mnode/impl/src/mndSubscribe.c
+1
-1
source/dnode/vnode/src/inc/tq.h
source/dnode/vnode/src/inc/tq.h
+2
-2
source/dnode/vnode/src/tq/tq.c
source/dnode/vnode/src/tq/tq.c
+2
-4
source/dnode/vnode/src/tq/tqMeta.c
source/dnode/vnode/src/tq/tqMeta.c
+2
-2
source/dnode/vnode/src/tq/tqPush.c
source/dnode/vnode/src/tq/tqPush.c
+1
-1
source/dnode/vnode/src/tq/tqRead.c
source/dnode/vnode/src/tq/tqRead.c
+20
-27
source/dnode/vnode/src/tq/tqUtil.c
source/dnode/vnode/src/tq/tqUtil.c
+32
-46
source/dnode/vnode/src/vnd/vnodeSvr.c
source/dnode/vnode/src/vnd/vnodeSvr.c
+1
-0
source/libs/wal/src/walRead.c
source/libs/wal/src/walRead.c
+21
-141
tests/parallel_test/cases.task
tests/parallel_test/cases.task
+2
-1
tests/system-test/7-tmq/tmqMaxTopic.py
tests/system-test/7-tmq/tmqMaxTopic.py
+1
-1
tests/system-test/7-tmq/tmqParamsTest.py
tests/system-test/7-tmq/tmqParamsTest.py
+2
-2
tests/system-test/7-tmq/tmq_offset.py
tests/system-test/7-tmq/tmq_offset.py
+47
-0
tests/system-test/output.txt
tests/system-test/output.txt
+0
-0
utils/test/c/CMakeLists.txt
utils/test/c/CMakeLists.txt
+8
-0
utils/test/c/tmq_offset_test.c
utils/test/c/tmq_offset_test.c
+311
-0
未找到文件。
include/libs/wal/wal.h
浏览文件 @
646e52ae
...
...
@@ -153,7 +153,6 @@ struct SWalReader {
int64_t
capacity
;
TdThreadMutex
mutex
;
SWalFilterCond
cond
;
// TODO remove it
SWalCkHead
*
pHead
;
};
...
...
@@ -207,10 +206,9 @@ void walReaderValidVersionRange(SWalReader *pReader, int64_t *sver, int64
void
walReaderVerifyOffset
(
SWalReader
*
pWalReader
,
STqOffsetVal
*
pOffset
);
// only for tq usage
void
walSetReaderCapacity
(
SWalReader
*
pRead
,
int32_t
capacity
);
int32_t
walFetchHead
(
SWalReader
*
pRead
,
int64_t
ver
,
SWalCkHead
*
pHead
);
int32_t
walFetchBody
(
SWalReader
*
pRead
,
SWalCkHead
**
ppHead
);
int32_t
walSkipFetchBody
(
SWalReader
*
pRead
,
const
SWalCkHead
*
pHead
);
int32_t
walFetchHead
(
SWalReader
*
pRead
,
int64_t
ver
);
int32_t
walFetchBody
(
SWalReader
*
pRead
);
int32_t
walSkipFetchBody
(
SWalReader
*
pRead
);
void
walRefFirstVer
(
SWal
*
,
SWalRef
*
);
void
walRefLastVer
(
SWal
*
,
SWalRef
*
);
...
...
source/client/src/clientTmq.c
浏览文件 @
646e52ae
...
...
@@ -1863,10 +1863,10 @@ static int32_t tmqHandleNoPollRsp(tmq_t* tmq, SMqRspWrapper* rspWrapper, bool* p
return
0
;
}
static
void
updateVgInfo
(
SMqClientVg
*
pVg
,
STqOffsetVal
*
reqOffset
,
STqOffsetVal
*
rspOffset
,
int64_t
sver
,
int64_t
ever
,
int64_t
consumerId
){
static
void
updateVgInfo
(
SMqClientVg
*
pVg
,
STqOffsetVal
*
reqOffset
,
STqOffsetVal
*
rspOffset
,
int64_t
sver
,
int64_t
ever
,
int64_t
consumerId
,
bool
hasData
){
if
(
!
pVg
->
seekUpdated
)
{
tscDebug
(
"consumer:0x%"
PRIx64
" local offset is update, since seekupdate not set"
,
consumerId
);
pVg
->
offsetInfo
.
beginOffset
=
*
reqOffset
;
if
(
hasData
)
pVg
->
offsetInfo
.
beginOffset
=
*
reqOffset
;
pVg
->
offsetInfo
.
endOffset
=
*
rspOffset
;
}
else
{
tscDebug
(
"consumer:0x%"
PRIx64
" local offset is NOT update, since seekupdate is set"
,
consumerId
);
...
...
@@ -1929,7 +1929,7 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
pVg
->
epSet
=
*
pollRspWrapper
->
pEpset
;
}
updateVgInfo
(
pVg
,
&
pDataRsp
->
reqOffset
,
&
pDataRsp
->
rspOffset
,
pDataRsp
->
head
.
walsver
,
pDataRsp
->
head
.
walever
,
tmq
->
consumerId
);
updateVgInfo
(
pVg
,
&
pDataRsp
->
reqOffset
,
&
pDataRsp
->
rspOffset
,
pDataRsp
->
head
.
walsver
,
pDataRsp
->
head
.
walever
,
tmq
->
consumerId
,
pDataRsp
->
blockNum
!=
0
);
char
buf
[
TSDB_OFFSET_LEN
]
=
{
0
};
tFormatOffset
(
buf
,
TSDB_OFFSET_LEN
,
&
pDataRsp
->
rspOffset
);
...
...
@@ -1979,7 +1979,7 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
return
NULL
;
}
updateVgInfo
(
pVg
,
&
pollRspWrapper
->
metaRsp
.
rspOffset
,
&
pollRspWrapper
->
metaRsp
.
rspOffset
,
pollRspWrapper
->
metaRsp
.
head
.
walsver
,
pollRspWrapper
->
metaRsp
.
head
.
walever
,
tmq
->
consumerId
);
updateVgInfo
(
pVg
,
&
pollRspWrapper
->
metaRsp
.
rspOffset
,
&
pollRspWrapper
->
metaRsp
.
rspOffset
,
pollRspWrapper
->
metaRsp
.
head
.
walsver
,
pollRspWrapper
->
metaRsp
.
head
.
walever
,
tmq
->
consumerId
,
true
);
// build rsp
SMqMetaRspObj
*
pRsp
=
tmqBuildMetaRspFromWrapper
(
pollRspWrapper
);
taosFreeQitem
(
pollRspWrapper
);
...
...
@@ -2007,7 +2007,7 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
return
NULL
;
}
updateVgInfo
(
pVg
,
&
pollRspWrapper
->
taosxRsp
.
reqOffset
,
&
pollRspWrapper
->
taosxRsp
.
rspOffset
,
pollRspWrapper
->
taosxRsp
.
head
.
walsver
,
pollRspWrapper
->
taosxRsp
.
head
.
walever
,
tmq
->
consumerId
);
updateVgInfo
(
pVg
,
&
pollRspWrapper
->
taosxRsp
.
reqOffset
,
&
pollRspWrapper
->
taosxRsp
.
rspOffset
,
pollRspWrapper
->
taosxRsp
.
head
.
walsver
,
pollRspWrapper
->
taosxRsp
.
head
.
walever
,
tmq
->
consumerId
,
pollRspWrapper
->
taosxRsp
.
blockNum
!=
0
);
if
(
pollRspWrapper
->
taosxRsp
.
blockNum
==
0
)
{
tscDebug
(
"consumer:0x%"
PRIx64
" taosx empty block received, vgId:%d, vg total:%"
PRId64
", reqId:0x%"
PRIx64
,
...
...
source/client/test/clientTests.cpp
浏览文件 @
646e52ae
...
...
@@ -1442,178 +1442,4 @@ TEST(clientCase, sub_tb_mt_test) {
}
}
TEST
(
clientCase
,
ts_3756
)
{
// taos_options(TSDB_OPTION_CONFIGDIR, "~/first/cfg");
TAOS
*
pConn
=
taos_connect
(
"localhost"
,
"root"
,
"taosdata"
,
NULL
,
0
);
ASSERT_NE
(
pConn
,
nullptr
);
tmq_conf_t
*
conf
=
tmq_conf_new
();
tmq_conf_set
(
conf
,
"enable.auto.commit"
,
"false"
);
tmq_conf_set
(
conf
,
"auto.commit.interval.ms"
,
"2000"
);
tmq_conf_set
(
conf
,
"group.id"
,
"group_id_2"
);
tmq_conf_set
(
conf
,
"td.connect.user"
,
"root"
);
tmq_conf_set
(
conf
,
"td.connect.pass"
,
"taosdata"
);
tmq_conf_set
(
conf
,
"auto.offset.reset"
,
"latest"
);
tmq_conf_set
(
conf
,
"msg.with.table.name"
,
"false"
);
tmq_t
*
tmq
=
tmq_consumer_new
(
conf
,
NULL
,
0
);
tmq_conf_destroy
(
conf
);
// 创建订阅 topics 列表
tmq_list_t
*
topicList
=
tmq_list_new
();
tmq_list_append
(
topicList
,
"tp"
);
// 启动订阅
tmq_subscribe
(
tmq
,
topicList
);
tmq_list_destroy
(
topicList
);
TAOS_FIELD
*
fields
=
NULL
;
int32_t
numOfFields
=
0
;
int32_t
precision
=
0
;
int32_t
totalRows
=
0
;
int32_t
msgCnt
=
0
;
int32_t
timeout
=
200
;
int32_t
count
=
0
;
tmq_topic_assignment
*
pAssign
=
NULL
;
int32_t
numOfAssign
=
0
;
int32_t
code
=
tmq_get_topic_assignment
(
tmq
,
"tp"
,
&
pAssign
,
&
numOfAssign
);
if
(
code
!=
0
)
{
printf
(
"error occurs:%s
\n
"
,
tmq_err2str
(
code
));
tmq_free_assignment
(
pAssign
);
tmq_consumer_close
(
tmq
);
taos_close
(
pConn
);
fprintf
(
stderr
,
"%d msg consumed, include %d rows
\n
"
,
msgCnt
,
totalRows
);
return
;
}
for
(
int
i
=
0
;
i
<
numOfAssign
;
i
++
){
printf
(
"assign i:%d, vgId:%d, offset:%lld, start:%lld, end:%lld
\n
"
,
i
,
pAssign
[
i
].
vgId
,
pAssign
[
i
].
currentOffset
,
pAssign
[
i
].
begin
,
pAssign
[
i
].
end
);
}
// tmq_offset_seek(tmq, "tp", pAssign[0].vgId, 4);
tmq_free_assignment
(
pAssign
);
code
=
tmq_get_topic_assignment
(
tmq
,
"tp"
,
&
pAssign
,
&
numOfAssign
);
if
(
code
!=
0
)
{
printf
(
"error occurs:%s
\n
"
,
tmq_err2str
(
code
));
tmq_free_assignment
(
pAssign
);
tmq_consumer_close
(
tmq
);
taos_close
(
pConn
);
fprintf
(
stderr
,
"%d msg consumed, include %d rows
\n
"
,
msgCnt
,
totalRows
);
return
;
}
for
(
int
i
=
0
;
i
<
numOfAssign
;
i
++
){
printf
(
"assign i:%d, vgId:%d, offset:%lld, start:%lld, end:%lld
\n
"
,
i
,
pAssign
[
i
].
vgId
,
pAssign
[
i
].
currentOffset
,
pAssign
[
i
].
begin
,
pAssign
[
i
].
end
);
}
tmq_free_assignment
(
pAssign
);
code
=
tmq_get_topic_assignment
(
tmq
,
"tp"
,
&
pAssign
,
&
numOfAssign
);
if
(
code
!=
0
)
{
printf
(
"error occurs:%s
\n
"
,
tmq_err2str
(
code
));
tmq_free_assignment
(
pAssign
);
tmq_consumer_close
(
tmq
);
taos_close
(
pConn
);
fprintf
(
stderr
,
"%d msg consumed, include %d rows
\n
"
,
msgCnt
,
totalRows
);
return
;
}
for
(
int
i
=
0
;
i
<
numOfAssign
;
i
++
){
printf
(
"assign i:%d, vgId:%d, offset:%lld, start:%lld, end:%lld
\n
"
,
i
,
pAssign
[
i
].
vgId
,
pAssign
[
i
].
currentOffset
,
pAssign
[
i
].
begin
,
pAssign
[
i
].
end
);
}
while
(
1
)
{
printf
(
"start to poll
\n
"
);
TAOS_RES
*
pRes
=
tmq_consumer_poll
(
tmq
,
timeout
);
if
(
pRes
)
{
char
buf
[
128
];
const
char
*
topicName
=
tmq_get_topic_name
(
pRes
);
// const char* dbName = tmq_get_db_name(pRes);
// int32_t vgroupId = tmq_get_vgroup_id(pRes);
//
// printf("topic: %s\n", topicName);
// printf("db: %s\n", dbName);
// printf("vgroup id: %d\n", vgroupId);
printSubResults
(
pRes
,
&
totalRows
);
tmq_topic_assignment
*
pAssignTmp
=
NULL
;
int32_t
numOfAssignTmp
=
0
;
code
=
tmq_get_topic_assignment
(
tmq
,
"tp"
,
&
pAssignTmp
,
&
numOfAssignTmp
);
if
(
code
!=
0
)
{
printf
(
"error occurs:%s
\n
"
,
tmq_err2str
(
code
));
tmq_free_assignment
(
pAssign
);
tmq_consumer_close
(
tmq
);
taos_close
(
pConn
);
fprintf
(
stderr
,
"%d msg consumed, include %d rows
\n
"
,
msgCnt
,
totalRows
);
return
;
}
for
(
int
i
=
0
;
i
<
numOfAssign
;
i
++
){
printf
(
"assign i:%d, vgId:%d, offset:%lld, start:%lld, end:%lld
\n
"
,
i
,
pAssignTmp
[
i
].
vgId
,
pAssignTmp
[
i
].
currentOffset
,
pAssignTmp
[
i
].
begin
,
pAssignTmp
[
i
].
end
);
}
if
(
numOfAssign
!=
0
){
int
i
=
0
;
for
(;
i
<
numOfAssign
;
i
++
){
if
(
pAssign
[
i
].
currentOffset
!=
pAssignTmp
[
i
].
currentOffset
){
break
;
}
}
if
(
i
==
numOfAssign
){
printf
(
"all position is same
\n
"
);
break
;
}
tmq_free_assignment
(
pAssign
);
}
numOfAssign
=
numOfAssignTmp
;
pAssign
=
pAssignTmp
;
}
else
{
// tmq_offset_seek(tmq, "tp", pAssign[0].vgId, pAssign[0].currentOffset);
// tmq_offset_seek(tmq, "tp", pAssign[1].vgId, pAssign[1].currentOffset);
// tmq_commit_sync(tmq, pRes);
continue
;
}
// tmq_commit_sync(tmq, pRes);
if
(
pRes
!=
NULL
)
{
taos_free_result
(
pRes
);
// if ((++count) > 1) {
// break;
// }
}
else
{
// break;
}
// tmq_offset_seek(tmq, "tp", pAssign[0].vgId, pAssign[0].begin);
}
tmq_free_assignment
(
pAssign
);
code
=
tmq_get_topic_assignment
(
tmq
,
"tp"
,
&
pAssign
,
&
numOfAssign
);
if
(
code
!=
0
)
{
printf
(
"error occurs:%s
\n
"
,
tmq_err2str
(
code
));
tmq_free_assignment
(
pAssign
);
tmq_consumer_close
(
tmq
);
taos_close
(
pConn
);
fprintf
(
stderr
,
"%d msg consumed, include %d rows
\n
"
,
msgCnt
,
totalRows
);
return
;
}
for
(
int
i
=
0
;
i
<
numOfAssign
;
i
++
){
printf
(
"assign i:%d, vgId:%d, offset:%lld, start:%lld, end:%lld
\n
"
,
i
,
pAssign
[
i
].
vgId
,
pAssign
[
i
].
currentOffset
,
pAssign
[
i
].
begin
,
pAssign
[
i
].
end
);
}
tmq_consumer_close
(
tmq
);
taos_close
(
pConn
);
fprintf
(
stderr
,
"%d msg consumed, include %d rows
\n
"
,
msgCnt
,
totalRows
);
}
#pragma GCC diagnostic pop
source/dnode/mnode/impl/src/mndSubscribe.c
浏览文件 @
646e52ae
...
...
@@ -244,7 +244,7 @@ static void doRemoveLostConsumers(SMqRebOutputObj *pOutput, SHashObj *pHash, con
SMqRebOutputVg
outputVg
=
{.
oldConsumerId
=
consumerId
,
.
newConsumerId
=
-
1
,
.
pVgEp
=
pVgEp
};
taosHashPut
(
pHash
,
&
pVgEp
->
vgId
,
sizeof
(
int32_t
),
&
outputVg
,
sizeof
(
SMqRebOutputVg
));
mInfo
(
"sub:%s mq re-balance remove vgId:%d from consumer:%"
PRIx64
,
pSubKey
,
pVgEp
->
vgId
,
consumerId
);
mInfo
(
"sub:%s mq re-balance remove vgId:%d from consumer:
0x
%"
PRIx64
,
pSubKey
,
pVgEp
->
vgId
,
consumerId
);
}
taosArrayDestroy
(
pConsumerEp
->
vgs
);
...
...
source/dnode/vnode/src/inc/tq.h
浏览文件 @
646e52ae
...
...
@@ -127,7 +127,7 @@ void tqDestroyTqHandle(void* data);
// tqRead
int32_t
tqScanTaosx
(
STQ
*
pTq
,
const
STqHandle
*
pHandle
,
STaosxRsp
*
pRsp
,
SMqMetaRsp
*
pMetaRsp
,
STqOffsetVal
*
offset
);
int32_t
tqScanData
(
STQ
*
pTq
,
const
STqHandle
*
pHandle
,
SMqDataRsp
*
pRsp
,
STqOffsetVal
*
pOffset
);
int32_t
tqFetchLog
(
STQ
*
pTq
,
STqHandle
*
pHandle
,
int64_t
*
fetchOffset
,
SWalCkHead
**
pHeadWithCkSum
,
uint64_t
reqId
);
int32_t
tqFetchLog
(
STQ
*
pTq
,
STqHandle
*
pHandle
,
int64_t
*
fetchOffset
,
uint64_t
reqId
);
// tqExec
int32_t
tqTaosxScanLog
(
STQ
*
pTq
,
STqHandle
*
pHandle
,
SPackedData
submit
,
STaosxRsp
*
pRsp
,
int32_t
*
totalRows
);
...
...
@@ -175,7 +175,7 @@ int32_t extractDelDataBlock(const void* pData, int32_t len, int64_t ver, SStream
int32_t
tqExtractDataForMq
(
STQ
*
pTq
,
STqHandle
*
pHandle
,
const
SMqPollReq
*
pRequest
,
SRpcMsg
*
pMsg
);
int32_t
tqDoSendDataRsp
(
const
SRpcHandleInfo
*
pRpcHandleInfo
,
const
SMqDataRsp
*
pRsp
,
int32_t
epoch
,
int64_t
consumerId
,
int32_t
type
,
int64_t
sver
,
int64_t
ever
);
int32_t
tqInitDataRsp
(
SMqDataRsp
*
pRsp
,
const
SMqPollReq
*
pReq
);
int32_t
tqInitDataRsp
(
SMqDataRsp
*
pRsp
,
STqOffsetVal
pOffset
);
#ifdef __cplusplus
}
#endif
...
...
source/dnode/vnode/src/tq/tq.c
浏览文件 @
646e52ae
...
...
@@ -289,9 +289,8 @@ int32_t tqPushEmptyDataRsp(STqHandle* pHandle, int32_t vgId) {
}
SMqDataRsp
dataRsp
=
{
0
};
tqInitDataRsp
(
&
dataRsp
,
&
req
);
tqInitDataRsp
(
&
dataRsp
,
req
.
reqOffset
);
dataRsp
.
blockNum
=
0
;
dataRsp
.
rspOffset
=
dataRsp
.
reqOffset
;
char
buf
[
TSDB_OFFSET_LEN
]
=
{
0
};
tFormatOffset
(
buf
,
TSDB_OFFSET_LEN
,
&
dataRsp
.
reqOffset
);
tqInfo
(
"tqPushEmptyDataRsp to consumer:0x%"
PRIx64
" vgId:%d, offset:%s, reqId:0x%"
PRIx64
,
req
.
consumerId
,
vgId
,
buf
,
req
.
reqId
);
...
...
@@ -391,7 +390,6 @@ int32_t tqProcessSeekReq(STQ* pTq, SRpcMsg* pMsg) {
}
tqDebug
(
"tmq seek: consumer:0x%"
PRIx64
" vgId:%d, subkey %s"
,
req
.
consumerId
,
vgId
,
req
.
subKey
);
STqHandle
*
pHandle
=
taosHashGet
(
pTq
->
pHandle
,
req
.
subKey
,
strlen
(
req
.
subKey
));
if
(
pHandle
==
NULL
)
{
tqWarn
(
"tmq seek: consumer:0x%"
PRIx64
" vgId:%d subkey %s not found"
,
req
.
consumerId
,
vgId
,
req
.
subKey
);
...
...
@@ -719,7 +717,7 @@ int32_t tqProcessVgWalInfoReq(STQ* pTq, SRpcMsg* pMsg) {
walReaderValidVersionRange
(
pHandle
->
execHandle
.
pTqReader
->
pWalReader
,
&
sver
,
&
ever
);
SMqDataRsp
dataRsp
=
{
0
};
tqInitDataRsp
(
&
dataRsp
,
&
req
);
tqInitDataRsp
(
&
dataRsp
,
req
.
reqOffset
);
if
(
req
.
useSnapshot
==
true
)
{
tqError
(
"consumer:0x%"
PRIx64
" vgId:%d subkey:%s snapshot not support wal info"
,
consumerId
,
vgId
,
req
.
subKey
);
...
...
source/dnode/vnode/src/tq/tqMeta.c
浏览文件 @
646e52ae
...
...
@@ -356,7 +356,7 @@ static int restoreHandle(STQ* pTq, void* pVal, int vLen, STqHandle* handle){
if
(
buildHandle
(
pTq
,
handle
)
<
0
){
return
-
1
;
}
tqInfo
(
"
tq restore %s consumer %"
PRId
64
" vgId:%d"
,
handle
->
subKey
,
handle
->
consumerId
,
vgId
);
tqInfo
(
"
restoreHandle %s consumer 0x%"
PRIx
64
" vgId:%d"
,
handle
->
subKey
,
handle
->
consumerId
,
vgId
);
return
taosHashPut
(
pTq
->
pHandle
,
handle
->
subKey
,
strlen
(
handle
->
subKey
),
handle
,
sizeof
(
STqHandle
));
}
...
...
@@ -384,7 +384,7 @@ int32_t tqCreateHandle(STQ* pTq, SMqRebVgReq* req, STqHandle* handle){
if
(
buildHandle
(
pTq
,
handle
)
<
0
){
return
-
1
;
}
tqInfo
(
"tq
restore %s consumer %"
PRId
64
" vgId:%d"
,
handle
->
subKey
,
handle
->
consumerId
,
vgId
);
tqInfo
(
"tq
CreateHandle %s consumer 0x%"
PRIx
64
" vgId:%d"
,
handle
->
subKey
,
handle
->
consumerId
,
vgId
);
return
taosHashPut
(
pTq
->
pHandle
,
handle
->
subKey
,
strlen
(
handle
->
subKey
),
handle
,
sizeof
(
STqHandle
));
}
...
...
source/dnode/vnode/src/tq/tqPush.c
浏览文件 @
646e52ae
...
...
@@ -39,7 +39,7 @@ int32_t tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t v
int32_t
numOfTasks
=
streamMetaGetNumOfTasks
(
pTq
->
pStreamMeta
);
taosRUnLockLatch
(
&
pTq
->
pStreamMeta
->
lock
);
tq
Debug
(
"handle submit, restore:%d, size:%d"
,
pTq
->
pVnode
->
restored
,
numOfTasks
);
tq
Trace
(
"handle submit, restore:%d, size:%d"
,
pTq
->
pVnode
->
restored
,
numOfTasks
);
// push data for stream processing:
// 1. the vnode has already been restored.
...
...
source/dnode/vnode/src/tq/tqRead.c
浏览文件 @
646e52ae
...
...
@@ -184,70 +184,63 @@ end:
return
tbSuid
==
realTbSuid
;
}
int32_t
tqFetchLog
(
STQ
*
pTq
,
STqHandle
*
pHandle
,
int64_t
*
fetchOffset
,
SWalCkHead
**
ppCkHead
,
uint64_t
reqId
)
{
int32_t
code
=
0
;
int32_t
tqFetchLog
(
STQ
*
pTq
,
STqHandle
*
pHandle
,
int64_t
*
fetchOffset
,
uint64_t
reqId
)
{
int32_t
code
=
-
1
;
int32_t
vgId
=
TD_VID
(
pTq
->
pVnode
);
taosThreadMutexLock
(
&
pHandle
->
pWalReader
->
mutex
);
int64_t
offset
=
*
fetchOffset
;
int64_t
lastVer
=
walGetLastVer
(
pHandle
->
pWalReader
->
pWal
);
int64_t
committedVer
=
walGetCommittedVer
(
pHandle
->
pWalReader
->
pWal
);
int64_t
appliedVer
=
walGetAppliedVer
(
pHandle
->
pWalReader
->
pWal
);
while
(
1
)
{
if
(
walFetchHead
(
pHandle
->
pWalReader
,
offset
,
*
ppCkHead
)
<
0
)
{
wDebug
(
"vgId:%d, wal start to fetch, index:%"
PRId64
", last index:%"
PRId64
" commit index:%"
PRId64
", applied index:%"
PRId64
,
vgId
,
offset
,
lastVer
,
committedVer
,
appliedVer
);
while
(
offset
<=
appliedVer
)
{
if
(
walFetchHead
(
pHandle
->
pWalReader
,
offset
)
<
0
)
{
tqDebug
(
"tmq poll: consumer:0x%"
PRIx64
", (epoch %d) vgId:%d offset %"
PRId64
", no more log to return, reqId:0x%"
PRIx64
,
pHandle
->
consumerId
,
pHandle
->
epoch
,
vgId
,
offset
,
reqId
);
*
fetchOffset
=
offset
;
code
=
-
1
;
goto
END
;
}
tqDebug
(
"vgId:%d, consumer:0x%"
PRIx64
" taosx get msg ver %"
PRId64
", type: %s, reqId:0x%"
PRIx64
,
vgId
,
pHandle
->
consumerId
,
offset
,
TMSG_INFO
(
(
*
ppCkHead
)
->
head
.
msgType
),
reqId
);
pHandle
->
consumerId
,
offset
,
TMSG_INFO
(
pHandle
->
pWalReader
->
pHead
->
head
.
msgType
),
reqId
);
if
((
*
ppCkHead
)
->
head
.
msgType
==
TDMT_VND_SUBMIT
)
{
code
=
walFetchBody
(
pHandle
->
pWalReader
,
ppCkHead
);
if
(
code
<
0
)
{
*
fetchOffset
=
offset
;
code
=
-
1
;
goto
END
;
}
*
fetchOffset
=
offset
;
code
=
0
;
if
(
pHandle
->
pWalReader
->
pHead
->
head
.
msgType
==
TDMT_VND_SUBMIT
)
{
code
=
walFetchBody
(
pHandle
->
pWalReader
);
goto
END
;
}
else
{
if
(
pHandle
->
fetchMeta
!=
WITH_DATA
)
{
SWalCont
*
pHead
=
&
(
(
*
ppCkHead
)
->
head
);
SWalCont
*
pHead
=
&
(
pHandle
->
pWalReader
->
pHead
->
head
);
if
(
IS_META_MSG
(
pHead
->
msgType
)
&&
!
(
pHead
->
msgType
==
TDMT_VND_DELETE
&&
pHandle
->
fetchMeta
==
ONLY_META
))
{
code
=
walFetchBody
(
pHandle
->
pWalReader
,
ppCkHead
);
code
=
walFetchBody
(
pHandle
->
pWalReader
);
if
(
code
<
0
)
{
*
fetchOffset
=
offset
;
code
=
-
1
;
goto
END
;
}
pHead
=
&
(
pHandle
->
pWalReader
->
pHead
->
head
);
if
(
isValValidForTable
(
pHandle
,
pHead
))
{
*
fetchOffset
=
offset
;
code
=
0
;
goto
END
;
}
else
{
offset
++
;
code
=
-
1
;
continue
;
}
}
}
code
=
walSkipFetchBody
(
pHandle
->
pWalReader
,
*
ppCkHead
);
code
=
walSkipFetchBody
(
pHandle
->
pWalReader
);
if
(
code
<
0
)
{
*
fetchOffset
=
offset
;
code
=
-
1
;
goto
END
;
}
offset
++
;
}
code
=
-
1
;
}
END:
taosThreadMutexUnlock
(
&
pHandle
->
pWalReader
->
mutex
)
;
*
fetchOffset
=
offset
;
return
code
;
}
...
...
source/dnode/vnode/src/tq/tqUtil.c
浏览文件 @
646e52ae
...
...
@@ -20,8 +20,9 @@
static
int32_t
tqSendMetaPollRsp
(
STqHandle
*
pHandle
,
const
SRpcMsg
*
pMsg
,
const
SMqPollReq
*
pReq
,
const
SMqMetaRsp
*
pRsp
,
int32_t
vgId
);
int32_t
tqInitDataRsp
(
SMqDataRsp
*
pRsp
,
const
SMqPollReq
*
pReq
)
{
pRsp
->
reqOffset
=
pReq
->
reqOffset
;
int32_t
tqInitDataRsp
(
SMqDataRsp
*
pRsp
,
STqOffsetVal
pOffset
)
{
pRsp
->
reqOffset
=
pOffset
;
pRsp
->
rspOffset
=
pOffset
;
pRsp
->
blockData
=
taosArrayInit
(
0
,
sizeof
(
void
*
));
pRsp
->
blockDataLen
=
taosArrayInit
(
0
,
sizeof
(
int32_t
));
...
...
@@ -35,8 +36,9 @@ int32_t tqInitDataRsp(SMqDataRsp* pRsp, const SMqPollReq* pReq) {
return
0
;
}
static
int32_t
tqInitTaosxRsp
(
STaosxRsp
*
pRsp
,
const
SMqPollReq
*
pReq
)
{
pRsp
->
reqOffset
=
pReq
->
reqOffset
;
static
int32_t
tqInitTaosxRsp
(
STaosxRsp
*
pRsp
,
STqOffsetVal
pOffset
)
{
pRsp
->
reqOffset
=
pOffset
;
pRsp
->
rspOffset
=
pOffset
;
pRsp
->
withTbName
=
1
;
pRsp
->
withSchema
=
1
;
...
...
@@ -69,7 +71,6 @@ static int32_t tqInitTaosxRsp(STaosxRsp* pRsp, const SMqPollReq* pReq) {
static
int32_t
extractResetOffsetVal
(
STqOffsetVal
*
pOffsetVal
,
STQ
*
pTq
,
STqHandle
*
pHandle
,
const
SMqPollReq
*
pRequest
,
SRpcMsg
*
pMsg
,
bool
*
pBlockReturned
)
{
uint64_t
consumerId
=
pRequest
->
consumerId
;
STqOffsetVal
reqOffset
=
pRequest
->
reqOffset
;
STqOffset
*
pOffset
=
tqOffsetRead
(
pTq
->
pOffsetStore
,
pRequest
->
subKey
);
int32_t
vgId
=
TD_VID
(
pTq
->
pVnode
);
...
...
@@ -86,7 +87,7 @@ static int32_t extractResetOffsetVal(STqOffsetVal* pOffsetVal, STQ* pTq, STqHand
return
0
;
}
else
{
// no poll occurs in this vnode for this topic, let's seek to the right offset value.
if
(
reqOffset
.
type
==
TMQ_OFFSET__RESET_EARLIEST
)
{
if
(
pRequest
->
reqOffset
.
type
==
TMQ_OFFSET__RESET_EARLIEST
)
{
if
(
pRequest
->
useSnapshot
)
{
tqDebug
(
"tmq poll: consumer:0x%"
PRIx64
", subkey:%s, vgId:%d, (earliest) set offset to be snapshot"
,
consumerId
,
pHandle
->
subKey
,
vgId
);
...
...
@@ -100,12 +101,12 @@ static int32_t extractResetOffsetVal(STqOffsetVal* pOffsetVal, STQ* pTq, STqHand
walRefFirstVer
(
pTq
->
pVnode
->
pWal
,
pHandle
->
pRef
);
tqOffsetResetToLog
(
pOffsetVal
,
pHandle
->
pRef
->
refVer
);
}
}
else
if
(
reqOffset
.
type
==
TMQ_OFFSET__RESET_LATEST
)
{
}
else
if
(
pRequest
->
reqOffset
.
type
==
TMQ_OFFSET__RESET_LATEST
)
{
walRefLastVer
(
pTq
->
pVnode
->
pWal
,
pHandle
->
pRef
);
SMqDataRsp
dataRsp
=
{
0
};
tq
InitDataRsp
(
&
dataRsp
,
pRequest
);
tq
OffsetResetToLog
(
pOffsetVal
,
pHandle
->
pRef
->
refVer
+
1
);
tq
OffsetResetToLog
(
&
dataRsp
.
rspOffset
,
pHandle
->
pRef
->
refVer
+
1
);
tq
InitDataRsp
(
&
dataRsp
,
*
pOffsetVal
);
tqDebug
(
"tmq poll: consumer:0x%"
PRIx64
", subkey %s, vgId:%d, (latest) offset reset to %"
PRId64
,
consumerId
,
pHandle
->
subKey
,
vgId
,
dataRsp
.
rspOffset
.
version
);
int32_t
code
=
tqSendDataRsp
(
pHandle
,
pMsg
,
pRequest
,
&
dataRsp
,
TMQ_MSG_TYPE__POLL_DATA_RSP
,
vgId
);
...
...
@@ -113,7 +114,7 @@ static int32_t extractResetOffsetVal(STqOffsetVal* pOffsetVal, STQ* pTq, STqHand
*
pBlockReturned
=
true
;
return
code
;
}
else
if
(
reqOffset
.
type
==
TMQ_OFFSET__RESET_NONE
)
{
}
else
if
(
pRequest
->
reqOffset
.
type
==
TMQ_OFFSET__RESET_NONE
)
{
tqError
(
"tmq poll: subkey:%s, no offset committed for consumer:0x%"
PRIx64
" in vg %d, subkey %s, reset none failed"
,
pHandle
->
subKey
,
consumerId
,
vgId
,
pRequest
->
subKey
);
...
...
@@ -125,11 +126,11 @@ static int32_t extractResetOffsetVal(STqOffsetVal* pOffsetVal, STQ* pTq, STqHand
return
0
;
}
static
void
setRequestVersion
(
STqOffsetVal
*
offset
,
int64_t
ver
){
if
(
offset
->
type
==
TMQ_OFFSET__LOG
){
offset
->
version
=
ver
+
1
;
}
}
//
static void setRequestVersion(STqOffsetVal* offset, int64_t ver){
//
if(offset->type == TMQ_OFFSET__LOG){
// offset->version = ver
;
//
}
//
}
static
int32_t
extractDataAndRspForNormalSubscribe
(
STQ
*
pTq
,
STqHandle
*
pHandle
,
const
SMqPollReq
*
pRequest
,
SRpcMsg
*
pMsg
,
STqOffsetVal
*
pOffset
)
{
...
...
@@ -138,8 +139,8 @@ static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle,
terrno
=
0
;
SMqDataRsp
dataRsp
=
{
0
};
tqInitDataRsp
(
&
dataRsp
,
pReques
t
);
dataRsp
.
reqOffset
.
type
=
pOffset
->
type
;
// stro
e origin type for getting offset in tmq_get_vgroup_offset
tqInitDataRsp
(
&
dataRsp
,
*
pOffse
t
);
// dataRsp.reqOffset.type = pOffset->type; // stor
e origin type for getting offset in tmq_get_vgroup_offset
qSetTaskId
(
pHandle
->
execHandle
.
task
,
consumerId
,
pRequest
->
reqId
);
int
code
=
tqScanData
(
pTq
,
pHandle
,
&
dataRsp
,
pOffset
);
...
...
@@ -152,8 +153,7 @@ static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle,
// lock
taosWLockLatch
(
&
pTq
->
lock
);
int64_t
ver
=
walGetCommittedVer
(
pTq
->
pVnode
->
pWal
);
if
(
pOffset
->
version
>=
ver
||
dataRsp
.
rspOffset
.
version
>=
ver
)
{
// check if there are data again to avoid lost data
if
(
dataRsp
.
rspOffset
.
version
>
ver
)
{
// check if there are data again to avoid lost data
code
=
tqRegisterPushHandle
(
pTq
,
pHandle
,
pMsg
);
taosWUnLockLatch
(
&
pTq
->
lock
);
goto
end
;
...
...
@@ -161,7 +161,7 @@ static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle,
taosWUnLockLatch
(
&
pTq
->
lock
);
}
setRequestVersion
(
&
dataRsp
.
reqOffset
,
pOffset
->
version
);
//
setRequestVersion(&dataRsp.reqOffset, pOffset->version);
code
=
tqSendDataRsp
(
pHandle
,
pMsg
,
pRequest
,
(
SMqDataRsp
*
)
&
dataRsp
,
TMQ_MSG_TYPE__POLL_DATA_RSP
,
vgId
);
end
:
{
...
...
@@ -179,11 +179,10 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle,
SRpcMsg
*
pMsg
,
STqOffsetVal
*
offset
)
{
int
code
=
0
;
int32_t
vgId
=
TD_VID
(
pTq
->
pVnode
);
SWalCkHead
*
pCkHead
=
NULL
;
SMqMetaRsp
metaRsp
=
{
0
};
STaosxRsp
taosxRsp
=
{
0
};
tqInitTaosxRsp
(
&
taosxRsp
,
pReques
t
);
taosxRsp
.
reqOffset
.
type
=
offset
->
type
;
// store origin type for getting offset in tmq_get_vgroup_offset
tqInitTaosxRsp
(
&
taosxRsp
,
*
offse
t
);
//
taosxRsp.reqOffset.type = offset->type; // store origin type for getting offset in tmq_get_vgroup_offset
if
(
offset
->
type
!=
TMQ_OFFSET__LOG
)
{
if
(
tqScanTaosx
(
pTq
,
pHandle
,
&
taosxRsp
,
&
metaRsp
,
offset
)
<
0
)
{
...
...
@@ -216,14 +215,7 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle,
if
(
offset
->
type
==
TMQ_OFFSET__LOG
)
{
walReaderVerifyOffset
(
pHandle
->
pWalReader
,
offset
);
int64_t
fetchVer
=
offset
->
version
;
pCkHead
=
taosMemoryMalloc
(
sizeof
(
SWalCkHead
)
+
2048
);
if
(
pCkHead
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
code
=
-
1
;
goto
end
;
}
walSetReaderCapacity
(
pHandle
->
pWalReader
,
2048
);
int
totalRows
=
0
;
while
(
1
)
{
int32_t
savedEpoch
=
atomic_load_32
(
&
pHandle
->
epoch
);
...
...
@@ -234,14 +226,14 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle,
break
;
}
if
(
tqFetchLog
(
pTq
,
pHandle
,
&
fetchVer
,
&
pCkHead
,
pRequest
->
reqId
)
<
0
)
{
if
(
tqFetchLog
(
pTq
,
pHandle
,
&
fetchVer
,
pRequest
->
reqId
)
<
0
)
{
tqOffsetResetToLog
(
&
taosxRsp
.
rspOffset
,
fetchVer
);
setRequestVersion
(
&
taosxRsp
.
reqOffset
,
offset
->
version
);
//
setRequestVersion(&taosxRsp.reqOffset, offset->version);
code
=
tqSendDataRsp
(
pHandle
,
pMsg
,
pRequest
,
(
SMqDataRsp
*
)
&
taosxRsp
,
TMQ_MSG_TYPE__POLL_DATA_RSP
,
vgId
);
goto
end
;
}
SWalCont
*
pHead
=
&
p
Ck
Head
->
head
;
SWalCont
*
pHead
=
&
p
Handle
->
pWalReader
->
p
Head
->
head
;
tqDebug
(
"tmq poll: consumer:0x%"
PRIx64
" (epoch %d) iter log, vgId:%d offset %"
PRId64
" msgType %d"
,
pRequest
->
consumerId
,
pRequest
->
epoch
,
vgId
,
fetchVer
,
pHead
->
msgType
);
...
...
@@ -249,7 +241,7 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle,
if
(
pHead
->
msgType
!=
TDMT_VND_SUBMIT
)
{
if
(
totalRows
>
0
)
{
tqOffsetResetToLog
(
&
taosxRsp
.
rspOffset
,
fetchVer
);
setRequestVersion
(
&
taosxRsp
.
reqOffset
,
offset
->
version
);
//
setRequestVersion(&taosxRsp.reqOffset, offset->version);
code
=
tqSendDataRsp
(
pHandle
,
pMsg
,
pRequest
,
(
SMqDataRsp
*
)
&
taosxRsp
,
TMQ_MSG_TYPE__POLL_DATA_RSP
,
vgId
);
goto
end
;
}
...
...
@@ -279,7 +271,7 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle,
if
(
totalRows
>=
4096
||
taosxRsp
.
createTableNum
>
0
)
{
tqOffsetResetToLog
(
&
taosxRsp
.
rspOffset
,
fetchVer
+
1
);
setRequestVersion
(
&
taosxRsp
.
reqOffset
,
offset
->
version
);
//
setRequestVersion(&taosxRsp.reqOffset, offset->version);
code
=
tqSendDataRsp
(
pHandle
,
pMsg
,
pRequest
,
(
SMqDataRsp
*
)
&
taosxRsp
,
taosxRsp
.
createTableNum
>
0
?
TMQ_MSG_TYPE__POLL_DATA_META_RSP
:
TMQ_MSG_TYPE__POLL_DATA_RSP
,
vgId
);
goto
end
;
}
else
{
...
...
@@ -291,20 +283,17 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle,
end:
tDeleteSTaosxRsp
(
&
taosxRsp
);
taosMemoryFreeClear
(
pCkHead
);
return
code
;
}
int32_t
tqExtractDataForMq
(
STQ
*
pTq
,
STqHandle
*
pHandle
,
const
SMqPollReq
*
pRequest
,
SRpcMsg
*
pMsg
)
{
int32_t
code
=
-
1
;
STqOffsetVal
offset
=
{
0
};
STqOffsetVal
reqOffset
=
pRequest
->
reqOffset
;
// 1. reset the offset if needed
if
(
IS_OFFSET_RESET_TYPE
(
reqOffset
.
type
))
{
if
(
IS_OFFSET_RESET_TYPE
(
pRequest
->
reqOffset
.
type
))
{
// handle the reset offset cases, according to the consumer's choice.
bool
blockReturned
=
false
;
code
=
extractResetOffsetVal
(
&
o
ffset
,
pTq
,
pHandle
,
pRequest
,
pMsg
,
&
blockReturned
);
int32_t
code
=
extractResetOffsetVal
(
&
reqO
ffset
,
pTq
,
pHandle
,
pRequest
,
pMsg
,
&
blockReturned
);
if
(
code
!=
0
)
{
return
code
;
}
...
...
@@ -313,20 +302,17 @@ int32_t tqExtractDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequ
if
(
blockReturned
)
{
return
0
;
}
}
else
if
(
reqOffset
.
type
!=
0
){
// use the consumer specified offset
// the offset value can not be monotonious increase??
offset
=
reqOffset
;
}
else
{
}
else
if
(
reqOffset
.
type
==
0
){
// use the consumer specified offset
uError
(
"req offset type is 0"
);
return
TSDB_CODE_TMQ_INVALID_MSG
;
}
// this is a normal subscribe requirement
if
(
pHandle
->
execHandle
.
subType
==
TOPIC_SUB_TYPE__COLUMN
)
{
return
extractDataAndRspForNormalSubscribe
(
pTq
,
pHandle
,
pRequest
,
pMsg
,
&
o
ffset
);
return
extractDataAndRspForNormalSubscribe
(
pTq
,
pHandle
,
pRequest
,
pMsg
,
&
reqO
ffset
);
}
else
{
// todo handle the case where re-balance occurs.
// for taosx
return
extractDataAndRspForDbStbSubscribe
(
pTq
,
pHandle
,
pRequest
,
pMsg
,
&
o
ffset
);
return
extractDataAndRspForDbStbSubscribe
(
pTq
,
pHandle
,
pRequest
,
pMsg
,
&
reqO
ffset
);
}
}
...
...
source/dnode/vnode/src/vnd/vnodeSvr.c
浏览文件 @
646e52ae
...
...
@@ -628,6 +628,7 @@ int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) {
return
tqProcessVgCommittedInfoReq
(
pVnode
->
pTq
,
pMsg
);
case
TDMT_VND_TMQ_SEEK
:
return
tqProcessSeekReq
(
pVnode
->
pTq
,
pMsg
);
default:
vError
(
"unknown msg type:%d in fetch queue"
,
pMsg
->
msgType
);
return
TSDB_CODE_APP_ERROR
;
...
...
source/libs/wal/src/walRead.c
浏览文件 @
646e52ae
...
...
@@ -16,10 +16,6 @@
#include "taoserror.h"
#include "walInt.h"
static
int32_t
walFetchHeadNew
(
SWalReader
*
pRead
,
int64_t
fetchVer
);
static
int32_t
walFetchBodyNew
(
SWalReader
*
pRead
);
static
int32_t
walSkipFetchBodyNew
(
SWalReader
*
pRead
);
SWalReader
*
walOpenReader
(
SWal
*
pWal
,
SWalFilterCond
*
cond
)
{
SWalReader
*
pReader
=
taosMemoryCalloc
(
1
,
sizeof
(
SWalReader
));
if
(
pReader
==
NULL
)
{
...
...
@@ -70,38 +66,29 @@ int32_t walNextValidMsg(SWalReader *pReader) {
int64_t
fetchVer
=
pReader
->
curVersion
;
int64_t
lastVer
=
walGetLastVer
(
pReader
->
pWal
);
int64_t
committedVer
=
walGetCommittedVer
(
pReader
->
pWal
);
// int64_t appliedVer = walGetAppliedVer(pReader->pWal);
// if(appliedVer < committedVer){ // wait apply ver equal to commit ver, otherwise may lost data when consume data [TD-24010]
// wDebug("vgId:%d, wal apply ver:%"PRId64" smaller than commit ver:%"PRId64, pReader->pWal->cfg.vgId, appliedVer, committedVer);
// }
// int64_t endVer = TMIN(appliedVer, committedVer);
int64_t
endVer
=
committedVer
;
int64_t
appliedVer
=
walGetAppliedVer
(
pReader
->
pWal
);
wDebug
(
"vgId:%d, wal start to fetch, index:%"
PRId64
", last index:%"
PRId64
" commit index:%"
PRId64
", end index:%"
PRId64
,
pReader
->
pWal
->
cfg
.
vgId
,
fetchVer
,
lastVer
,
committedVer
,
endVer
);
if
(
fetchVer
>
endVer
){
", applied index:%"
PRId64
,
pReader
->
pWal
->
cfg
.
vgId
,
fetchVer
,
lastVer
,
committedVer
,
appliedVer
);
if
(
fetchVer
>
appliedVer
){
terrno
=
TSDB_CODE_WAL_LOG_NOT_EXIST
;
return
-
1
;
}
while
(
fetchVer
<=
endVer
)
{
if
(
walFetchHeadNew
(
pReader
,
fetchVer
)
<
0
)
{
while
(
fetchVer
<=
appliedVer
)
{
if
(
walFetchHead
(
pReader
,
fetchVer
)
<
0
)
{
return
-
1
;
}
int32_t
type
=
pReader
->
pHead
->
head
.
msgType
;
if
(
type
==
TDMT_VND_SUBMIT
||
((
type
==
TDMT_VND_DELETE
)
&&
(
pReader
->
cond
.
deleteMsg
==
1
))
||
(
IS_META_MSG
(
type
)
&&
pReader
->
cond
.
scanMeta
))
{
if
(
walFetchBody
New
(
pReader
)
<
0
)
{
if
(
walFetchBody
(
pReader
)
<
0
)
{
return
-
1
;
}
return
0
;
}
else
{
if
(
walSkipFetchBody
New
(
pReader
)
<
0
)
{
if
(
walSkipFetchBody
(
pReader
)
<
0
)
{
return
-
1
;
}
...
...
@@ -263,104 +250,8 @@ int32_t walReaderSeekVer(SWalReader *pReader, int64_t ver) {
return
0
;
}
void
walSetReaderCapacity
(
SWalReader
*
pRead
,
int32_t
capacity
)
{
pRead
->
capacity
=
capacity
;
}
static
int32_t
walFetchHeadNew
(
SWalReader
*
pRead
,
int64_t
fetchVer
)
{
int64_t
contLen
;
bool
seeked
=
false
;
wDebug
(
"vgId:%d, wal starts to fetch head, index:%"
PRId64
,
pRead
->
pWal
->
cfg
.
vgId
,
fetchVer
);
if
(
pRead
->
curVersion
!=
fetchVer
)
{
if
(
walReaderSeekVer
(
pRead
,
fetchVer
)
<
0
)
{
return
-
1
;
}
seeked
=
true
;
}
while
(
1
)
{
contLen
=
taosReadFile
(
pRead
->
pLogFile
,
pRead
->
pHead
,
sizeof
(
SWalCkHead
));
if
(
contLen
==
sizeof
(
SWalCkHead
))
{
break
;
}
else
if
(
contLen
==
0
&&
!
seeked
)
{
if
(
walReadSeekVerImpl
(
pRead
,
fetchVer
)
<
0
){
return
-
1
;
}
seeked
=
true
;
continue
;
}
else
{
if
(
contLen
<
0
)
{
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
}
else
{
terrno
=
TSDB_CODE_WAL_FILE_CORRUPTED
;
}
return
-
1
;
}
}
// pRead->curInvalid = 0;
return
0
;
}
static
int32_t
walFetchBodyNew
(
SWalReader
*
pReader
)
{
SWalCont
*
pReadHead
=
&
pReader
->
pHead
->
head
;
int64_t
ver
=
pReadHead
->
version
;
wDebug
(
"vgId:%d, wal starts to fetch body, ver:%"
PRId64
" ,len:%d, total"
,
pReader
->
pWal
->
cfg
.
vgId
,
ver
,
pReadHead
->
bodyLen
);
if
(
pReader
->
capacity
<
pReadHead
->
bodyLen
)
{
SWalCkHead
*
ptr
=
(
SWalCkHead
*
)
taosMemoryRealloc
(
pReader
->
pHead
,
sizeof
(
SWalCkHead
)
+
pReadHead
->
bodyLen
);
if
(
ptr
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
-
1
;
}
pReader
->
pHead
=
ptr
;
pReadHead
=
&
pReader
->
pHead
->
head
;
pReader
->
capacity
=
pReadHead
->
bodyLen
;
}
if
(
pReadHead
->
bodyLen
!=
taosReadFile
(
pReader
->
pLogFile
,
pReadHead
->
body
,
pReadHead
->
bodyLen
))
{
if
(
pReadHead
->
bodyLen
<
0
)
{
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
wError
(
"vgId:%d, wal fetch body error:%"
PRId64
", read request index:%"
PRId64
", since %s"
,
pReader
->
pWal
->
cfg
.
vgId
,
pReader
->
pHead
->
head
.
version
,
ver
,
tstrerror
(
terrno
));
}
else
{
wError
(
"vgId:%d, wal fetch body error:%"
PRId64
", read request index:%"
PRId64
", since file corrupted"
,
pReader
->
pWal
->
cfg
.
vgId
,
pReader
->
pHead
->
head
.
version
,
ver
);
terrno
=
TSDB_CODE_WAL_FILE_CORRUPTED
;
}
return
-
1
;
}
if
(
walValidBodyCksum
(
pReader
->
pHead
)
!=
0
)
{
wError
(
"vgId:%d, wal fetch body error:%"
PRId64
", since body checksum not passed"
,
pReader
->
pWal
->
cfg
.
vgId
,
ver
);
terrno
=
TSDB_CODE_WAL_FILE_CORRUPTED
;
return
-
1
;
}
wDebug
(
"vgId:%d, index:%"
PRId64
" is fetched, type:%d, cursor advance"
,
pReader
->
pWal
->
cfg
.
vgId
,
ver
,
pReader
->
pHead
->
head
.
msgType
);
pReader
->
curVersion
=
ver
+
1
;
return
0
;
}
static
int32_t
walSkipFetchBodyNew
(
SWalReader
*
pRead
)
{
int64_t
code
;
code
=
taosLSeekFile
(
pRead
->
pLogFile
,
pRead
->
pHead
->
head
.
bodyLen
,
SEEK_CUR
);
if
(
code
<
0
)
{
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
// pRead->curInvalid = 1;
return
-
1
;
}
pRead
->
curVersion
++
;
wDebug
(
"vgId:%d, version advance to %"
PRId64
", skip fetch"
,
pRead
->
pWal
->
cfg
.
vgId
,
pRead
->
curVersion
);
return
0
;
}
int32_t
walFetchHead
(
SWalReader
*
pRead
,
int64_t
ver
,
SWalCkHead
*
pHead
)
{
int32_t
walFetchHead
(
SWalReader
*
pRead
,
int64_t
ver
)
{
int64_t
code
;
int64_t
contLen
;
bool
seeked
=
false
;
...
...
@@ -378,15 +269,13 @@ int32_t walFetchHead(SWalReader *pRead, int64_t ver, SWalCkHead *pHead) {
if
(
pRead
->
curVersion
!=
ver
)
{
code
=
walReaderSeekVer
(
pRead
,
ver
);
if
(
code
<
0
)
{
// pRead->curVersion = ver;
// pRead->curInvalid = 1;
return
-
1
;
}
seeked
=
true
;
}
while
(
1
)
{
contLen
=
taosReadFile
(
pRead
->
pLogFile
,
pHead
,
sizeof
(
SWalCkHead
));
contLen
=
taosReadFile
(
pRead
->
pLogFile
,
p
Read
->
p
Head
,
sizeof
(
SWalCkHead
));
if
(
contLen
==
sizeof
(
SWalCkHead
))
{
break
;
}
else
if
(
contLen
==
0
&&
!
seeked
)
{
...
...
@@ -401,12 +290,11 @@ int32_t walFetchHead(SWalReader *pRead, int64_t ver, SWalCkHead *pHead) {
}
else
{
terrno
=
TSDB_CODE_WAL_FILE_CORRUPTED
;
}
// pRead->curInvalid = 1;
return
-
1
;
}
}
code
=
walValidHeadCksum
(
pHead
);
code
=
walValidHeadCksum
(
p
Read
->
p
Head
);
if
(
code
!=
0
)
{
wError
(
"vgId:%d, unexpected wal log index:%"
PRId64
", since head checksum not passed"
,
pRead
->
pWal
->
cfg
.
vgId
,
ver
);
...
...
@@ -414,32 +302,27 @@ int32_t walFetchHead(SWalReader *pRead, int64_t ver, SWalCkHead *pHead) {
return
-
1
;
}
// pRead->curInvalid = 0;
return
0
;
}
int32_t
walSkipFetchBody
(
SWalReader
*
pRead
,
const
SWalCkHead
*
pHead
)
{
int64_t
code
;
int32_t
walSkipFetchBody
(
SWalReader
*
pRead
)
{
wDebug
(
"vgId:%d, skip fetch body %"
PRId64
", first ver:%"
PRId64
", commit ver:%"
PRId64
", last ver:%"
PRId64
", applied ver:%"
PRId64
,
pRead
->
pWal
->
cfg
.
vgId
,
pHead
->
head
.
version
,
pRead
->
pWal
->
vers
.
firstVer
,
pRead
->
pWal
->
vers
.
commitVer
,
pRead
->
pWal
->
cfg
.
vgId
,
p
Read
->
p
Head
->
head
.
version
,
pRead
->
pWal
->
vers
.
firstVer
,
pRead
->
pWal
->
vers
.
commitVer
,
pRead
->
pWal
->
vers
.
lastVer
,
pRead
->
pWal
->
vers
.
appliedVer
);
code
=
taosLSeekFile
(
pRead
->
pLogFile
,
pHead
->
head
.
bodyLen
,
SEEK_CUR
);
int64_t
code
=
taosLSeekFile
(
pRead
->
pLogFile
,
pRead
->
pHead
->
head
.
bodyLen
,
SEEK_CUR
);
if
(
code
<
0
)
{
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
// pRead->curInvalid = 1;
return
-
1
;
}
pRead
->
curVersion
++
;
return
0
;
}
int32_t
walFetchBody
(
SWalReader
*
pRead
,
SWalCkHead
**
ppHead
)
{
SWalCont
*
pReadHead
=
&
((
*
ppHead
)
->
head
)
;
int32_t
walFetchBody
(
SWalReader
*
pRead
)
{
SWalCont
*
pReadHead
=
&
pRead
->
pHead
->
head
;
int64_t
ver
=
pReadHead
->
version
;
wDebug
(
"vgId:%d, fetch body %"
PRId64
", first ver:%"
PRId64
", commit ver:%"
PRId64
", last ver:%"
PRId64
...
...
@@ -448,13 +331,13 @@ int32_t walFetchBody(SWalReader *pRead, SWalCkHead **ppHead) {
pRead
->
pWal
->
vers
.
appliedVer
);
if
(
pRead
->
capacity
<
pReadHead
->
bodyLen
)
{
SWalCkHead
*
ptr
=
(
SWalCkHead
*
)
taosMemoryRealloc
(
*
p
pHead
,
sizeof
(
SWalCkHead
)
+
pReadHead
->
bodyLen
);
SWalCkHead
*
ptr
=
(
SWalCkHead
*
)
taosMemoryRealloc
(
pRead
->
pHead
,
sizeof
(
SWalCkHead
)
+
pReadHead
->
bodyLen
);
if
(
ptr
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
-
1
;
}
*
p
pHead
=
ptr
;
pReadHead
=
&
((
*
ppHead
)
->
head
)
;
pRead
->
pHead
=
ptr
;
pReadHead
=
&
pRead
->
pHead
->
head
;
pRead
->
capacity
=
pReadHead
->
bodyLen
;
}
...
...
@@ -468,27 +351,24 @@ int32_t walFetchBody(SWalReader *pRead, SWalCkHead **ppHead) {
pRead
->
pWal
->
cfg
.
vgId
,
pReadHead
->
version
,
ver
);
terrno
=
TSDB_CODE_WAL_FILE_CORRUPTED
;
}
// pRead->curInvalid = 1;
return
-
1
;
}
if
(
pReadHead
->
version
!=
ver
)
{
wError
(
"vgId:%d, wal fetch body error, index:%"
PRId64
", read request index:%"
PRId64
,
pRead
->
pWal
->
cfg
.
vgId
,
pReadHead
->
version
,
ver
);
// pRead->curInvalid = 1;
terrno
=
TSDB_CODE_WAL_FILE_CORRUPTED
;
return
-
1
;
}
if
(
walValidBodyCksum
(
*
p
pHead
)
!=
0
)
{
if
(
walValidBodyCksum
(
pRead
->
pHead
)
!=
0
)
{
wError
(
"vgId:%d, wal fetch body error, index:%"
PRId64
", since body checksum not passed"
,
pRead
->
pWal
->
cfg
.
vgId
,
ver
);
// pRead->curInvalid = 1;
terrno
=
TSDB_CODE_WAL_FILE_CORRUPTED
;
return
-
1
;
}
pRead
->
curVersion
=
ver
+
1
;
pRead
->
curVersion
++
;
return
0
;
}
...
...
tests/parallel_test/cases.task
浏览文件 @
646e52ae
...
...
@@ -127,6 +127,7 @@
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/dataFromTsdbNWal.py
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/dataFromTsdbNWal-multiCtb.py
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmq_taosx.py
,,n,system-test,python3 ./test.py -f 7-tmq/tmq_offset.py
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/raw_block_interface_test.py
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/stbTagFilter-multiCtb.py
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqSubscribeStb-r3.py -N 5
...
...
@@ -455,7 +456,7 @@
,,y,system-test,./pytest.sh python3 ./test.py -f 6-cluster/5dnode3mnodeRestartDnodeInsertDataAsync.py -N 6 -M 3
#,,y,system-test,./pytest.sh python3 ./test.py -f 6-cluster/5dnode3mnodeRestartDnodeInsertDataAsync.py -N 6 -M 3 -n 3
,,n,system-test,python3 ./test.py -f 6-cluster/manually-test/6dnode3mnodeInsertLessDataAlterRep3to1to3.py -N 6 -M 3
,,n,system-test,python ./test.py -f 6-cluster/5dnode3mnodeRoll.py -N 3 -C 1
#
,,n,system-test,python ./test.py -f 6-cluster/5dnode3mnodeRoll.py -N 3 -C 1
,,y,system-test,./pytest.sh python3 ./test.py -f 6-cluster/5dnode3mnodeAdd1Ddnoe.py -N 7 -M 3 -C 6
,,y,system-test,./pytest.sh python3 ./test.py -f 6-cluster/5dnode3mnodeAdd1Ddnoe.py -N 7 -M 3 -C 6 -n 3
#,,y,system-test,./pytest.sh python3 ./test.py -f 6-cluster/5dnode3mnodeDrop.py -N 5
...
...
tests/system-test/7-tmq/tmqMaxTopic.py
浏览文件 @
646e52ae
...
...
@@ -36,7 +36,7 @@ class TDTestCase:
# tdDnodes[1].cfgDir
cfgFile
=
f
"%s/taos.cfg"
%
(
cfgDir
)
shellCmd
=
'echo
"tmqMaxTopicNum %d"
>> %s'
%
(
tmqMaxTopicNum
,
cfgFile
)
shellCmd
=
'echo
tmqMaxTopicNum %d
>> %s'
%
(
tmqMaxTopicNum
,
cfgFile
)
tdLog
.
info
(
" shell cmd: %s"
%
(
shellCmd
))
os
.
system
(
shellCmd
)
tdDnodes
.
stoptaosd
(
1
)
...
...
tests/system-test/7-tmq/tmqParamsTest.py
浏览文件 @
646e52ae
...
...
@@ -131,7 +131,7 @@ class TDTestCase:
if
snapshot_value
==
"true"
:
if
offset_value
!=
"earliest"
and
offset_value
!=
""
:
if
offset_value
==
"latest"
:
offset_value_list
=
list
(
map
(
lambda
x
:
int
(
x
[
-
2
].
replace
(
"wal:"
,
""
).
replace
(
offset_value
,
"0"
)),
subscription_info
))
offset_value_list
=
list
(
map
(
lambda
x
:
int
(
x
[
-
2
].
replace
(
"wal:"
,
""
).
replace
(
"earliest"
,
"0"
).
replace
(
"latest"
,
"0"
).
replace
(
offset_value
,
"0"
)),
subscription_info
))
tdSql
.
checkEqual
(
sum
(
offset_value_list
)
>=
0
,
True
)
rows_value_list
=
list
(
map
(
lambda
x
:
int
(
x
[
-
1
]),
subscription_info
))
tdSql
.
checkEqual
(
sum
(
rows_value_list
),
expected_res
)
...
...
@@ -154,7 +154,7 @@ class TDTestCase:
tdSql
.
checkEqual
(
rows_value_list
,
[
None
]
*
len
(
subscription_info
))
else
:
if
offset_value
!=
"none"
:
offset_value_list
=
list
(
map
(
lambda
x
:
int
(
x
[
-
2
].
replace
(
"wal:"
,
""
).
replace
(
offset_value
,
"0"
)),
subscription_info
))
offset_value_list
=
list
(
map
(
lambda
x
:
int
(
x
[
-
2
].
replace
(
"wal:"
,
""
).
replace
(
"earliest"
,
"0"
).
replace
(
"latest"
,
"0"
).
replace
(
offset_value
,
"0"
)),
subscription_info
))
tdSql
.
checkEqual
(
sum
(
offset_value_list
)
>=
0
,
True
)
rows_value_list
=
list
(
map
(
lambda
x
:
int
(
x
[
-
1
]),
subscription_info
))
tdSql
.
checkEqual
(
sum
(
rows_value_list
),
expected_res
)
...
...
tests/system-test/7-tmq/tmq_offset.py
0 → 100644
浏览文件 @
646e52ae
import
taos
import
sys
import
time
import
socket
import
os
import
threading
from
util.log
import
*
from
util.sql
import
*
from
util.cases
import
*
from
util.dnodes
import
*
from
util.common
import
*
sys
.
path
.
append
(
"./7-tmq"
)
from
tmqCommon
import
*
class
TDTestCase
:
def
init
(
self
,
conn
,
logSql
,
replicaVar
=
1
):
self
.
replicaVar
=
int
(
replicaVar
)
tdLog
.
debug
(
f
"start to excute
{
__file__
}
"
)
tdSql
.
init
(
conn
.
cursor
())
def
run
(
self
):
tdSql
.
prepare
()
buildPath
=
tdCom
.
getBuildPath
()
cmdStr1
=
'%s/build/bin/taosBenchmark -i 50 -B 1 -t 1000 -n 100000 -y &'
%
(
buildPath
)
tdLog
.
info
(
cmdStr1
)
os
.
system
(
cmdStr1
)
time
.
sleep
(
15
)
cmdStr2
=
'%s/build/bin/tmq_offset_test &'
%
(
buildPath
)
tdLog
.
info
(
cmdStr2
)
os
.
system
(
cmdStr2
)
time
.
sleep
(
20
)
os
.
system
(
"kill -9 `pgrep taosBenchmark`"
)
result
=
os
.
system
(
"kill -9 `pgrep tmq_offset_test`"
)
if
result
!=
0
:
tdLog
.
exit
(
"tmq_offset_test error!"
)
def
stop
(
self
):
tdSql
.
close
()
tdLog
.
success
(
f
"
{
__file__
}
successfully executed"
)
tdCases
.
addLinux
(
__file__
,
TDTestCase
())
tdCases
.
addWindows
(
__file__
,
TDTestCase
())
tests/system-test/output.txt
0 → 100644
浏览文件 @
646e52ae
utils/test/c/CMakeLists.txt
浏览文件 @
646e52ae
...
...
@@ -7,6 +7,7 @@ add_executable(write_raw_block_test write_raw_block_test.c)
add_executable
(
sml_test sml_test.c
)
add_executable
(
get_db_name_test get_db_name_test.c
)
add_executable
(
tmq_offset tmqOffset.c
)
add_executable
(
tmq_offset_test tmq_offset_test.c
)
target_link_libraries
(
tmq_offset
PUBLIC taos
...
...
@@ -42,6 +43,13 @@ target_link_libraries(
PUBLIC common
PUBLIC os
)
target_link_libraries
(
tmq_offset_test
PUBLIC taos
PUBLIC util
PUBLIC common
PUBLIC os
)
target_link_libraries
(
write_raw_block_test
...
...
utils/test/c/tmq_offset_test.c
0 → 100644
浏览文件 @
646e52ae
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <assert.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>
#include "taos.h"
#include "types.h"
int
buildData
(
TAOS
*
pConn
){
TAOS_RES
*
pRes
=
taos_query
(
pConn
,
"drop topic if exists tp"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"error in drop tp, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
}
taos_free_result
(
pRes
);
pRes
=
taos_query
(
pConn
,
"drop database if exists db_ts3756"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"error in drop db_taosx, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
}
taos_free_result
(
pRes
);
pRes
=
taos_query
(
pConn
,
"create database if not exists db_ts3756 vgroups 2 wal_retention_period 3600"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"error in create db_taosx, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
}
taos_free_result
(
pRes
);
pRes
=
taos_query
(
pConn
,
"use db_ts3756"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"error in use db, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
}
taos_free_result
(
pRes
);
pRes
=
taos_query
(
pConn
,
"CREATE TABLE `t1` (`ts` TIMESTAMP, `voltage` INT)"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"failed to create table meters, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
}
taos_free_result
(
pRes
);
pRes
=
taos_query
(
pConn
,
"insert into t1 values(now, 1)"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"failed to insert, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
}
taos_free_result
(
pRes
);
pRes
=
taos_query
(
pConn
,
"insert into t1 values(now + 1s, 2)"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"failed to insert, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
}
taos_free_result
(
pRes
);
pRes
=
taos_query
(
pConn
,
"create topic tp as select * from t1"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"failed to create topic tp, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
}
taos_free_result
(
pRes
);
return
0
;
}
void
test_offset
(
TAOS
*
pConn
){
if
(
buildData
(
pConn
)
!=
0
){
ASSERT
(
0
);
}
tmq_conf_t
*
conf
=
tmq_conf_new
();
tmq_conf_set
(
conf
,
"enable.auto.commit"
,
"false"
);
tmq_conf_set
(
conf
,
"auto.commit.interval.ms"
,
"2000"
);
tmq_conf_set
(
conf
,
"group.id"
,
"group_id_2"
);
tmq_conf_set
(
conf
,
"td.connect.user"
,
"root"
);
tmq_conf_set
(
conf
,
"td.connect.pass"
,
"taosdata"
);
tmq_conf_set
(
conf
,
"auto.offset.reset"
,
"earliest"
);
tmq_conf_set
(
conf
,
"msg.with.table.name"
,
"false"
);
tmq_t
*
tmq
=
tmq_consumer_new
(
conf
,
NULL
,
0
);
tmq_conf_destroy
(
conf
);
// 创建订阅 topics 列表
tmq_list_t
*
topicList
=
tmq_list_new
();
tmq_list_append
(
topicList
,
"tp"
);
// 启动订阅
tmq_subscribe
(
tmq
,
topicList
);
tmq_list_destroy
(
topicList
);
int32_t
timeout
=
200
;
tmq_topic_assignment
*
pAssign1
=
NULL
;
int32_t
numOfAssign1
=
0
;
tmq_topic_assignment
*
pAssign2
=
NULL
;
int32_t
numOfAssign2
=
0
;
tmq_topic_assignment
*
pAssign3
=
NULL
;
int32_t
numOfAssign3
=
0
;
int32_t
code
=
tmq_get_topic_assignment
(
tmq
,
"tp"
,
&
pAssign1
,
&
numOfAssign1
);
if
(
code
!=
0
)
{
printf
(
"error occurs:%s
\n
"
,
tmq_err2str
(
code
));
tmq_free_assignment
(
pAssign1
);
tmq_consumer_close
(
tmq
);
ASSERT
(
0
);
}
code
=
tmq_get_topic_assignment
(
tmq
,
"tp"
,
&
pAssign2
,
&
numOfAssign2
);
if
(
code
!=
0
)
{
printf
(
"error occurs:%s
\n
"
,
tmq_err2str
(
code
));
tmq_free_assignment
(
pAssign2
);
tmq_consumer_close
(
tmq
);
ASSERT
(
0
);
}
code
=
tmq_get_topic_assignment
(
tmq
,
"tp"
,
&
pAssign3
,
&
numOfAssign3
);
if
(
code
!=
0
)
{
printf
(
"error occurs:%s
\n
"
,
tmq_err2str
(
code
));
tmq_free_assignment
(
pAssign3
);
tmq_consumer_close
(
tmq
);
ASSERT
(
0
);
return
;
}
ASSERT
(
numOfAssign1
==
2
);
ASSERT
(
numOfAssign1
==
numOfAssign2
);
ASSERT
(
numOfAssign1
==
numOfAssign3
);
for
(
int
i
=
0
;
i
<
numOfAssign1
;
i
++
){
int
j
=
0
;
int
k
=
0
;
for
(;
j
<
numOfAssign2
;
j
++
){
if
(
pAssign1
[
i
].
vgId
==
pAssign2
[
j
].
vgId
){
break
;
}
}
for
(;
k
<
numOfAssign3
;
k
++
){
if
(
pAssign1
[
i
].
vgId
==
pAssign3
[
k
].
vgId
){
break
;
}
}
ASSERT
(
pAssign1
[
i
].
currentOffset
==
pAssign2
[
j
].
currentOffset
);
ASSERT
(
pAssign1
[
i
].
currentOffset
==
pAssign3
[
k
].
currentOffset
);
ASSERT
(
pAssign1
[
i
].
begin
==
pAssign2
[
j
].
begin
);
ASSERT
(
pAssign1
[
i
].
begin
==
pAssign3
[
k
].
begin
);
ASSERT
(
pAssign1
[
i
].
end
==
pAssign2
[
j
].
end
);
ASSERT
(
pAssign1
[
i
].
end
==
pAssign3
[
k
].
end
);
}
tmq_free_assignment
(
pAssign1
);
tmq_free_assignment
(
pAssign2
);
tmq_free_assignment
(
pAssign3
);
int
cnt
=
0
;
int
offset1
=
-
1
;
int
offset2
=
-
1
;
while
(
cnt
++
<
10
)
{
printf
(
"start to poll:%d
\n
"
,
cnt
);
TAOS_RES
*
pRes
=
tmq_consumer_poll
(
tmq
,
timeout
);
if
(
pRes
)
{
tmq_topic_assignment
*
pAssign
=
NULL
;
int32_t
numOfAssign
=
0
;
code
=
tmq_get_topic_assignment
(
tmq
,
"tp"
,
&
pAssign
,
&
numOfAssign
);
if
(
code
!=
0
)
{
printf
(
"error occurs:%s
\n
"
,
tmq_err2str
(
code
));
tmq_free_assignment
(
pAssign
);
tmq_consumer_close
(
tmq
);
ASSERT
(
0
);
}
for
(
int
i
=
0
;
i
<
numOfAssign
;
i
++
){
int64_t
position
=
tmq_position
(
tmq
,
"tp"
,
pAssign
[
i
].
vgId
);
if
(
position
==
0
)
continue
;
printf
(
"position = %d
\n
"
,
(
int
)
position
);
tmq_commit_offset_sync
(
tmq
,
"tp"
,
pAssign
[
i
].
vgId
,
position
);
int64_t
committed
=
tmq_committed
(
tmq
,
"tp"
,
pAssign
[
i
].
vgId
);
ASSERT
(
position
==
committed
);
}
tmq_offset_seek
(
tmq
,
"tp"
,
pAssign
[
0
].
vgId
,
pAssign
[
0
].
currentOffset
);
tmq_offset_seek
(
tmq
,
"tp"
,
pAssign
[
1
].
vgId
,
pAssign
[
1
].
currentOffset
);
if
(
offset1
!=
-
1
){
ASSERT
(
offset1
==
pAssign
[
0
].
currentOffset
);
}
if
(
offset2
!=
-
1
){
ASSERT
(
offset2
==
pAssign
[
1
].
currentOffset
);
}
offset1
=
pAssign
[
0
].
currentOffset
;
offset2
=
pAssign
[
1
].
currentOffset
;
tmq_free_assignment
(
pAssign
);
taos_free_result
(
pRes
);
}
}
tmq_consumer_close
(
tmq
);
}
// run taosBenchmark first
void
test_ts3756
(
TAOS
*
pConn
){
TAOS_RES
*
pRes
=
taos_query
(
pConn
,
"use test"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
ASSERT
(
0
);
}
taos_free_result
(
pRes
);
pRes
=
taos_query
(
pConn
,
"drop topic if exists t1"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
ASSERT
(
0
);
}
taos_free_result
(
pRes
);
pRes
=
taos_query
(
pConn
,
"create topic t1 as select * from meters"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
ASSERT
(
0
);
}
taos_free_result
(
pRes
);
tmq_conf_t
*
conf
=
tmq_conf_new
();
tmq_conf_set
(
conf
,
"enable.auto.commit"
,
"false"
);
tmq_conf_set
(
conf
,
"auto.commit.interval.ms"
,
"2000"
);
tmq_conf_set
(
conf
,
"group.id"
,
"group_id_2"
);
tmq_conf_set
(
conf
,
"td.connect.user"
,
"root"
);
tmq_conf_set
(
conf
,
"td.connect.pass"
,
"taosdata"
);
tmq_conf_set
(
conf
,
"auto.offset.reset"
,
"latest"
);
tmq_conf_set
(
conf
,
"msg.with.table.name"
,
"false"
);
tmq_t
*
tmq
=
tmq_consumer_new
(
conf
,
NULL
,
0
);
tmq_conf_destroy
(
conf
);
// 创建订阅 topics 列表
tmq_list_t
*
topicList
=
tmq_list_new
();
tmq_list_append
(
topicList
,
"t1"
);
// 启动订阅
tmq_subscribe
(
tmq
,
topicList
);
tmq_list_destroy
(
topicList
);
int32_t
timeout
=
200
;
tmq_topic_assignment
*
pAssign
=
NULL
;
int32_t
numOfAssign
=
0
;
while
(
1
)
{
// printf("start to poll\n");
pRes
=
tmq_consumer_poll
(
tmq
,
timeout
);
if
(
pRes
)
{
tmq_topic_assignment
*
pAssignTmp
=
NULL
;
int32_t
numOfAssignTmp
=
0
;
int32_t
code
=
tmq_get_topic_assignment
(
tmq
,
"t1"
,
&
pAssignTmp
,
&
numOfAssignTmp
);
if
(
code
!=
0
)
{
printf
(
"error occurs:%s
\n
"
,
tmq_err2str
(
code
));
tmq_free_assignment
(
pAssign
);
tmq_consumer_close
(
tmq
);
ASSERT
(
0
);
}
if
(
numOfAssign
!=
0
){
int
i
=
0
;
for
(;
i
<
numOfAssign
;
i
++
){
if
(
pAssign
[
i
].
currentOffset
!=
pAssignTmp
[
i
].
currentOffset
){
break
;
}
}
if
(
i
==
numOfAssign
){
ASSERT
(
0
);
}
tmq_free_assignment
(
pAssign
);
}
numOfAssign
=
numOfAssignTmp
;
pAssign
=
pAssignTmp
;
taos_free_result
(
pRes
);
}
}
tmq_free_assignment
(
pAssign
);
}
int
main
(
int
argc
,
char
*
argv
[])
{
TAOS
*
pConn
=
taos_connect
(
"localhost"
,
"root"
,
"taosdata"
,
NULL
,
0
);
test_offset
(
pConn
);
test_ts3756
(
pConn
);
taos_close
(
pConn
);
return
0
;
}
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录