Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
慢慢CG
TDengine
提交
2464d892
T
TDengine
项目概览
慢慢CG
/
TDengine
与 Fork 源项目一致
Fork自
taosdata / TDengine
通知
1
Star
0
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
2464d892
编写于
6月 10, 2021
作者:
D
dapan1121
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
fix bug
上级
c7d7133f
变更
1
显示空白变更内容
内联
并排
Showing
1 changed file
with
15 addition
and
9 deletion
+15
-9
src/client/src/tscSub.c
src/client/src/tscSub.c
+15
-9
未找到文件。
src/client/src/tscSub.c
浏览文件 @
2464d892
...
...
@@ -282,6 +282,7 @@ static int tscUpdateSubscription(STscObj* pObj, SSub* pSub) {
SArray
*
tables
=
getTableList
(
pSql
);
if
(
tables
==
NULL
)
{
pSub
->
lastSyncTime
=
0
;
//force to get table list next time
return
0
;
}
size_t
numOfTables
=
taosArrayGetSize
(
tables
);
...
...
@@ -488,7 +489,15 @@ TAOS_RES *taos_consume(TAOS_SUB *tsub) {
SSub
*
pSub
=
(
SSub
*
)
tsub
;
if
(
pSub
==
NULL
)
return
NULL
;
if
(
pSub
->
pSql
->
cmd
.
command
==
TSDB_SQL_RETRIEVE_EMPTY_RESULT
)
{
if
(
pSub
->
pTimer
==
NULL
)
{
int64_t
duration
=
taosGetTimestampMs
()
-
pSub
->
lastConsumeTime
;
if
(
duration
<
(
int64_t
)(
pSub
->
interval
))
{
tscDebug
(
"subscription consume too frequently, blocking..."
);
taosMsleep
(
pSub
->
interval
-
(
int32_t
)
duration
);
}
}
if
(
pSub
->
pSql
->
cmd
.
command
==
TSDB_SQL_RETRIEVE_EMPTY_RESULT
)
{
//may reach here when retireve stable vgroup failed
SSqlObj
*
pSql
=
recreateSqlObj
(
pSub
);
if
(
pSql
==
NULL
)
{
return
NULL
;
...
...
@@ -500,6 +509,11 @@ TAOS_RES *taos_consume(TAOS_SUB *tsub) {
}
pSub
->
pSql
=
pSql
;
pSql
->
pSubscription
=
pSub
;
// no table list now, force to update it
tscDebug
(
"begin table synchronization"
);
if
(
!
tscUpdateSubscription
(
pSub
->
taos
,
pSub
))
return
NULL
;
tscDebug
(
"table synchronization completed"
);
}
tscSaveSubscriptionProgress
(
pSub
);
...
...
@@ -524,14 +538,6 @@ TAOS_RES *taos_consume(TAOS_SUB *tsub) {
tscDebug
(
"subscribe:%s set next round subscribe skey:%"
PRId64
,
pSub
->
topic
,
pQueryInfo
->
window
.
skey
);
}
if
(
pSub
->
pTimer
==
NULL
)
{
int64_t
duration
=
taosGetTimestampMs
()
-
pSub
->
lastConsumeTime
;
if
(
duration
<
(
int64_t
)(
pSub
->
interval
))
{
tscDebug
(
"subscription consume too frequently, blocking..."
);
taosMsleep
(
pSub
->
interval
-
(
int32_t
)
duration
);
}
}
size_t
size
=
taosArrayGetSize
(
pSub
->
progress
)
*
sizeof
(
STableIdInfo
);
size
+=
sizeof
(
SQueryTableMsg
)
+
4096
;
int
code
=
tscAllocPayload
(
&
pSql
->
cmd
,
(
int
)
size
);
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录