Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
ce1b6147
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1187
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
ce1b6147
编写于
10月 20, 2022
作者:
L
Liu Jicong
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
enh(client): add debug log
上级
63096c9d
变更
2
隐藏空白更改
内联
并排
Showing
2 changed file
with
23 addition
and
10 deletion
+23
-10
source/client/src/clientTmq.c
source/client/src/clientTmq.c
+16
-4
source/libs/wal/src/walRead.c
source/libs/wal/src/walRead.c
+7
-6
未找到文件。
source/client/src/clientTmq.c
浏览文件 @
ce1b6147
...
...
@@ -979,6 +979,7 @@ int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) {
const
SArray
*
container
=
&
topic_list
->
container
;
int32_t
sz
=
taosArrayGetSize
(
container
);
void
*
buf
=
NULL
;
SMsgSendInfo
*
sendInfo
=
NULL
;
SCMSubscribeReq
req
=
{
0
};
int32_t
code
=
-
1
;
...
...
@@ -1016,7 +1017,7 @@ int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) {
void
*
abuf
=
buf
;
tSerializeSCMSubscribeReq
(
&
abuf
,
&
req
);
SMsgSendInfo
*
sendInfo
=
taosMemoryCalloc
(
1
,
sizeof
(
SMsgSendInfo
));
sendInfo
=
taosMemoryCalloc
(
1
,
sizeof
(
SMsgSendInfo
));
if
(
sendInfo
==
NULL
)
goto
FAIL
;
SMqSubscribeCbParam
param
=
{
...
...
@@ -1046,6 +1047,7 @@ int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) {
// avoid double free if msg is sent
buf
=
NULL
;
sendInfo
=
NULL
;
tsem_wait
(
&
param
.
rspSem
);
tsem_destroy
(
&
param
.
rspSem
);
...
...
@@ -1078,8 +1080,9 @@ int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) {
code
=
0
;
FAIL:
if
(
req
.
topicNames
!=
NULL
)
taosArrayDestroyP
(
req
.
topicNames
,
taosMemoryFree
);
taosArrayDestroyP
(
req
.
topicNames
,
taosMemoryFree
);
taosMemoryFree
(
buf
);
taosMemoryFree
(
sendInfo
);
return
code
;
}
...
...
@@ -1613,7 +1616,11 @@ void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
if
(
rspWrapper
==
NULL
)
{
taosReadAllQitems
(
tmq
->
mqueue
,
tmq
->
qall
);
taosGetQitem
(
tmq
->
qall
,
(
void
**
)
&
rspWrapper
);
if
(
rspWrapper
==
NULL
)
return
NULL
;
if
(
rspWrapper
==
NULL
)
{
tscDebug
(
"consumer %"
PRId64
" mqueue empty"
,
tmq
->
consumerId
);
return
NULL
;
}
}
if
(
rspWrapper
->
tmqRspType
==
TMQ_MSG_TYPE__END_RSP
)
{
...
...
@@ -1711,6 +1718,8 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) {
void
*
rspObj
;
int64_t
startTime
=
taosGetTimestampMs
();
tscDebug
(
"consumer:%"
PRId64
", start poll at %"
PRId64
,
tmq
->
consumerId
,
startTime
);
#if 0
tmqHandleAllDelayedTask(tmq);
tmqPollImpl(tmq, timeout);
...
...
@@ -1745,15 +1754,18 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) {
rspObj
=
tmqHandleAllRsp
(
tmq
,
timeout
,
false
);
if
(
rspObj
)
{
tscDebug
(
"consumer:%"
PRId64
", return rsp"
,
tmq
->
consumerId
);
return
(
TAOS_RES
*
)
rspObj
;
}
else
if
(
terrno
==
TSDB_CODE_TQ_NO_COMMITTED_OFFSET
)
{
tscDebug
(
"consumer:%"
PRId64
", return null since no committed offset"
,
tmq
->
consumerId
);
return
NULL
;
}
if
(
timeout
!=
-
1
)
{
int64_t
endTime
=
taosGetTimestampMs
();
int64_t
leftTime
=
endTime
-
startTime
;
if
(
leftTime
>
timeout
)
{
tscDebug
(
"consumer:%"
PRId64
", (epoch %d) timeout, no rsp"
,
tmq
->
consumerId
,
tmq
->
epoch
);
tscDebug
(
"consumer:%"
PRId64
", (epoch %d) timeout, no rsp, start time %"
PRId64
", end time %"
PRId64
,
tmq
->
consumerId
,
tmq
->
epoch
,
startTime
,
endTime
);
return
NULL
;
}
tsem_timewait
(
&
tmq
->
rspSem
,
leftTime
*
1000
);
...
...
source/libs/wal/src/walRead.c
浏览文件 @
ce1b6147
...
...
@@ -278,12 +278,12 @@ static int32_t walFetchBodyNew(SWalReader *pRead) {
wDebug
(
"vgId:%d, wal starts to fetch body, index:%"
PRId64
,
pRead
->
pWal
->
cfg
.
vgId
,
ver
);
if
(
pRead
->
capacity
<
pReadHead
->
bodyLen
)
{
void
*
ptr
=
taosMemoryRealloc
(
pRead
->
pHead
,
sizeof
(
SWalCkHead
)
+
pReadHead
->
bodyLen
);
SWalCkHead
*
ptr
=
(
SWalCkHead
*
)
taosMemoryRealloc
(
pRead
->
pHead
,
sizeof
(
SWalCkHead
)
+
pReadHead
->
bodyLen
);
if
(
ptr
==
NULL
)
{
terrno
=
TSDB_CODE_WAL_OUT_OF_MEMORY
;
return
-
1
;
}
pRead
->
pHead
=
(
SWalCkHead
*
)
ptr
;
pRead
->
pHead
=
ptr
;
pReadHead
=
&
pRead
->
pHead
->
head
;
pRead
->
capacity
=
pReadHead
->
bodyLen
;
}
...
...
@@ -398,12 +398,12 @@ int32_t walFetchBody(SWalReader *pRead, SWalCkHead **ppHead) {
int64_t
ver
=
pReadHead
->
version
;
if
(
pRead
->
capacity
<
pReadHead
->
bodyLen
)
{
void
*
ptr
=
taosMemoryRealloc
(
*
ppHead
,
sizeof
(
SWalCkHead
)
+
pReadHead
->
bodyLen
);
SWalCkHead
*
ptr
=
(
SWalCkHead
*
)
taosMemoryRealloc
(
*
ppHead
,
sizeof
(
SWalCkHead
)
+
pReadHead
->
bodyLen
);
if
(
ptr
==
NULL
)
{
terrno
=
TSDB_CODE_WAL_OUT_OF_MEMORY
;
return
-
1
;
}
*
ppHead
=
(
SWalCkHead
*
)
ptr
;
*
ppHead
=
ptr
;
pReadHead
=
&
((
*
ppHead
)
->
head
);
pRead
->
capacity
=
pReadHead
->
bodyLen
;
}
...
...
@@ -493,13 +493,14 @@ int32_t walReadVer(SWalReader *pReader, int64_t ver) {
}
if
(
pReader
->
capacity
<
pReader
->
pHead
->
head
.
bodyLen
)
{
void
*
ptr
=
taosMemoryRealloc
(
pReader
->
pHead
,
sizeof
(
SWalCkHead
)
+
pReader
->
pHead
->
head
.
bodyLen
);
SWalCkHead
*
ptr
=
(
SWalCkHead
*
)
taosMemoryRealloc
(
pReader
->
pHead
,
sizeof
(
SWalCkHead
)
+
pReader
->
pHead
->
head
.
bodyLen
);
if
(
ptr
==
NULL
)
{
terrno
=
TSDB_CODE_WAL_OUT_OF_MEMORY
;
taosThreadMutexUnlock
(
&
pReader
->
mutex
);
return
-
1
;
}
pReader
->
pHead
=
(
SWalCkHead
*
)
ptr
;
pReader
->
pHead
=
ptr
;
pReader
->
capacity
=
pReader
->
pHead
->
head
.
bodyLen
;
}
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录