Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
6b5daf6c
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22017
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
6b5daf6c
编写于
7月 04, 2023
作者:
wmmhello
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
fix:add lock for tmq->clientTopic
上级
35a35ad4
变更
1
隐藏空白更改
内联
并排
Showing
1 changed file
with
100 addition
and
62 deletion
+100
-62
source/client/src/clientTmq.c
source/client/src/clientTmq.c
+100
-62
未找到文件。
source/client/src/clientTmq.c
浏览文件 @
6b5daf6c
...
...
@@ -636,6 +636,7 @@ static void asyncCommitOffset(tmq_t* tmq, const TAOS_RES* pRes, int32_t type, tm
pParamSet
->
callbackFn
=
pCommitFp
;
pParamSet
->
userParam
=
userParam
;
taosRLockLatch
(
&
tmq
->
lock
);
int32_t
numOfTopics
=
taosArrayGetSize
(
tmq
->
clientTopics
);
tscDebug
(
"consumer:0x%"
PRIx64
" do manual commit offset for %s, vgId:%d"
,
tmq
->
consumerId
,
pTopicName
,
vgId
);
...
...
@@ -646,6 +647,7 @@ static void asyncCommitOffset(tmq_t* tmq, const TAOS_RES* pRes, int32_t type, tm
pTopicName
,
numOfTopics
);
taosMemoryFree
(
pParamSet
);
pCommitFp
(
tmq
,
TSDB_CODE_SUCCESS
,
userParam
);
taosRUnLockLatch
(
&
tmq
->
lock
);
return
;
}
...
...
@@ -663,6 +665,7 @@ static void asyncCommitOffset(tmq_t* tmq, const TAOS_RES* pRes, int32_t type, tm
vgId
,
numOfVgroups
,
pTopicName
);
taosMemoryFree
(
pParamSet
);
pCommitFp
(
tmq
,
TSDB_CODE_SUCCESS
,
userParam
);
taosRUnLockLatch
(
&
tmq
->
lock
);
return
;
}
...
...
@@ -679,6 +682,7 @@ static void asyncCommitOffset(tmq_t* tmq, const TAOS_RES* pRes, int32_t type, tm
taosMemoryFree
(
pParamSet
);
pCommitFp
(
tmq
,
code
,
userParam
);
}
taosRUnLockLatch
(
&
tmq
->
lock
);
}
static
void
asyncCommitAllOffsets
(
tmq_t
*
tmq
,
tmq_commit_cb
*
pCommitFp
,
void
*
userParam
)
{
...
...
@@ -696,6 +700,7 @@ static void asyncCommitAllOffsets(tmq_t* tmq, tmq_commit_cb* pCommitFp, void* us
// init as 1 to prevent concurrency issue
pParamSet
->
waitingRspNum
=
1
;
taosRLockLatch
(
&
tmq
->
lock
);
int32_t
numOfTopics
=
taosArrayGetSize
(
tmq
->
clientTopics
);
tscDebug
(
"consumer:0x%"
PRIx64
" start to commit offset for %d topics"
,
tmq
->
consumerId
,
numOfTopics
);
...
...
@@ -725,6 +730,7 @@ static void asyncCommitAllOffsets(tmq_t* tmq, tmq_commit_cb* pCommitFp, void* us
}
}
}
taosRUnLockLatch
(
&
tmq
->
lock
);
tscDebug
(
"consumer:0x%"
PRIx64
" total commit:%d for %d topics"
,
tmq
->
consumerId
,
pParamSet
->
waitingRspNum
-
1
,
numOfTopics
);
...
...
@@ -799,6 +805,7 @@ void tmqSendHbReq(void* param, void* tmrId) {
SMqHbReq
req
=
{
0
};
req
.
consumerId
=
tmq
->
consumerId
;
req
.
epoch
=
tmq
->
epoch
;
taosRLockLatch
(
&
tmq
->
lock
);
// if(tmq->needReportOffsetRows){
req
.
topics
=
taosArrayInit
(
taosArrayGetSize
(
tmq
->
clientTopics
),
sizeof
(
TopicOffsetRows
));
for
(
int
i
=
0
;
i
<
taosArrayGetSize
(
tmq
->
clientTopics
);
i
++
){
...
...
@@ -820,6 +827,7 @@ void tmqSendHbReq(void* param, void* tmrId) {
}
// tmq->needReportOffsetRows = false;
// }
taosRUnLockLatch
(
&
tmq
->
lock
);
int32_t
tlen
=
tSerializeSMqHbReq
(
NULL
,
0
,
&
req
);
if
(
tlen
<
0
)
{
...
...
@@ -986,10 +994,12 @@ int32_t tmq_subscription(tmq_t* tmq, tmq_list_t** topics) {
if
(
*
topics
==
NULL
)
{
*
topics
=
tmq_list_new
();
}
taosRLockLatch
(
&
tmq
->
lock
);
for
(
int
i
=
0
;
i
<
taosArrayGetSize
(
tmq
->
clientTopics
);
i
++
)
{
SMqClientTopic
*
topic
=
taosArrayGet
(
tmq
->
clientTopics
,
i
);
tmq_list_append
(
*
topics
,
strchr
(
topic
->
topicName
,
'.'
)
+
1
);
}
taosRUnLockLatch
(
&
tmq
->
lock
);
return
0
;
}
...
...
@@ -1527,12 +1537,7 @@ static void freeClientVgInfo(void* param) {
static
bool
doUpdateLocalEp
(
tmq_t
*
tmq
,
int32_t
epoch
,
const
SMqAskEpRsp
*
pRsp
)
{
bool
set
=
false
;
int32_t
topicNumCur
=
taosArrayGetSize
(
tmq
->
clientTopics
);
int32_t
topicNumGet
=
taosArrayGetSize
(
pRsp
->
topics
);
char
vgKey
[
TSDB_TOPIC_FNAME_LEN
+
22
];
tscInfo
(
"consumer:0x%"
PRIx64
" update ep epoch from %d to epoch %d, incoming topics:%d, existed topics:%d"
,
tmq
->
consumerId
,
tmq
->
epoch
,
epoch
,
topicNumGet
,
topicNumCur
);
if
(
epoch
<=
tmq
->
epoch
)
{
return
false
;
}
...
...
@@ -1548,6 +1553,12 @@ static bool doUpdateLocalEp(tmq_t* tmq, int32_t epoch, const SMqAskEpRsp* pRsp)
return
false
;
}
taosWLockLatch
(
&
tmq
->
lock
);
int32_t
topicNumCur
=
taosArrayGetSize
(
tmq
->
clientTopics
);
char
vgKey
[
TSDB_TOPIC_FNAME_LEN
+
22
];
tscInfo
(
"consumer:0x%"
PRIx64
" update ep epoch from %d to epoch %d, incoming topics:%d, existed topics:%d"
,
tmq
->
consumerId
,
tmq
->
epoch
,
epoch
,
topicNumGet
,
topicNumCur
);
// todo extract method
for
(
int32_t
i
=
0
;
i
<
topicNumCur
;
i
++
)
{
// find old topic
...
...
@@ -1579,7 +1590,6 @@ static bool doUpdateLocalEp(tmq_t* tmq, int32_t epoch, const SMqAskEpRsp* pRsp)
taosHashCleanup
(
pVgOffsetHashMap
);
taosWLockLatch
(
&
tmq
->
lock
);
// destroy current buffered existed topics info
if
(
tmq
->
clientTopics
)
{
taosArrayDestroyEx
(
tmq
->
clientTopics
,
freeClientVgInfo
);
...
...
@@ -1807,6 +1817,9 @@ static int32_t tmqPollImpl(tmq_t* tmq, int64_t timeout) {
if
(
atomic_load_8
(
&
tmq
->
status
)
==
TMQ_CONSUMER_STATUS__RECOVER
){
return
0
;
}
int32_t
code
=
0
;
taosWLockLatch
(
&
tmq
->
lock
);
int32_t
numOfTopics
=
taosArrayGetSize
(
tmq
->
clientTopics
);
tscDebug
(
"consumer:0x%"
PRIx64
" start to poll data, numOfTopics:%d"
,
tmq
->
consumerId
,
numOfTopics
);
...
...
@@ -1816,7 +1829,7 @@ static int32_t tmqPollImpl(tmq_t* tmq, int64_t timeout) {
for
(
int
j
=
0
;
j
<
numOfVg
;
j
++
)
{
SMqClientVg
*
pVg
=
taosArrayGet
(
pTopic
->
vgs
,
j
);
if
(
taosGetTimestampMs
()
-
pVg
->
emptyBlockReceiveTs
<
EMPTY_BLOCK_POLL_IDLE_DURATION
)
{
// less than 10
0
ms
if
(
taosGetTimestampMs
()
-
pVg
->
emptyBlockReceiveTs
<
EMPTY_BLOCK_POLL_IDLE_DURATION
)
{
// less than 10ms
tscTrace
(
"consumer:0x%"
PRIx64
" epoch %d, vgId:%d idle for 10ms before start next poll"
,
tmq
->
consumerId
,
tmq
->
epoch
,
pVg
->
vgId
);
continue
;
...
...
@@ -1831,15 +1844,17 @@ static int32_t tmqPollImpl(tmq_t* tmq, int64_t timeout) {
}
atomic_store_32
(
&
pVg
->
vgSkipCnt
,
0
);
int32_t
code
=
doTmqPollImpl
(
tmq
,
pTopic
,
pVg
,
timeout
);
code
=
doTmqPollImpl
(
tmq
,
pTopic
,
pVg
,
timeout
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
return
code
;
goto
end
;
}
}
}
tscDebug
(
"consumer:0x%"
PRIx64
" end to poll data"
,
tmq
->
consumerId
);
return
0
;
end:
taosWUnLockLatch
(
&
tmq
->
lock
);
tscDebug
(
"consumer:0x%"
PRIx64
" end to poll data, code:%d"
,
tmq
->
consumerId
,
code
);
return
code
;
}
static
int32_t
tmqHandleNoPollRsp
(
tmq_t
*
tmq
,
SMqRspWrapper
*
rspWrapper
,
bool
*
pReset
)
{
...
...
@@ -1891,12 +1906,14 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
SMqDataRsp
*
pDataRsp
=
&
pollRspWrapper
->
dataRsp
;
if
(
pDataRsp
->
head
.
epoch
==
consumerEpoch
)
{
taosWLockLatch
(
&
tmq
->
lock
);
SMqClientVg
*
pVg
=
getVgInfo
(
tmq
,
pollRspWrapper
->
topicName
,
pollRspWrapper
->
vgId
);
pollRspWrapper
->
vgHandle
=
pVg
;
pollRspWrapper
->
topicHandle
=
getTopicInfo
(
tmq
,
pollRspWrapper
->
topicName
);
if
(
pollRspWrapper
->
vgHandle
==
NULL
||
pollRspWrapper
->
topicHandle
==
NULL
){
tscError
(
"consumer:0x%"
PRIx64
" get vg or topic error, topic:%s vgId:%d"
,
tmq
->
consumerId
,
pollRspWrapper
->
topicName
,
pollRspWrapper
->
vgId
);
taosWUnLockLatch
(
&
tmq
->
lock
);
return
NULL
;
}
// update the epset
...
...
@@ -1944,8 +1961,10 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
tmq
->
consumerId
,
pVg
->
vgId
,
buf
,
pDataRsp
->
blockNum
,
numOfRows
,
pVg
->
numOfRows
,
tmq
->
totalRows
,
pollRspWrapper
->
reqId
);
taosFreeQitem
(
pollRspWrapper
);
taosWUnLockLatch
(
&
tmq
->
lock
);
return
pRsp
;
}
taosWUnLockLatch
(
&
tmq
->
lock
);
}
else
{
tscDebug
(
"consumer:0x%"
PRIx64
" vgId:%d msg discard since epoch mismatch: msg epoch %d, consumer epoch %d"
,
tmq
->
consumerId
,
pollRspWrapper
->
vgId
,
pDataRsp
->
head
.
epoch
,
consumerEpoch
);
...
...
@@ -1960,12 +1979,14 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
tscDebug
(
"consumer:0x%"
PRIx64
" process meta rsp"
,
tmq
->
consumerId
);
if
(
pollRspWrapper
->
metaRsp
.
head
.
epoch
==
consumerEpoch
)
{
taosWLockLatch
(
&
tmq
->
lock
);
SMqClientVg
*
pVg
=
getVgInfo
(
tmq
,
pollRspWrapper
->
topicName
,
pollRspWrapper
->
vgId
);
pollRspWrapper
->
vgHandle
=
pVg
;
pollRspWrapper
->
topicHandle
=
getTopicInfo
(
tmq
,
pollRspWrapper
->
topicName
);
if
(
pollRspWrapper
->
vgHandle
==
NULL
||
pollRspWrapper
->
topicHandle
==
NULL
){
tscError
(
"consumer:0x%"
PRIx64
" get vg or topic error, topic:%s vgId:%d"
,
tmq
->
consumerId
,
pollRspWrapper
->
topicName
,
pollRspWrapper
->
vgId
);
taosWUnLockLatch
(
&
tmq
->
lock
);
return
NULL
;
}
...
...
@@ -1977,6 +1998,7 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
// build rsp
SMqMetaRspObj
*
pRsp
=
tmqBuildMetaRspFromWrapper
(
pollRspWrapper
);
taosFreeQitem
(
pollRspWrapper
);
taosWUnLockLatch
(
&
tmq
->
lock
);
return
pRsp
;
}
else
{
tscDebug
(
"consumer:0x%"
PRIx64
" vgId:%d msg discard since epoch mismatch: msg epoch %d, consumer epoch %d"
,
...
...
@@ -1989,12 +2011,14 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
int32_t
consumerEpoch
=
atomic_load_32
(
&
tmq
->
epoch
);
if
(
pollRspWrapper
->
taosxRsp
.
head
.
epoch
==
consumerEpoch
)
{
taosWLockLatch
(
&
tmq
->
lock
);
SMqClientVg
*
pVg
=
getVgInfo
(
tmq
,
pollRspWrapper
->
topicName
,
pollRspWrapper
->
vgId
);
pollRspWrapper
->
vgHandle
=
pVg
;
pollRspWrapper
->
topicHandle
=
getTopicInfo
(
tmq
,
pollRspWrapper
->
topicName
);
if
(
pollRspWrapper
->
vgHandle
==
NULL
||
pollRspWrapper
->
topicHandle
==
NULL
){
tscError
(
"consumer:0x%"
PRIx64
" get vg or topic error, topic:%s vgId:%d"
,
tmq
->
consumerId
,
pollRspWrapper
->
topicName
,
pollRspWrapper
->
vgId
);
taosWUnLockLatch
(
&
tmq
->
lock
);
return
NULL
;
}
...
...
@@ -2017,32 +2041,31 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
pVg
->
emptyBlockReceiveTs
=
taosGetTimestampMs
();
pRspWrapper
=
tmqFreeRspWrapper
(
pRspWrapper
);
taosFreeQitem
(
pollRspWrapper
);
continue
;
}
else
{
pVg
->
emptyBlockReceiveTs
=
0
;
// reset the ts
}
// build rsp
void
*
pRsp
=
NULL
;
int64_t
numOfRows
=
0
;
if
(
pollRspWrapper
->
taosxRsp
.
createTableNum
==
0
)
{
pRsp
=
tmqBuildRspFromWrapper
(
pollRspWrapper
,
pVg
,
&
numOfRows
);
}
else
{
pRsp
=
tmqBuildTaosxRspFromWrapper
(
pollRspWrapper
,
pVg
,
&
numOfRows
);
}
// build rsp
void
*
pRsp
=
NULL
;
int64_t
numOfRows
=
0
;
if
(
pollRspWrapper
->
taosxRsp
.
createTableNum
==
0
)
{
pRsp
=
tmqBuildRspFromWrapper
(
pollRspWrapper
,
pVg
,
&
numOfRows
);
}
else
{
pRsp
=
tmqBuildTaosxRspFromWrapper
(
pollRspWrapper
,
pVg
,
&
numOfRows
);
}
tmq
->
totalRows
+=
numOfRows
;
tmq
->
totalRows
+=
numOfRows
;
char
buf
[
TSDB_OFFSET_LEN
];
tFormatOffset
(
buf
,
TSDB_OFFSET_LEN
,
&
pVg
->
offsetInfo
.
currentOffset
);
tscDebug
(
"consumer:0x%"
PRIx64
" process taosx poll rsp, vgId:%d, offset:%s, blocks:%d, rows:%"
PRId64
", vg total:%"
PRId64
", total:%"
PRId64
", reqId:0x%"
PRIx64
,
tmq
->
consumerId
,
pVg
->
vgId
,
buf
,
pollRspWrapper
->
dataRsp
.
blockNum
,
numOfRows
,
pVg
->
numOfRows
,
tmq
->
totalRows
,
pollRspWrapper
->
reqId
);
taosFreeQitem
(
pollRspWrapper
);
return
pRsp
;
char
buf
[
TSDB_OFFSET_LEN
];
tFormatOffset
(
buf
,
TSDB_OFFSET_LEN
,
&
pVg
->
offsetInfo
.
currentOffset
);
tscDebug
(
"consumer:0x%"
PRIx64
" process taosx poll rsp, vgId:%d, offset:%s, blocks:%d, rows:%"
PRId64
", vg total:%"
PRId64
", total:%"
PRId64
", reqId:0x%"
PRIx64
,
tmq
->
consumerId
,
pVg
->
vgId
,
buf
,
pollRspWrapper
->
dataRsp
.
blockNum
,
numOfRows
,
pVg
->
numOfRows
,
tmq
->
totalRows
,
pollRspWrapper
->
reqId
);
taosFreeQitem
(
pollRspWrapper
);
taosWUnLockLatch
(
&
tmq
->
lock
);
return
pRsp
;
}
taosWUnLockLatch
(
&
tmq
->
lock
);
}
else
{
tscDebug
(
"consumer:0x%"
PRIx64
" vgId:%d msg discard since epoch mismatch: msg epoch %d, consumer epoch %d"
,
tmq
->
consumerId
,
pollRspWrapper
->
vgId
,
pollRspWrapper
->
taosxRsp
.
head
.
epoch
,
consumerEpoch
);
...
...
@@ -2121,7 +2144,8 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) {
}
}
static
void
displayConsumeStatistics
(
const
tmq_t
*
pTmq
)
{
static
void
displayConsumeStatistics
(
tmq_t
*
pTmq
)
{
taosRLockLatch
(
&
pTmq
->
lock
);
int32_t
numOfTopics
=
taosArrayGetSize
(
pTmq
->
clientTopics
);
tscDebug
(
"consumer:0x%"
PRIx64
" closing poll:%"
PRId64
" rows:%"
PRId64
" topics:%d, final epoch:%d"
,
pTmq
->
consumerId
,
pTmq
->
pollCnt
,
pTmq
->
totalRows
,
numOfTopics
,
pTmq
->
epoch
);
...
...
@@ -2137,7 +2161,7 @@ static void displayConsumeStatistics(const tmq_t* pTmq) {
tscDebug
(
"topic:%s, %d. vgId:%d rows:%"
PRId64
,
pTopics
->
topicName
,
j
,
pVg
->
vgId
,
pVg
->
numOfRows
);
}
}
taosRUnLockLatch
(
&
pTmq
->
lock
);
tscDebug
(
"consumer:0x%"
PRIx64
" rows dist end"
,
pTmq
->
consumerId
);
}
...
...
@@ -2544,14 +2568,18 @@ int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_a
int32_t
*
numOfAssignment
)
{
*
numOfAssignment
=
0
;
*
assignment
=
NULL
;
SMqVgCommon
*
pCommon
=
NULL
;
int32_t
accId
=
tmq
->
pTscObj
->
acctId
;
char
tname
[
128
]
=
{
0
};
sprintf
(
tname
,
"%d.%s"
,
accId
,
pTopicName
);
int32_t
code
=
TSDB_CODE_SUCCESS
;
taosWLockLatch
(
&
tmq
->
lock
);
SMqClientTopic
*
pTopic
=
getTopicByName
(
tmq
,
tname
);
if
(
pTopic
==
NULL
)
{
return
TSDB_CODE_INVALID_PARA
;
code
=
TSDB_CODE_INVALID_PARA
;
goto
end
;
}
// in case of snapshot is opened, no valid offset will return
...
...
@@ -2561,7 +2589,8 @@ int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_a
if
(
*
assignment
==
NULL
)
{
tscError
(
"consumer:0x%"
PRIx64
" failed to malloc buffer, size:%"
PRIzu
,
tmq
->
consumerId
,
(
*
numOfAssignment
)
*
sizeof
(
tmq_topic_assignment
));
return
TSDB_CODE_OUT_OF_MEMORY
;
code
=
TSDB_CODE_OUT_OF_MEMORY
;
goto
end
;
}
bool
needFetch
=
false
;
...
...
@@ -2586,10 +2615,11 @@ int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_a
}
if
(
needFetch
)
{
SMqVgCommon
*
pCommon
=
taosMemoryCalloc
(
1
,
sizeof
(
SMqVgCommon
));
pCommon
=
taosMemoryCalloc
(
1
,
sizeof
(
SMqVgCommon
));
if
(
pCommon
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
terrno
;
code
=
terrno
;
goto
end
;
}
pCommon
->
pList
=
taosArrayInit
(
4
,
sizeof
(
tmq_topic_assignment
));
...
...
@@ -2604,8 +2634,8 @@ int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_a
SMqVgWalInfoParam
*
pParam
=
taosMemoryMalloc
(
sizeof
(
SMqVgWalInfoParam
));
if
(
pParam
==
NULL
)
{
destroyCommonInfo
(
pCommon
)
;
return
terrno
;
code
=
terrno
;
goto
end
;
}
pParam
->
epoch
=
tmq
->
epoch
;
...
...
@@ -2619,30 +2649,30 @@ int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_a
int32_t
msgSize
=
tSerializeSMqPollReq
(
NULL
,
0
,
&
req
);
if
(
msgSize
<
0
)
{
taosMemoryFree
(
pParam
);
destroyCommonInfo
(
pCommon
)
;
return
terrno
;
code
=
terrno
;
goto
end
;
}
char
*
msg
=
taosMemoryCalloc
(
1
,
msgSize
);
if
(
NULL
==
msg
)
{
taosMemoryFree
(
pParam
);
destroyCommonInfo
(
pCommon
)
;
return
terrno
;
code
=
terrno
;
goto
end
;
}
if
(
tSerializeSMqPollReq
(
msg
,
msgSize
,
&
req
)
<
0
)
{
taosMemoryFree
(
msg
);
taosMemoryFree
(
pParam
);
destroyCommonInfo
(
pCommon
)
;
return
terrno
;
code
=
terrno
;
goto
end
;
}
SMsgSendInfo
*
sendInfo
=
taosMemoryCalloc
(
1
,
sizeof
(
SMsgSendInfo
));
if
(
sendInfo
==
NULL
)
{
taosMemoryFree
(
pParam
);
taosMemoryFree
(
msg
);
destroyCommonInfo
(
pCommon
)
;
return
terrno
;
code
=
terrno
;
goto
end
;
}
sendInfo
->
msgInfo
=
(
SDataBuf
){.
pData
=
msg
,
.
len
=
msgSize
,
.
handle
=
NULL
};
...
...
@@ -2662,20 +2692,17 @@ int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_a
}
tsem_wait
(
&
pCommon
->
rsp
);
int32_t
code
=
pCommon
->
code
;
code
=
pCommon
->
code
;
terrno
=
code
;
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
taosMemoryFree
(
*
assignment
);
*
assignment
=
NULL
;
*
numOfAssignment
=
0
;
}
else
{
int32_t
num
=
taosArrayGetSize
(
pCommon
->
pList
);
for
(
int32_t
i
=
0
;
i
<
num
;
++
i
)
{
(
*
assignment
)[
i
]
=
*
(
tmq_topic_assignment
*
)
taosArrayGet
(
pCommon
->
pList
,
i
);
}
*
numOfAssignment
=
num
;
goto
end
;
}
int32_t
num
=
taosArrayGetSize
(
pCommon
->
pList
);
for
(
int32_t
i
=
0
;
i
<
num
;
++
i
)
{
(
*
assignment
)[
i
]
=
*
(
tmq_topic_assignment
*
)
taosArrayGet
(
pCommon
->
pList
,
i
);
}
*
numOfAssignment
=
num
;
for
(
int32_t
j
=
0
;
j
<
(
*
numOfAssignment
);
++
j
)
{
tmq_topic_assignment
*
p
=
&
(
*
assignment
)[
j
];
...
...
@@ -2701,12 +2728,17 @@ int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_a
pOffsetInfo
->
committedOffset
.
version
=
p
->
currentOffset
;
}
}
}
destroyCommonInfo
(
pCommon
);
return
code
;
}
else
{
return
TSDB_CODE_SUCCESS
;
end:
if
(
code
!=
TSDB_CODE_SUCCESS
){
taosMemoryFree
(
*
assignment
);
*
assignment
=
NULL
;
*
numOfAssignment
=
0
;
}
destroyCommonInfo
(
pCommon
);
taosWUnLockLatch
(
&
tmq
->
lock
);
return
code
;
}
void
tmq_free_assignment
(
tmq_topic_assignment
*
pAssignment
)
{
...
...
@@ -2727,9 +2759,11 @@ int32_t tmq_offset_seek(tmq_t* tmq, const char* pTopicName, int32_t vgId, int64_
char
tname
[
128
]
=
{
0
};
sprintf
(
tname
,
"%d.%s"
,
accId
,
pTopicName
);
taosWLockLatch
(
&
tmq
->
lock
);
SMqClientTopic
*
pTopic
=
getTopicByName
(
tmq
,
tname
);
if
(
pTopic
==
NULL
)
{
tscError
(
"consumer:0x%"
PRIx64
" invalid topic name:%s"
,
tmq
->
consumerId
,
pTopicName
);
taosWUnLockLatch
(
&
tmq
->
lock
);
return
TSDB_CODE_INVALID_PARA
;
}
...
...
@@ -2745,6 +2779,7 @@ int32_t tmq_offset_seek(tmq_t* tmq, const char* pTopicName, int32_t vgId, int64_
if
(
pVg
==
NULL
)
{
tscError
(
"consumer:0x%"
PRIx64
" invalid vgroup id:%d"
,
tmq
->
consumerId
,
vgId
);
taosWUnLockLatch
(
&
tmq
->
lock
);
return
TSDB_CODE_INVALID_PARA
;
}
...
...
@@ -2753,12 +2788,14 @@ int32_t tmq_offset_seek(tmq_t* tmq, const char* pTopicName, int32_t vgId, int64_
int32_t
type
=
pOffsetInfo
->
currentOffset
.
type
;
if
(
type
!=
TMQ_OFFSET__LOG
&&
!
OFFSET_IS_RESET_OFFSET
(
type
))
{
tscError
(
"consumer:0x%"
PRIx64
" offset type:%d not wal version, seek not allowed"
,
tmq
->
consumerId
,
type
);
taosWUnLockLatch
(
&
tmq
->
lock
);
return
TSDB_CODE_INVALID_PARA
;
}
if
(
type
==
TMQ_OFFSET__LOG
&&
(
offset
<
pOffsetInfo
->
walVerBegin
||
offset
>
pOffsetInfo
->
walVerEnd
))
{
tscError
(
"consumer:0x%"
PRIx64
" invalid seek params, offset:%"
PRId64
", valid range:[%"
PRId64
", %"
PRId64
"]"
,
tmq
->
consumerId
,
offset
,
pOffsetInfo
->
walVerBegin
,
pOffsetInfo
->
walVerEnd
);
taosWUnLockLatch
(
&
tmq
->
lock
);
return
TSDB_CODE_INVALID_PARA
;
}
...
...
@@ -2773,6 +2810,7 @@ int32_t tmq_offset_seek(tmq_t* tmq, const char* pTopicName, int32_t vgId, int64_
tstrncpy
(
rspObj
.
topic
,
tname
,
tListLen
(
rspObj
.
topic
));
tscInfo
(
"consumer:0x%"
PRIx64
" seek to %"
PRId64
" on vgId:%d"
,
tmq
->
consumerId
,
offset
,
pVg
->
vgId
);
taosWUnLockLatch
(
&
tmq
->
lock
);
SSyncCommitInfo
*
pInfo
=
taosMemoryMalloc
(
sizeof
(
SSyncCommitInfo
));
if
(
pInfo
==
NULL
)
{
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录