Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
75eacd56
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1187
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
75eacd56
编写于
7月 27, 2023
作者:
wmmhello
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
fix:add log
上级
2d41a429
变更
3
隐藏空白更改
内联
并排
Showing
3 changed file
with
44 addition
and
27 deletion
+44
-27
source/client/src/clientTmq.c
source/client/src/clientTmq.c
+3
-3
source/dnode/vnode/src/tq/tq.c
source/dnode/vnode/src/tq/tq.c
+38
-21
source/dnode/vnode/src/tq/tqPush.c
source/dnode/vnode/src/tq/tqPush.c
+3
-3
未找到文件。
source/client/src/clientTmq.c
浏览文件 @
75eacd56
...
...
@@ -1948,7 +1948,7 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
}
taosWUnLockLatch
(
&
tmq
->
lock
);
}
else
{
tsc
Debug
(
"consumer:0x%"
PRIx64
" vgId:%d msg discard since epoch mismatch: msg epoch %d, consumer epoch %d"
,
tsc
Info
(
"consumer:0x%"
PRIx64
" vgId:%d msg discard since epoch mismatch: msg epoch %d, consumer epoch %d"
,
tmq
->
consumerId
,
pollRspWrapper
->
vgId
,
pDataRsp
->
head
.
epoch
,
consumerEpoch
);
pRspWrapper
=
tmqFreeRspWrapper
(
pRspWrapper
);
taosFreeQitem
(
pollRspWrapper
);
...
...
@@ -1979,7 +1979,7 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
taosWUnLockLatch
(
&
tmq
->
lock
);
return
pRsp
;
}
else
{
tsc
Debug
(
"consumer:0x%"
PRIx64
" vgId:%d msg discard since epoch mismatch: msg epoch %d, consumer epoch %d"
,
tsc
Info
(
"consumer:0x%"
PRIx64
" vgId:%d msg discard since epoch mismatch: msg epoch %d, consumer epoch %d"
,
tmq
->
consumerId
,
pollRspWrapper
->
vgId
,
pollRspWrapper
->
metaRsp
.
head
.
epoch
,
consumerEpoch
);
pRspWrapper
=
tmqFreeRspWrapper
(
pRspWrapper
);
taosFreeQitem
(
pollRspWrapper
);
...
...
@@ -2034,7 +2034,7 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
}
taosWUnLockLatch
(
&
tmq
->
lock
);
}
else
{
tsc
Debug
(
"consumer:0x%"
PRIx64
" vgId:%d msg discard since epoch mismatch: msg epoch %d, consumer epoch %d"
,
tsc
Info
(
"consumer:0x%"
PRIx64
" vgId:%d msg discard since epoch mismatch: msg epoch %d, consumer epoch %d"
,
tmq
->
consumerId
,
pollRspWrapper
->
vgId
,
pollRspWrapper
->
taosxRsp
.
head
.
epoch
,
consumerEpoch
);
pRspWrapper
=
tmqFreeRspWrapper
(
pRspWrapper
);
taosFreeQitem
(
pollRspWrapper
);
...
...
source/dnode/vnode/src/tq/tq.c
浏览文件 @
75eacd56
...
...
@@ -148,6 +148,20 @@ void tqClose(STQ* pTq) {
return
;
}
void
*
pIter
=
taosHashIterate
(
pTq
->
pPushMgr
,
NULL
);
while
(
pIter
)
{
STqHandle
*
pHandle
=
*
(
STqHandle
**
)
pIter
;
int32_t
vgId
=
TD_VID
(
pTq
->
pVnode
);
if
(
pHandle
->
msg
!=
NULL
)
{
tqPushEmptyDataRsp
(
pHandle
,
vgId
);
rpcFreeCont
(
pHandle
->
msg
->
pCont
);
taosMemoryFree
(
pHandle
->
msg
);
pHandle
->
msg
=
NULL
;
}
pIter
=
taosHashIterate
(
pTq
->
pPushMgr
,
pIter
);
}
tqOffsetClose
(
pTq
->
pOffsetStore
);
taosHashCleanup
(
pTq
->
pHandle
);
taosHashCleanup
(
pTq
->
pPushMgr
);
...
...
@@ -244,6 +258,10 @@ int32_t tqPushEmptyDataRsp(STqHandle* pHandle, int32_t vgId) {
tqInitDataRsp
(
&
dataRsp
,
&
req
);
dataRsp
.
blockNum
=
0
;
dataRsp
.
rspOffset
=
dataRsp
.
reqOffset
;
char
buf
[
TSDB_OFFSET_LEN
]
=
{
0
};
tFormatOffset
(
buf
,
TSDB_OFFSET_LEN
,
&
dataRsp
.
reqOffset
);
tqInfo
(
"tqPushEmptyDataRsp to consumer:0x%"
PRIx64
" vgId:%d, offset:%s, reqId:0x%"
PRIx64
,
req
.
consumerId
,
vgId
,
buf
,
req
.
reqId
);
tqSendDataRsp
(
pHandle
,
pHandle
->
msg
,
&
req
,
&
dataRsp
,
TMQ_MSG_TYPE__POLL_RSP
,
vgId
);
tDeleteMqDataRsp
(
&
dataRsp
);
return
0
;
...
...
@@ -481,10 +499,11 @@ int32_t tqProcessPollPush(STQ* pTq, SRpcMsg* pMsg) {
while
(
pIter
)
{
STqHandle
*
pHandle
=
*
(
STqHandle
**
)
pIter
;
tq
Debug
(
"vgId:%d start set submit for pHandle:%p, consumer:0x%"
PRIx64
,
vgId
,
pHandle
,
pHandle
->
consumerId
);
tq
Info
(
"vgId:%d start set submit for pHandle:%p, consumer:0x%"
PRIx64
,
vgId
,
pHandle
,
pHandle
->
consumerId
);
if
(
ASSERT
(
pHandle
->
msg
!=
NULL
))
{
tqError
(
"pHandle->msg should not be null"
);
taosHashCancelIterate
(
pTq
->
pPushMgr
,
pIter
);
break
;
}
else
{
SRpcMsg
msg
=
{.
msgType
=
TDMT_VND_TMQ_CONSUME
,
.
pCont
=
pHandle
->
msg
->
pCont
,
.
contLen
=
pHandle
->
msg
->
contLen
,
.
info
=
pHandle
->
msg
->
info
};
...
...
@@ -815,30 +834,28 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
taosWLockLatch
(
&
pTq
->
lock
);
if
(
pHandle
->
consumerId
==
req
.
newConsumerId
)
{
// do nothing
tqInfo
(
"vgId:%d consumer:0x%"
PRIx64
" remains, no switch occurs, should not reach here"
,
req
.
vgId
,
req
.
newConsumerId
);
tqInfo
(
"vgId:%d no switch consumer:0x%"
PRIx64
" remains, because redo wal log"
,
req
.
vgId
,
req
.
newConsumerId
);
}
else
{
tqInfo
(
"vgId:%d switch consumer from Id:0x%"
PRIx64
" to Id:0x%"
PRIx64
,
req
.
vgId
,
pHandle
->
consumerId
,
req
.
newConsumerId
);
tqInfo
(
"vgId:%d switch consumer from Id:0x%"
PRIx64
" to Id:0x%"
PRIx64
,
req
.
vgId
,
pHandle
->
consumerId
,
req
.
newConsumerId
);
atomic_store_64
(
&
pHandle
->
consumerId
,
req
.
newConsumerId
);
// atomic_add_fetch_32(&pHandle->epoch, 1);
// kill executing task
// if(tqIsHandleExec(pHandle)) {
// qTaskInfo_t pTaskInfo = pHandle->execHandle.task;
// if (pTaskInfo != NULL) {
// qKillTask(pTaskInfo, TSDB_CODE_SUCCESS);
// }
// if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
// qStreamCloseTsdbReader(pTaskInfo);
// }
// }
// remove if it has been register in the push manager, and return one empty block to consumer
tqUnregisterPushHandle
(
pTq
,
pHandle
);
ret
=
tqMetaSaveHandle
(
pTq
,
req
.
subKey
,
pHandle
);
}
// atomic_add_fetch_32(&pHandle->epoch, 1);
// kill executing task
// if(tqIsHandleExec(pHandle)) {
// qTaskInfo_t pTaskInfo = pHandle->execHandle.task;
// if (pTaskInfo != NULL) {
// qKillTask(pTaskInfo, TSDB_CODE_SUCCESS);
// }
// if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
// qStreamCloseTsdbReader(pTaskInfo);
// }
// }
// remove if it has been register in the push manager, and return one empty block to consumer
tqUnregisterPushHandle
(
pTq
,
pHandle
);
taosWUnLockLatch
(
&
pTq
->
lock
);
ret
=
tqMetaSaveHandle
(
pTq
,
req
.
subKey
,
pHandle
);
}
end:
...
...
source/dnode/vnode/src/tq/tqPush.c
浏览文件 @
75eacd56
...
...
@@ -75,12 +75,12 @@ int32_t tqRegisterPushHandle(STQ* pTq, void* handle, SRpcMsg* pMsg) {
memcpy
(
pHandle
->
msg
->
pCont
,
pMsg
->
pCont
,
pMsg
->
contLen
);
pHandle
->
msg
->
contLen
=
pMsg
->
contLen
;
int32_t
ret
=
taosHashPut
(
pTq
->
pPushMgr
,
pHandle
->
subKey
,
strlen
(
pHandle
->
subKey
),
&
pHandle
,
POINTER_BYTES
);
tq
Debug
(
"vgId:%d data is over, ret:%d, consumerId:0x%"
PRIx64
", register to pHandle:%p, pCont:%p, len:%d"
,
vgId
,
ret
,
tq
Info
(
"vgId:%d data is over, ret:%d, consumerId:0x%"
PRIx64
", register to pHandle:%p, pCont:%p, len:%d"
,
vgId
,
ret
,
pHandle
->
consumerId
,
pHandle
,
pHandle
->
msg
->
pCont
,
pHandle
->
msg
->
contLen
);
return
0
;
}
int
32_t
tqUnregisterPushHandle
(
STQ
*
pTq
,
void
*
handle
)
{
int
tqUnregisterPushHandle
(
STQ
*
pTq
,
void
*
handle
)
{
STqHandle
*
pHandle
=
(
STqHandle
*
)
handle
;
int32_t
vgId
=
TD_VID
(
pTq
->
pVnode
);
...
...
@@ -88,7 +88,7 @@ int32_t tqUnregisterPushHandle(STQ* pTq, void *handle) {
return
0
;
}
int32_t
ret
=
taosHashRemove
(
pTq
->
pPushMgr
,
pHandle
->
subKey
,
strlen
(
pHandle
->
subKey
));
tq
Debug
(
"vgId:%d remove pHandle:%p,ret:%d consumer Id:0x%"
PRIx64
,
vgId
,
pHandle
,
ret
,
pHandle
->
consumerId
);
tq
Info
(
"vgId:%d remove pHandle:%p,ret:%d consumer Id:0x%"
PRIx64
,
vgId
,
pHandle
,
ret
,
pHandle
->
consumerId
);
if
(
pHandle
->
msg
!=
NULL
)
{
// tqPushDataRsp(pHandle, vgId);
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录