Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
慢慢CG
TDengine
提交
517fb883
T
TDengine
项目概览
慢慢CG
/
TDengine
与 Fork 源项目一致
Fork自
taosdata / TDengine
通知
1
Star
0
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
517fb883
编写于
12月 24, 2019
作者:
weixin_48148422
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
subscription (WIP)
上级
0ad471e3
变更
9
显示空白变更内容
内联
并排
Showing
9 changed file
with
236 addition
and
134 deletion
+236
-134
src/client/inc/tsclient.h
src/client/inc/tsclient.h
+2
-0
src/client/src/tscServer.c
src/client/src/tscServer.c
+3
-2
src/client/src/tscSql.c
src/client/src/tscSql.c
+5
-0
src/client/src/tscSub.c
src/client/src/tscSub.c
+191
-84
src/client/src/tscUtil.c
src/client/src/tscUtil.c
+1
-1
src/inc/taos.h
src/inc/taos.h
+4
-5
src/inc/taosmsg.h
src/inc/taosmsg.h
+2
-0
src/system/detail/src/vnodeShell.c
src/system/detail/src/vnodeShell.c
+1
-0
tests/examples/c/subscribe.c
tests/examples/c/subscribe.c
+27
-42
未找到文件。
src/client/inc/tsclient.h
浏览文件 @
517fb883
...
@@ -334,6 +334,7 @@ typedef struct {
...
@@ -334,6 +334,7 @@ typedef struct {
int
rspType
;
int
rspType
;
int
rspLen
;
int
rspLen
;
uint64_t
qhandle
;
uint64_t
qhandle
;
int64_t
uid
;
int64_t
useconds
;
int64_t
useconds
;
int64_t
offset
;
// offset value from vnode during projection query of stable
int64_t
offset
;
// offset value from vnode during projection query of stable
int
row
;
int
row
;
...
@@ -380,6 +381,7 @@ typedef struct _sql_obj {
...
@@ -380,6 +381,7 @@ typedef struct _sql_obj {
uint32_t
queryId
;
uint32_t
queryId
;
void
*
thandle
;
void
*
thandle
;
void
*
pStream
;
void
*
pStream
;
void
*
pSubscription
;
char
*
sqlstr
;
char
*
sqlstr
;
char
retry
;
char
retry
;
char
maxRetry
;
char
maxRetry
;
...
...
src/client/src/tscServer.c
浏览文件 @
517fb883
...
@@ -1531,7 +1531,7 @@ static char* doSerializeTableInfo(SSqlObj* pSql, int32_t numOfMeters, int32_t vn
...
@@ -1531,7 +1531,7 @@ static char* doSerializeTableInfo(SSqlObj* pSql, int32_t numOfMeters, int32_t vn
SMeterSidExtInfo
*
pMeterInfo
=
(
SMeterSidExtInfo
*
)
pMsg
;
SMeterSidExtInfo
*
pMeterInfo
=
(
SMeterSidExtInfo
*
)
pMsg
;
pMeterInfo
->
sid
=
htonl
(
pMeterMeta
->
sid
);
pMeterInfo
->
sid
=
htonl
(
pMeterMeta
->
sid
);
pMeterInfo
->
uid
=
htobe64
(
pMeterMeta
->
uid
);
pMeterInfo
->
uid
=
htobe64
(
pMeterMeta
->
uid
);
pMeterInfo
->
skey
=
tscGetSubscriptionProgress
(
pSql
,
pMeterMeta
->
uid
);
pMsg
+=
sizeof
(
SMeterSidExtInfo
);
pMsg
+=
sizeof
(
SMeterSidExtInfo
);
}
else
{
}
else
{
SVnodeSidList
*
pVnodeSidList
=
tscGetVnodeSidList
(
pMetricMeta
,
pMeterMetaInfo
->
vnodeIndex
);
SVnodeSidList
*
pVnodeSidList
=
tscGetVnodeSidList
(
pMetricMeta
,
pMeterMetaInfo
->
vnodeIndex
);
...
@@ -1542,6 +1542,7 @@ static char* doSerializeTableInfo(SSqlObj* pSql, int32_t numOfMeters, int32_t vn
...
@@ -1542,6 +1542,7 @@ static char* doSerializeTableInfo(SSqlObj* pSql, int32_t numOfMeters, int32_t vn
pMeterInfo
->
sid
=
htonl
(
pQueryMeterInfo
->
sid
);
pMeterInfo
->
sid
=
htonl
(
pQueryMeterInfo
->
sid
);
pMeterInfo
->
uid
=
htobe64
(
pQueryMeterInfo
->
uid
);
pMeterInfo
->
uid
=
htobe64
(
pQueryMeterInfo
->
uid
);
pMeterInfo
->
skey
=
tscGetSubscriptionProgress
(
pSql
,
pMeterMeta
->
uid
);
pMsg
+=
sizeof
(
SMeterSidExtInfo
);
pMsg
+=
sizeof
(
SMeterSidExtInfo
);
...
@@ -3535,7 +3536,7 @@ int tscProcessRetrieveRspFromVnode(SSqlObj *pSql) {
...
@@ -3535,7 +3536,7 @@ int tscProcessRetrieveRspFromVnode(SSqlObj *pSql) {
pRes
->
numOfRows
=
htonl
(
pRetrieve
->
numOfRows
);
pRes
->
numOfRows
=
htonl
(
pRetrieve
->
numOfRows
);
pRes
->
precision
=
htons
(
pRetrieve
->
precision
);
pRes
->
precision
=
htons
(
pRetrieve
->
precision
);
pRes
->
offset
=
htobe64
(
pRetrieve
->
offset
);
pRes
->
offset
=
htobe64
(
pRetrieve
->
offset
);
pRes
->
uid
=
pRetrieve
->
uid
;
pRes
->
useconds
=
htobe64
(
pRetrieve
->
useconds
);
pRes
->
useconds
=
htobe64
(
pRetrieve
->
useconds
);
pRes
->
data
=
pRetrieve
->
data
;
pRes
->
data
=
pRetrieve
->
data
;
...
...
src/client/src/tscSql.c
浏览文件 @
517fb883
...
@@ -627,6 +627,11 @@ TAOS_ROW taos_fetch_row(TAOS_RES *res) {
...
@@ -627,6 +627,11 @@ TAOS_ROW taos_fetch_row(TAOS_RES *res) {
rows
=
taos_fetch_row_impl
(
res
);
rows
=
taos_fetch_row_impl
(
res
);
}
}
if
(
rows
!=
NULL
&&
pSql
->
pSubscription
!=
NULL
)
{
TSKEY
ts
=
*
(
TSKEY
*
)
rows
[
pCmd
->
fieldsInfo
.
numOfOutputCols
-
1
];
tscUpdateSubscriptionProgress
(
pMeterMetaInfo
->
pMeterMeta
->
uid
,
ts
);
}
// check!!!
// check!!!
if
(
rows
!=
NULL
||
pMeterMetaInfo
->
vnodeIndex
>=
pMeterMetaInfo
->
pMetricMeta
->
numOfVnodes
)
{
if
(
rows
!=
NULL
||
pMeterMetaInfo
->
vnodeIndex
>=
pMeterMetaInfo
->
pMetricMeta
->
numOfVnodes
)
{
break
;
break
;
...
...
src/client/src/tscSub.c
浏览文件 @
517fb883
...
@@ -22,125 +22,232 @@
...
@@ -22,125 +22,232 @@
#include "tsclient.h"
#include "tsclient.h"
#include "tsocket.h"
#include "tsocket.h"
#include "ttime.h"
#include "ttime.h"
#include "ttimer.h"
#include "tutil.h"
#include "tutil.h"
#include "tscUtil.h"
typedef
struct
{
typedef
struct
SSubscriptionProgress
{
int64_t
uid
;
TSKEY
key
;
}
SSubscriptionProgress
;
typedef
struct
SSub
{
void
*
signature
;
void
*
signature
;
char
name
[
TSDB_METER_ID_LEN
];
int
mseconds
;
TSKEY
lastKey
;
uint64_t
stime
;
TAOS_FIELD
fields
[
TSDB_MAX_COLUMNS
];
int
numOfFields
;
TAOS
*
taos
;
TAOS
*
taos
;
TAOS_RES
*
result
;
void
*
pTimer
;
SSqlObj
*
pSql
;
int
interval
;
TAOS_SUBSCRIBE_CALLBACK
fp
;
void
*
param
;
int
numOfMeters
;
SSubscriptionProgress
*
progress
;
}
SSub
;
}
SSub
;
TAOS_SUB
*
taos_subscribe
(
const
char
*
host
,
const
char
*
user
,
const
char
*
pass
,
const
char
*
db
,
const
char
*
name
,
int64_t
time
,
int
mseconds
)
{
SSub
*
pSub
;
pSub
=
(
SSub
*
)
malloc
(
sizeof
(
SSub
));
static
int
tscCompareSubscriptionProgress
(
const
void
*
a
,
const
void
*
b
)
{
if
(
pSub
==
NULL
)
return
NULL
;
return
((
const
SSubscriptionProgress
*
)
a
)
->
uid
-
((
const
SSubscriptionProgress
*
)
b
)
->
uid
;
memset
(
pSub
,
0
,
sizeof
(
SSub
));
}
pSub
->
signature
=
pSub
;
TSKEY
tscGetSubscriptionProgress
(
SSqlObj
*
pSql
,
int64_t
uid
)
{
strcpy
(
pSub
->
name
,
name
);
if
(
pSql
==
NULL
||
pSql
->
pSubscription
==
NULL
)
pSub
->
mseconds
=
mseconds
;
return
0
;
pSub
->
lastKey
=
time
;
if
(
pSub
->
lastKey
==
0
)
{
SSub
*
pSub
=
(
SSub
*
)
pSql
->
pSubscription
;
pSub
->
lastKey
=
taosGetTimestampMs
();
for
(
int
s
=
0
,
e
=
pSub
->
numOfMeters
;
s
<
e
;)
{
int
m
=
(
s
+
e
)
/
2
;
SSubscriptionProgress
*
p
=
pSub
->
progress
+
m
;
if
(
p
->
uid
>
uid
)
e
=
m
;
else
if
(
p
->
uid
<
uid
)
s
=
m
+
1
;
else
return
p
->
key
;
}
}
taos_init
();
return
0
;
pSub
->
taos
=
taos_connect
(
host
,
user
,
pass
,
NULL
,
0
);
}
if
(
pSub
->
taos
==
NULL
)
{
tfree
(
pSub
);
void
tscUpdateSubscriptionProgress
(
SSqlObj
*
pSql
,
int64_t
uid
,
TSKEY
ts
)
{
}
else
{
if
(
pSql
==
NULL
||
pSql
->
pSubscription
==
NULL
)
char
qstr
[
256
]
=
{
0
};
return
;
sprintf
(
qstr
,
"use %s"
,
db
);
int
res
=
taos_query
(
pSub
->
taos
,
qstr
);
SSub
*
pSub
=
(
SSub
*
)
pSql
->
pSubscription
;
if
(
res
!=
0
)
{
for
(
int
s
=
0
,
e
=
pSub
->
numOfMeters
;
s
<
e
;)
{
tscError
(
"failed to open DB:%s"
,
db
);
int
m
=
(
s
+
e
)
/
2
;
taos_close
(
pSub
->
taos
);
SSubscriptionProgress
*
p
=
pSub
->
progress
+
m
;
tfree
(
pSub
);
if
(
p
->
uid
>
uid
)
}
else
{
e
=
m
;
snprintf
(
qstr
,
tListLen
(
qstr
),
"select * from %s where _c0 > now+1000d"
,
pSub
->
name
);
else
if
(
p
->
uid
<
uid
)
if
(
taos_query
(
pSub
->
taos
,
qstr
))
{
s
=
m
+
1
;
tscTrace
(
"failed to select, reason:%s"
,
taos_errstr
(
pSub
->
taos
));
else
{
taos_close
(
pSub
->
taos
);
p
->
key
=
ts
+
1
;
tfree
(
pSub
);
break
;
}
}
}
static
SSub
*
tscCreateSubscription
(
STscObj
*
pObj
,
const
char
*
sql
)
{
SSub
*
pSub
=
calloc
(
1
,
sizeof
(
SSub
));
if
(
pSub
==
NULL
)
{
globalCode
=
TSDB_CODE_CLI_OUT_OF_MEMORY
;
tscError
(
"failed to allocate memory for subscription"
);
return
NULL
;
return
NULL
;
}
}
pSub
->
result
=
taos_use_result
(
pSub
->
taos
);
pSub
->
numOfFields
=
taos_num_fields
(
pSub
->
result
);
SSqlObj
*
pSql
=
calloc
(
1
,
sizeof
(
SSqlObj
));
memcpy
(
pSub
->
fields
,
taos_fetch_fields
(
pSub
->
result
),
sizeof
(
TAOS_FIELD
)
*
pSub
->
numOfFields
);
if
(
pSql
==
NULL
)
{
globalCode
=
TSDB_CODE_CLI_OUT_OF_MEMORY
;
tscError
(
"failed to allocate SSqlObj for subscription"
);
goto
failed
;
}
}
pSql
->
signature
=
pSql
;
pSql
->
pTscObj
=
pObj
;
char
*
sqlstr
=
(
char
*
)
malloc
(
strlen
(
sql
)
+
1
);
if
(
sqlstr
==
NULL
)
{
tscError
(
"failed to allocate sql string for subscription"
);
goto
failed
;
}
}
strcpy
(
sqlstr
,
sql
);
strtolower
(
sqlstr
,
sqlstr
);
pSql
->
sqlstr
=
sqlstr
;
tsem_init
(
&
pSql
->
rspSem
,
0
,
0
);
tsem_init
(
&
pSql
->
emptyRspSem
,
0
,
1
);
SSqlRes
*
pRes
=
&
pSql
->
res
;
pRes
->
numOfRows
=
1
;
pRes
->
numOfTotal
=
0
;
pSql
->
pSubscription
=
pSub
;
pSub
->
pSql
=
pSql
;
pSub
->
signature
=
pSub
;
return
pSub
;
return
pSub
;
failed:
if
(
sqlstr
!=
NULL
)
{
free
(
sqlstr
);
}
if
(
pSql
!=
NULL
)
{
free
(
pSql
);
}
free
(
pSub
);
return
NULL
;
}
}
TAOS_ROW
taos_consume
(
TAOS_SUB
*
tsub
)
{
SSub
*
pSub
=
(
SSub
*
)
tsub
;
TAOS_ROW
row
;
char
qstr
[
256
];
if
(
pSub
==
NULL
)
return
NULL
;
static
void
tscProcessSubscribeTimer
(
void
*
handle
,
void
*
tmrId
)
{
if
(
pSub
->
signature
!=
pSub
)
return
NULL
;
SSub
*
pSub
=
(
SSub
*
)
handle
;
if
(
pSub
==
NULL
||
pSub
->
pTimer
!=
tmrId
)
return
;
while
(
1
)
{
TAOS_RES
*
res
=
taos_consume
(
pSub
);
if
(
pSub
->
result
!=
NULL
)
{
if
(
res
!=
NULL
)
{
row
=
taos_fetch_row
(
pSub
->
result
);
pSub
->
fp
(
pSub
->
param
,
res
,
0
);
if
(
row
!=
NULL
)
{
taos_free_result
(
res
);
pSub
->
lastKey
=
*
((
uint64_t
*
)
row
[
0
]);
return
row
;
}
}
taos_free_result
(
pSub
->
result
);
taosTmrReset
(
tscProcessSubscribeTimer
,
pSub
->
interval
,
pSub
,
tscTmr
,
&
pSub
->
pTimer
);
pSub
->
result
=
NULL
;
}
uint64_t
etime
=
taosGetTimestampMs
();
int64_t
mseconds
=
pSub
->
mseconds
-
etime
+
pSub
->
stime
;
if
(
mseconds
<
0
)
mseconds
=
0
;
taosMsleep
((
int
)
mseconds
);
}
pSub
->
stime
=
taosGetTimestampMs
();
sprintf
(
qstr
,
"select * from %s where _c0 > %"
PRId64
" order by _c0 asc"
,
pSub
->
name
,
pSub
->
lastKey
);
TAOS_SUB
*
taos_subscribe
(
TAOS
*
taos
,
const
char
*
sql
,
TAOS_SUBSCRIBE_CALLBACK
fp
,
void
*
param
,
int
interval
)
{
if
(
taos_query
(
pSub
->
taos
,
qstr
))
{
STscObj
*
pObj
=
(
STscObj
*
)
taos
;
tscTrace
(
"failed to select, reason:%s"
,
taos_errstr
(
pSub
->
taos
));
if
(
pObj
==
NULL
||
pObj
->
signature
!=
pObj
)
{
globalCode
=
TSDB_CODE_DISCONNECTED
;
tscError
(
"connection disconnected"
);
return
NULL
;
return
NULL
;
}
}
pSub
->
result
=
taos_use_result
(
pSub
->
taos
);
SSub
*
pSub
=
tscCreateSubscription
(
pObj
,
sql
);
if
(
pSub
==
NULL
)
{
return
NULL
;
}
if
(
pSub
->
result
==
NULL
)
{
int
code
=
(
uint8_t
)
tsParseSql
(
pSub
->
pSql
,
pObj
->
acctId
,
pObj
->
db
,
false
);
tscTrace
(
"failed to get result, reason:%s"
,
taos_errstr
(
pSub
->
taos
));
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
taos_unsubscribe
(
pSub
);
return
NULL
;
return
NULL
;
}
}
// ??? if there's more than one vnode
SSqlCmd
*
pCmd
=
&
pSub
->
pSql
->
cmd
;
SMeterMetaInfo
*
pMeterMetaInfo
=
tscGetMeterMetaInfo
(
pCmd
,
0
);
if
(
UTIL_METER_IS_NOMRAL_METER
(
pMeterMetaInfo
))
{
pSub
->
numOfMeters
=
1
;
pSub
->
progress
=
calloc
(
1
,
sizeof
(
SSubscriptionProgress
));
pSub
->
progress
[
0
].
uid
=
pMeterMetaInfo
->
pMeterMeta
->
uid
;
}
else
{
SMetricMeta
*
pMetricMeta
=
pMeterMetaInfo
->
pMetricMeta
;
SVnodeSidList
*
pVnodeSidList
=
tscGetVnodeSidList
(
pMetricMeta
,
pMeterMetaInfo
->
vnodeIndex
);
pSub
->
numOfMeters
=
pVnodeSidList
->
numOfSids
;
pSub
->
progress
=
calloc
(
pSub
->
numOfMeters
,
sizeof
(
SSubscriptionProgress
));
for
(
int32_t
i
=
0
;
i
<
pSub
->
numOfMeters
;
++
i
)
{
SMeterSidExtInfo
*
pMeterInfo
=
tscGetMeterSidInfo
(
pVnodeSidList
,
i
);
pSub
->
progress
[
i
].
uid
=
pMeterInfo
->
uid
;
}
qsort
(
pSub
->
progress
,
pSub
->
numOfMeters
,
sizeof
(
SSubscriptionProgress
),
tscCompareSubscriptionProgress
);
}
// timestamp must in the output column
SFieldInfo
*
pFieldInfo
=
&
pCmd
->
fieldsInfo
;
tscFieldInfoSetValue
(
pFieldInfo
,
pFieldInfo
->
numOfOutputCols
,
TSDB_DATA_TYPE_TIMESTAMP
,
"_c0"
,
TSDB_KEYSIZE
);
tscSqlExprInsertEmpty
(
pCmd
,
pFieldInfo
->
numOfOutputCols
-
1
,
TSDB_FUNC_PRJ
);
tscFieldInfoUpdateVisible
(
pFieldInfo
,
pFieldInfo
->
numOfOutputCols
-
1
,
false
);
tscFieldInfoCalOffset
(
pCmd
);
if
(
fp
!=
NULL
)
{
pSub
->
fp
=
fp
;
pSub
->
interval
=
interval
;
pSub
->
param
=
param
;
taosTmrReset
(
tscProcessSubscribeTimer
,
0
,
pSub
,
tscTmr
,
&
pSub
->
pTimer
);
}
}
return
pSub
;
}
TAOS_RES
*
taos_consume
(
TAOS_SUB
*
tsub
)
{
SSub
*
pSub
=
(
SSub
*
)
tsub
;
if
(
pSub
==
NULL
)
return
NULL
;
SSqlObj
*
pSql
=
pSub
->
pSql
;
SSqlRes
*
pRes
=
&
pSql
->
res
;
pRes
->
numOfRows
=
1
;
pRes
->
numOfTotal
=
0
;
pRes
->
qhandle
=
0
;
pSql
->
thandle
=
NULL
;
tscDoQuery
(
pSql
);
if
(
pRes
->
code
!=
TSDB_CODE_SUCCESS
)
{
return
NULL
;
return
NULL
;
}
return
pSql
;
}
}
void
taos_unsubscribe
(
TAOS_SUB
*
tsub
)
{
void
taos_unsubscribe
(
TAOS_SUB
*
tsub
)
{
SSub
*
pSub
=
(
SSub
*
)
tsub
;
SSub
*
pSub
=
(
SSub
*
)
tsub
;
if
(
pSub
==
NULL
||
pSub
->
signature
!=
pSub
)
return
;
if
(
pSub
==
NULL
)
return
;
if
(
pSub
->
pTimer
!=
NULL
)
{
if
(
pSub
->
signature
!=
pSub
)
return
;
taosTmrStop
(
pSub
->
pTimer
);
}
taos_close
(
pSub
->
taos
);
tscFreeSqlObj
(
pSub
->
pSql
);
free
(
pSub
->
progress
);
memset
(
pSub
,
0
,
sizeof
(
*
pSub
));
free
(
pSub
);
free
(
pSub
);
}
}
int
taos_subfields_count
(
TAOS_SUB
*
tsub
)
{
int
taos_subfields_count
(
TAOS_SUB
*
tsub
)
{
SSub
*
pSub
=
(
SSub
*
)
tsub
;
SSub
*
pSub
=
(
SSub
*
)
tsub
;
return
pSub
->
numOfFields
;
return
taos_num_fields
(
pSub
->
pSql
)
;
}
}
TAOS_FIELD
*
taos_fetch_subfields
(
TAOS_SUB
*
tsub
)
{
TAOS_FIELD
*
taos_fetch_subfields
(
TAOS_SUB
*
tsub
)
{
SSub
*
pSub
=
(
SSub
*
)
tsub
;
SSub
*
pSub
=
(
SSub
*
)
tsub
;
return
pSub
->
f
ields
;
return
pSub
->
pSql
->
cmd
.
fieldsInfo
.
pF
ields
;
}
}
src/client/src/tscUtil.c
浏览文件 @
517fb883
...
@@ -819,7 +819,7 @@ void tscFieldInfoSetValFromField(SFieldInfo* pFieldInfo, int32_t index, TAOS_FIE
...
@@ -819,7 +819,7 @@ void tscFieldInfoSetValFromField(SFieldInfo* pFieldInfo, int32_t index, TAOS_FIE
}
}
void
tscFieldInfoUpdateVisible
(
SFieldInfo
*
pFieldInfo
,
int32_t
index
,
bool
visible
)
{
void
tscFieldInfoUpdateVisible
(
SFieldInfo
*
pFieldInfo
,
int32_t
index
,
bool
visible
)
{
if
(
index
<
0
||
index
>
pFieldInfo
->
numOfOutputCols
)
{
if
(
index
<
0
||
index
>
=
pFieldInfo
->
numOfOutputCols
)
{
return
;
return
;
}
}
...
...
src/inc/taos.h
浏览文件 @
517fb883
...
@@ -116,11 +116,10 @@ DLL_EXPORT void taos_query_a(TAOS *taos, const char *sql, void (*fp)(void *param
...
@@ -116,11 +116,10 @@ DLL_EXPORT void taos_query_a(TAOS *taos, const char *sql, void (*fp)(void *param
DLL_EXPORT
void
taos_fetch_rows_a
(
TAOS_RES
*
res
,
void
(
*
fp
)(
void
*
param
,
TAOS_RES
*
,
int
numOfRows
),
void
*
param
);
DLL_EXPORT
void
taos_fetch_rows_a
(
TAOS_RES
*
res
,
void
(
*
fp
)(
void
*
param
,
TAOS_RES
*
,
int
numOfRows
),
void
*
param
);
DLL_EXPORT
void
taos_fetch_row_a
(
TAOS_RES
*
res
,
void
(
*
fp
)(
void
*
param
,
TAOS_RES
*
,
TAOS_ROW
row
),
void
*
param
);
DLL_EXPORT
void
taos_fetch_row_a
(
TAOS_RES
*
res
,
void
(
*
fp
)(
void
*
param
,
TAOS_RES
*
,
TAOS_ROW
row
),
void
*
param
);
DLL_EXPORT
TAOS_SUB
*
taos_subscribe
(
const
char
*
host
,
const
char
*
user
,
const
char
*
pass
,
const
char
*
db
,
const
char
*
table
,
int64_t
time
,
int
mseconds
);
typedef
void
(
*
TAOS_SUBSCRIBE_CALLBACK
)(
void
*
param
,
TAOS_RES
*
res
,
int
code
);
DLL_EXPORT
TAOS_ROW
taos_consume
(
TAOS_SUB
*
tsub
);
DLL_EXPORT
TAOS_SUB
*
taos_subscribe
(
TAOS
*
taos
,
const
char
*
sql
,
TAOS_SUBSCRIBE_CALLBACK
fp
,
void
*
param
,
int
interval
);
DLL_EXPORT
TAOS_RES
*
taos_consume
(
TAOS_SUB
*
tsub
);
DLL_EXPORT
void
taos_unsubscribe
(
TAOS_SUB
*
tsub
);
DLL_EXPORT
void
taos_unsubscribe
(
TAOS_SUB
*
tsub
);
DLL_EXPORT
int
taos_subfields_count
(
TAOS_SUB
*
tsub
);
DLL_EXPORT
TAOS_FIELD
*
taos_fetch_subfields
(
TAOS_SUB
*
tsub
);
DLL_EXPORT
TAOS_STREAM
*
taos_open_stream
(
TAOS
*
taos
,
const
char
*
sql
,
void
(
*
fp
)(
void
*
param
,
TAOS_RES
*
,
TAOS_ROW
row
),
DLL_EXPORT
TAOS_STREAM
*
taos_open_stream
(
TAOS
*
taos
,
const
char
*
sql
,
void
(
*
fp
)(
void
*
param
,
TAOS_RES
*
,
TAOS_ROW
row
),
int64_t
stime
,
void
*
param
,
void
(
*
callback
)(
void
*
));
int64_t
stime
,
void
*
param
,
void
(
*
callback
)(
void
*
));
...
...
src/inc/taosmsg.h
浏览文件 @
517fb883
...
@@ -490,6 +490,7 @@ typedef struct SColumnInfo {
...
@@ -490,6 +490,7 @@ typedef struct SColumnInfo {
typedef
struct
SMeterSidExtInfo
{
typedef
struct
SMeterSidExtInfo
{
int32_t
sid
;
int32_t
sid
;
int64_t
uid
;
int64_t
uid
;
TSKEY
skey
;
// start key for subscription
char
tags
[];
char
tags
[];
}
SMeterSidExtInfo
;
}
SMeterSidExtInfo
;
...
@@ -572,6 +573,7 @@ typedef struct {
...
@@ -572,6 +573,7 @@ typedef struct {
int16_t
precision
;
int16_t
precision
;
int64_t
offset
;
// updated offset value for multi-vnode projection query
int64_t
offset
;
// updated offset value for multi-vnode projection query
int64_t
useconds
;
int64_t
useconds
;
int64_t
uid
;
char
data
[];
char
data
[];
}
SRetrieveMeterRsp
;
}
SRetrieveMeterRsp
;
...
...
src/system/detail/src/vnodeShell.c
浏览文件 @
517fb883
...
@@ -456,6 +456,7 @@ void vnodeExecuteRetrieveReq(SSchedMsg *pSched) {
...
@@ -456,6 +456,7 @@ void vnodeExecuteRetrieveReq(SSchedMsg *pSched) {
if
(
code
==
TSDB_CODE_SUCCESS
)
{
if
(
code
==
TSDB_CODE_SUCCESS
)
{
pRsp
->
offset
=
htobe64
(
vnodeGetOffsetVal
((
void
*
)
pRetrieve
->
qhandle
));
pRsp
->
offset
=
htobe64
(
vnodeGetOffsetVal
((
void
*
)
pRetrieve
->
qhandle
));
pRsp
->
useconds
=
htobe64
(((
SQInfo
*
)(
pRetrieve
->
qhandle
))
->
useconds
);
pRsp
->
useconds
=
htobe64
(((
SQInfo
*
)(
pRetrieve
->
qhandle
))
->
useconds
);
pRsp
->
uid
=
((
SQInfo
*
)(
pRetrieve
->
qhandle
))
->
pObj
->
uid
;
}
else
{
}
else
{
pRsp
->
offset
=
0
;
pRsp
->
offset
=
0
;
pRsp
->
useconds
=
0
;
pRsp
->
useconds
=
0
;
...
...
tests/examples/c/subscribe.c
浏览文件 @
517fb883
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
// sample code for TDengine subscribe/consume API
// sample code for TDengine subscribe/consume API
// to compile: gcc -o subscribe subscribe.c -ltaos
// to compile: gcc -o subscribe subscribe.c -ltaos
...
@@ -21,40 +6,40 @@
...
@@ -21,40 +6,40 @@
#include <string.h>
#include <string.h>
#include <taos.h> // include TDengine header file
#include <taos.h> // include TDengine header file
int
main
(
int
argc
,
char
*
argv
[])
int
main
(
int
argc
,
char
*
argv
[])
{
{
// init TAOS
TAOS_SUB
*
tsub
;
taos_init
();
TAOS_ROW
row
;
char
dbname
[
64
],
table
[
64
];
char
temp
[
256
];
if
(
argc
==
1
)
{
TAOS
*
taos
=
taos_connect
(
argv
[
1
],
"root"
,
"taosdata"
,
"test"
,
0
);
printf
(
"usage: %s server-ip db-name table-name
\n
"
,
argv
[
0
]);
if
(
taos
==
NULL
)
{
exit
(
0
);
printf
(
"failed to connect to db, reason:%s
\n
"
,
taos_errstr
(
taos
));
exit
(
1
);
}
}
if
(
argc
>=
2
)
strcpy
(
dbname
,
argv
[
2
]);
TAOS_SUB
*
tsub
=
taos_subscribe
(
taos
,
"select * from meters;"
,
NULL
,
NULL
,
0
);
if
(
argc
>=
3
)
strcpy
(
table
,
argv
[
3
]);
tsub
=
taos_subscribe
(
argv
[
1
],
"root"
,
"taosdata"
,
dbname
,
table
,
0
,
1000
);
if
(
tsub
==
NULL
)
{
if
(
tsub
==
NULL
)
{
printf
(
"failed to c
onnet to db:%s
\n
"
,
dbname
);
printf
(
"failed to c
reate subscription.
\n
"
);
exit
(
1
);
exit
(
0
);
}
}
TAOS_FIELD
*
fields
=
taos_fetch_subfields
(
tsub
);
for
(
int
i
=
0
;
i
<
3
;
i
++
)
{
int
fcount
=
taos_subfields_count
(
tsub
);
TAOS_RES
*
res
=
taos_consume
(
tsub
);
TAOS_ROW
row
;
printf
(
"start to retrieve data
\n
"
);
int
rows
=
0
;
printf
(
"please use other taos client, insert rows into %s.%s
\n
"
,
dbname
,
table
);
int
num_fields
=
taos_subfields_count
(
tsub
);
while
(
1
)
{
TAOS_FIELD
*
fields
=
taos_fetch_fields
(
res
);
row
=
taos_consume
(
tsub
);
char
temp
[
256
];
if
(
row
==
NULL
)
break
;
taos_print_row
(
temp
,
row
,
fields
,
fcount
);
// fetch the records row by row
while
((
row
=
taos_fetch_row
(
res
)))
{
rows
++
;
taos_print_row
(
temp
,
row
,
fields
,
num_fields
);
printf
(
"%s
\n
"
,
temp
);
printf
(
"%s
\n
"
,
temp
);
}
}
printf
(
"
\n
"
);
}
taos_unsubscribe
(
tsub
);
taos_unsubscribe
(
tsub
);
return
0
;
return
0
;
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录