Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
4ba0819f
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
4ba0819f
编写于
1月 02, 2020
作者:
weixin_48148422
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
TBASE-1424, TBASE-1425: save & load subscription progress.
上级
44536fdc
变更
5
隐藏空白更改
内联
并排
Showing
5 changed file
with
126 addition
and
30 deletion
+126
-30
src/client/src/tscServer.c
src/client/src/tscServer.c
+7
-5
src/client/src/tscSub.c
src/client/src/tscSub.c
+109
-15
src/inc/taos.h
src/inc/taos.h
+1
-1
src/system/detail/src/vnodeQueryProcess.c
src/system/detail/src/vnodeQueryProcess.c
+0
-4
tests/examples/c/subscribe.c
tests/examples/c/subscribe.c
+9
-5
未找到文件。
src/client/src/tscServer.c
浏览文件 @
4ba0819f
...
...
@@ -46,8 +46,9 @@ int (*tscProcessMsgRsp[TSDB_SQL_MAX])(SSqlObj *pSql);
void
(
*
tscUpdateVnodeMsg
[
TSDB_SQL_MAX
])(
SSqlObj
*
pSql
,
char
*
buf
);
void
tscProcessActivityTimer
(
void
*
handle
,
void
*
tmrId
);
int
tscKeepConn
[
TSDB_SQL_MAX
]
=
{
0
};
TSKEY
tscGetSubscriptionProgress
(
SSqlObj
*
pSql
,
int64_t
uid
);
void
tscUpdateSubscriptionProgress
(
SSqlObj
*
pSql
,
int64_t
uid
,
TSKEY
ts
);
TSKEY
tscGetSubscriptionProgress
(
void
*
sub
,
int64_t
uid
);
void
tscUpdateSubscriptionProgress
(
void
*
sub
,
int64_t
uid
,
TSKEY
ts
);
void
tscSaveSubscriptionProgress
(
void
*
sub
);
static
int32_t
minMsgSize
()
{
return
tsRpcHeadSize
+
sizeof
(
STaosDigest
);
}
...
...
@@ -1533,7 +1534,7 @@ static char* doSerializeTableInfo(SSqlObj* pSql, int32_t numOfMeters, int32_t vn
SMeterSidExtInfo
*
pMeterInfo
=
(
SMeterSidExtInfo
*
)
pMsg
;
pMeterInfo
->
sid
=
htonl
(
pMeterMeta
->
sid
);
pMeterInfo
->
uid
=
htobe64
(
pMeterMeta
->
uid
);
pMeterInfo
->
key
=
htobe64
(
tscGetSubscriptionProgress
(
pSql
,
pMeterMeta
->
uid
));
pMeterInfo
->
key
=
htobe64
(
tscGetSubscriptionProgress
(
pSql
->
pSubscription
,
pMeterMeta
->
uid
));
pMsg
+=
sizeof
(
SMeterSidExtInfo
);
}
else
{
SVnodeSidList
*
pVnodeSidList
=
tscGetVnodeSidList
(
pMetricMeta
,
pMeterMetaInfo
->
vnodeIndex
);
...
...
@@ -1544,7 +1545,7 @@ static char* doSerializeTableInfo(SSqlObj* pSql, int32_t numOfMeters, int32_t vn
pMeterInfo
->
sid
=
htonl
(
pQueryMeterInfo
->
sid
);
pMeterInfo
->
uid
=
htobe64
(
pQueryMeterInfo
->
uid
);
pMeterInfo
->
key
=
htobe64
(
tscGetSubscriptionProgress
(
pSql
,
pQueryMeterInfo
->
uid
));
pMeterInfo
->
key
=
htobe64
(
tscGetSubscriptionProgress
(
pSql
->
pSubscription
,
pQueryMeterInfo
->
uid
));
pMsg
+=
sizeof
(
SMeterSidExtInfo
);
...
...
@@ -3555,8 +3556,9 @@ int tscProcessRetrieveRspFromVnode(SSqlObj *pSql) {
p
+=
sizeof
(
int64_t
);
TSKEY
key
=
htobe64
(
*
(
TSKEY
*
)
p
);
p
+=
sizeof
(
TSKEY
);
tscUpdateSubscriptionProgress
(
pSql
,
uid
,
key
);
tscUpdateSubscriptionProgress
(
pSql
->
pSubscription
,
uid
,
key
);
}
tscSaveSubscriptionProgress
(
pSql
->
pSubscription
);
}
pRes
->
row
=
0
;
...
...
src/client/src/tscSub.c
浏览文件 @
4ba0819f
...
...
@@ -32,6 +32,7 @@ typedef struct SSubscriptionProgress {
}
SSubscriptionProgress
;
typedef
struct
SSub
{
char
topic
[
32
];
int64_t
lastSyncTime
;
void
*
signature
;
TAOS
*
taos
;
...
...
@@ -49,11 +50,11 @@ static int tscCompareSubscriptionProgress(const void* a, const void* b) {
return
((
const
SSubscriptionProgress
*
)
a
)
->
uid
-
((
const
SSubscriptionProgress
*
)
b
)
->
uid
;
}
TSKEY
tscGetSubscriptionProgress
(
SSqlObj
*
pSql
,
int64_t
uid
)
{
if
(
pSql
==
NULL
||
pSql
->
pSubscription
==
NULL
)
TSKEY
tscGetSubscriptionProgress
(
void
*
sub
,
int64_t
uid
)
{
if
(
sub
==
NULL
)
return
0
;
SSub
*
pSub
=
(
SSub
*
)
pSql
->
pSubscription
;
SSub
*
pSub
=
(
SSub
*
)
sub
;
for
(
int
s
=
0
,
e
=
pSub
->
numOfMeters
;
s
<
e
;)
{
int
m
=
(
s
+
e
)
/
2
;
SSubscriptionProgress
*
p
=
pSub
->
progress
+
m
;
...
...
@@ -68,11 +69,11 @@ TSKEY tscGetSubscriptionProgress(SSqlObj* pSql, int64_t uid) {
return
0
;
}
void
tscUpdateSubscriptionProgress
(
SSqlObj
*
pSql
,
int64_t
uid
,
TSKEY
ts
)
{
if
(
pSql
==
NULL
||
pSql
->
pSubscription
==
NULL
)
void
tscUpdateSubscriptionProgress
(
void
*
sub
,
int64_t
uid
,
TSKEY
ts
)
{
if
(
sub
==
NULL
)
return
;
SSub
*
pSub
=
(
SSub
*
)
pSql
->
pSubscription
;
SSub
*
pSub
=
(
SSub
*
)
sub
;
for
(
int
s
=
0
,
e
=
pSub
->
numOfMeters
;
s
<
e
;)
{
int
m
=
(
s
+
e
)
/
2
;
SSubscriptionProgress
*
p
=
pSub
->
progress
+
m
;
...
...
@@ -88,7 +89,7 @@ void tscUpdateSubscriptionProgress(SSqlObj* pSql, int64_t uid, TSKEY ts) {
}
static
SSub
*
tscCreateSubscription
(
STscObj
*
pObj
,
const
char
*
sql
)
{
static
SSub
*
tscCreateSubscription
(
STscObj
*
pObj
,
const
char
*
topic
,
const
char
*
sql
)
{
SSub
*
pSub
=
calloc
(
1
,
sizeof
(
SSub
));
if
(
pSub
==
NULL
)
{
globalCode
=
TSDB_CODE_CLI_OUT_OF_MEMORY
;
...
...
@@ -125,6 +126,7 @@ static SSub* tscCreateSubscription(STscObj* pObj, const char* sql) {
pSql
->
pSubscription
=
pSub
;
pSub
->
pSql
=
pSql
;
pSub
->
signature
=
pSub
;
strncpy
(
pSub
->
topic
,
topic
,
sizeof
(
pSub
->
topic
));
return
pSub
;
failed:
...
...
@@ -139,7 +141,7 @@ failed:
}
static
void
tscProcessSubscri
be
Timer
(
void
*
handle
,
void
*
tmrId
)
{
static
void
tscProcessSubscri
ption
Timer
(
void
*
handle
,
void
*
tmrId
)
{
SSub
*
pSub
=
(
SSub
*
)
handle
;
if
(
pSub
==
NULL
||
pSub
->
pTimer
!=
tmrId
)
return
;
...
...
@@ -149,7 +151,7 @@ static void tscProcessSubscribeTimer(void *handle, void *tmrId) {
// TODO: memory leak
}
taosTmrReset
(
tscProcessSubscri
be
Timer
,
pSub
->
interval
,
pSub
,
tscTmr
,
&
pSub
->
pTimer
);
taosTmrReset
(
tscProcessSubscri
ption
Timer
,
pSub
->
interval
,
pSub
,
tscTmr
,
&
pSub
->
pTimer
);
}
...
...
@@ -172,7 +174,7 @@ bool tscUpdateSubscription(STscObj* pObj, SSub* pSub) {
progress
=
calloc
(
1
,
sizeof
(
SSubscriptionProgress
));
int64_t
uid
=
pMeterMetaInfo
->
pMeterMeta
->
uid
;
progress
[
0
].
uid
=
uid
;
progress
[
0
].
key
=
tscGetSubscriptionProgress
(
pSub
->
pSql
,
uid
);
progress
[
0
].
key
=
tscGetSubscriptionProgress
(
pSub
,
uid
);
}
else
{
SMetricMeta
*
pMetricMeta
=
pMeterMetaInfo
->
pMetricMeta
;
SVnodeSidList
*
pVnodeSidList
=
tscGetVnodeSidList
(
pMetricMeta
,
pMeterMetaInfo
->
vnodeIndex
);
...
...
@@ -182,7 +184,7 @@ bool tscUpdateSubscription(STscObj* pObj, SSub* pSub) {
SMeterSidExtInfo
*
pMeterInfo
=
tscGetMeterSidInfo
(
pVnodeSidList
,
i
);
int64_t
uid
=
pMeterInfo
->
uid
;
progress
[
i
].
uid
=
uid
;
progress
[
i
].
key
=
tscGetSubscriptionProgress
(
pSub
->
pSql
,
uid
);
progress
[
i
].
key
=
tscGetSubscriptionProgress
(
pSub
,
uid
);
}
qsort
(
progress
,
numOfMeters
,
sizeof
(
SSubscriptionProgress
),
tscCompareSubscriptionProgress
);
}
...
...
@@ -204,7 +206,93 @@ bool tscUpdateSubscription(STscObj* pObj, SSub* pSub) {
}
TAOS_SUB
*
taos_subscribe
(
TAOS
*
taos
,
const
char
*
sql
,
TAOS_SUBSCRIBE_CALLBACK
fp
,
void
*
param
,
int
interval
)
{
static
void
tscLoadSubscriptionProgress
(
SSub
*
pSub
)
{
char
buf
[
TSDB_MAX_SQL_LEN
];
sprintf
(
buf
,
"%s/subscribe/%s"
,
dataDir
,
pSub
->
topic
);
FILE
*
fp
=
fopen
(
buf
,
"r"
);
if
(
fp
==
NULL
)
{
tscTrace
(
"subscription progress file does not exist: %s"
,
pSub
->
topic
);
return
true
;
}
if
(
fgets
(
buf
,
sizeof
(
buf
),
fp
)
==
NULL
)
{
tscTrace
(
"invalid subscription progress file: %s"
,
pSub
->
topic
);
fclose
(
fp
);
return
false
;
}
for
(
int
i
=
0
;
i
<
sizeof
(
buf
);
i
++
)
{
if
(
buf
[
i
]
==
0
)
break
;
if
(
buf
[
i
]
==
'\r'
||
buf
[
i
]
==
'\n'
)
{
buf
[
i
]
=
0
;
break
;
}
}
if
(
strcmp
(
buf
,
pSub
->
pSql
->
sqlstr
)
!=
0
)
{
tscTrace
(
"subscription sql statement mismatch: %s"
,
pSub
->
topic
);
fclose
(
fp
);
return
false
;
}
if
(
fgets
(
buf
,
sizeof
(
buf
),
fp
)
==
NULL
||
atoi
(
buf
)
<
0
)
{
tscTrace
(
"invalid subscription progress file: %s"
,
pSub
->
topic
);
fclose
(
fp
);
return
false
;
}
int
numOfMeters
=
atoi
(
buf
);
SSubscriptionProgress
*
progress
=
calloc
(
numOfMeters
,
sizeof
(
SSubscriptionProgress
));
for
(
int
i
=
0
;
i
<
numOfMeters
;
i
++
)
{
if
(
fgets
(
buf
,
sizeof
(
buf
),
fp
)
==
NULL
)
{
fclose
(
fp
);
free
(
progress
);
return
false
;
}
int64_t
uid
,
key
;
sscanf
(
buf
,
"uid=%"
SCNd64
",progress=%"
SCNd64
,
&
uid
,
&
key
);
progress
[
i
].
uid
=
uid
;
progress
[
i
].
key
=
key
;
}
fclose
(
fp
);
qsort
(
progress
,
numOfMeters
,
sizeof
(
SSubscriptionProgress
),
tscCompareSubscriptionProgress
);
pSub
->
numOfMeters
=
numOfMeters
;
pSub
->
progress
=
progress
;
return
true
;
}
void
tscSaveSubscriptionProgress
(
void
*
sub
)
{
SSub
*
pSub
=
(
SSub
*
)
sub
;
char
path
[
256
];
sprintf
(
path
,
"%s/subscribe"
,
dataDir
);
if
(
access
(
path
,
0
)
!=
0
)
{
mkdir
(
path
,
0777
);
}
sprintf
(
path
,
"%s/subscribe/%s"
,
dataDir
,
pSub
->
topic
);
FILE
*
fp
=
fopen
(
path
,
"w+"
);
if
(
fp
==
NULL
)
{
tscError
(
"failed to create progress file for subscription: %s"
,
pSub
->
topic
);
return
;
}
fputs
(
pSub
->
pSql
->
sqlstr
,
fp
);
fprintf
(
fp
,
"
\n
%d
\n
"
,
pSub
->
numOfMeters
);
for
(
int
i
=
0
;
i
<
pSub
->
numOfMeters
;
i
++
)
{
int64_t
uid
=
pSub
->
progress
[
i
].
uid
;
TSKEY
key
=
pSub
->
progress
[
i
].
key
;
fprintf
(
fp
,
"uid=%"
PRId64
",progress=%"
PRId64
"
\n
"
,
uid
,
key
);
}
fclose
(
fp
);
}
TAOS_SUB
*
taos_subscribe
(
const
char
*
topic
,
int
restart
,
TAOS
*
taos
,
const
char
*
sql
,
TAOS_SUBSCRIBE_CALLBACK
fp
,
void
*
param
,
int
interval
)
{
STscObj
*
pObj
=
(
STscObj
*
)
taos
;
if
(
pObj
==
NULL
||
pObj
->
signature
!=
pObj
)
{
globalCode
=
TSDB_CODE_DISCONNECTED
;
...
...
@@ -212,12 +300,18 @@ TAOS_SUB *taos_subscribe(TAOS *taos, const char *sql, TAOS_SUBSCRIBE_CALLBACK fp
return
NULL
;
}
SSub
*
pSub
=
tscCreateSubscription
(
pObj
,
sql
);
SSub
*
pSub
=
tscCreateSubscription
(
pObj
,
topic
,
sql
);
if
(
pSub
==
NULL
)
{
return
NULL
;
}
pSub
->
taos
=
taos
;
if
(
restart
)
{
tscTrace
(
"restart subscription: %s"
,
topic
);
}
else
{
tscLoadSubscriptionProgress
(
pSub
);
}
if
(
!
tscUpdateSubscription
(
pObj
,
pSub
))
{
return
NULL
;
}
...
...
@@ -226,7 +320,7 @@ TAOS_SUB *taos_subscribe(TAOS *taos, const char *sql, TAOS_SUBSCRIBE_CALLBACK fp
pSub
->
fp
=
fp
;
pSub
->
interval
=
interval
;
pSub
->
param
=
param
;
taosTmrReset
(
tscProcessSubscri
be
Timer
,
0
,
pSub
,
tscTmr
,
&
pSub
->
pTimer
);
taosTmrReset
(
tscProcessSubscri
ption
Timer
,
0
,
pSub
,
tscTmr
,
&
pSub
->
pTimer
);
}
return
pSub
;
...
...
@@ -236,7 +330,7 @@ TAOS_RES *taos_consume(TAOS_SUB *tsub) {
SSub
*
pSub
=
(
SSub
*
)
tsub
;
if
(
pSub
==
NULL
)
return
NULL
;
if
(
taosGetTimestampMs
()
-
pSub
->
lastSyncTime
>
30
*
1
0
*
1000
)
{
if
(
taosGetTimestampMs
()
-
pSub
->
lastSyncTime
>
30
*
6
0
*
1000
)
{
taos_query
(
pSub
->
taos
,
"reset query cache;"
);
// TODO: clear memory
if
(
!
tscUpdateSubscription
(
pSub
->
taos
,
pSub
))
return
NULL
;
...
...
src/inc/taos.h
浏览文件 @
4ba0819f
...
...
@@ -117,7 +117,7 @@ DLL_EXPORT void taos_fetch_rows_a(TAOS_RES *res, void (*fp)(void *param, TAOS_RE
DLL_EXPORT
void
taos_fetch_row_a
(
TAOS_RES
*
res
,
void
(
*
fp
)(
void
*
param
,
TAOS_RES
*
,
TAOS_ROW
row
),
void
*
param
);
typedef
void
(
*
TAOS_SUBSCRIBE_CALLBACK
)(
TAOS_SUB
*
tsub
,
TAOS_RES
*
res
,
void
*
param
,
int
code
);
DLL_EXPORT
TAOS_SUB
*
taos_subscribe
(
TAOS
*
taos
,
const
char
*
sql
,
TAOS_SUBSCRIBE_CALLBACK
fp
,
void
*
param
,
int
interval
);
DLL_EXPORT
TAOS_SUB
*
taos_subscribe
(
const
char
*
topic
,
int
restart
,
TAOS
*
taos
,
const
char
*
sql
,
TAOS_SUBSCRIBE_CALLBACK
fp
,
void
*
param
,
int
interval
);
DLL_EXPORT
TAOS_RES
*
taos_consume
(
TAOS_SUB
*
tsub
);
DLL_EXPORT
void
taos_unsubscribe
(
TAOS_SUB
*
tsub
);
...
...
src/system/detail/src/vnodeQueryProcess.c
浏览文件 @
4ba0819f
...
...
@@ -682,18 +682,14 @@ static void vnodeMultiMeterMultiOutputProcessor(SQInfo *pQInfo) {
while
(
pSupporter
->
meterIdx
<
pSupporter
->
numOfMeters
)
{
int32_t
k
=
pSupporter
->
meterIdx
;
pQInfo
->
killed
=
0
;
/*
if
(
isQueryKilled
(
pQuery
))
{
setQueryStatus
(
pQuery
,
QUERY_NO_DATA_TO_CHECK
);
return
;
}
*/
TSKEY
skey
=
pQInfo
->
pMeterQuerySupporter
->
pMeterSidExtInfo
[
k
]
->
key
;
if
(
skey
>
0
)
{
pQuery
->
skey
=
skey
;
// pQuery->lastKey = ???;
}
bool
dataInDisk
=
true
;
...
...
tests/examples/c/subscribe.c
浏览文件 @
4ba0819f
...
...
@@ -28,7 +28,7 @@ int main(int argc, char *argv[]) {
const
char
*
host
=
"127.0.0.1"
;
const
char
*
user
=
"root"
;
const
char
*
passwd
=
"taosdata"
;
int
async
=
1
;
int
async
=
1
,
restart
=
0
;
TAOS_SUB
*
tsub
=
NULL
;
for
(
int
i
=
1
;
i
<
argc
;
i
++
)
{
...
...
@@ -44,8 +44,12 @@ int main(int argc, char *argv[]) {
passwd
=
argv
[
i
]
+
3
;
continue
;
}
if
(
strncmp
(
argv
[
i
],
"-m="
,
3
)
==
0
)
{
async
=
strcmp
(
argv
[
i
]
+
3
,
"sync"
);
if
(
strcmp
(
argv
[
i
],
"-sync"
)
==
0
)
{
async
=
0
;
continue
;
}
if
(
strcmp
(
argv
[
i
],
"-restart"
)
==
0
)
{
restart
=
1
;
continue
;
}
}
...
...
@@ -60,9 +64,9 @@ int main(int argc, char *argv[]) {
}
if
(
async
)
{
tsub
=
taos_subscribe
(
taos
,
"select * from meters;"
,
subscribe_callback
,
NULL
,
1000
);
tsub
=
taos_subscribe
(
"test"
,
restart
,
taos
,
"select * from meters;"
,
subscribe_callback
,
NULL
,
1000
);
}
else
{
tsub
=
taos_subscribe
(
taos
,
"select * from meters;"
,
NULL
,
NULL
,
0
);
tsub
=
taos_subscribe
(
"test"
,
restart
,
taos
,
"select * from meters;"
,
NULL
,
NULL
,
0
);
}
if
(
tsub
==
NULL
)
{
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录