Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
1afc9f9f
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
未验证
提交
1afc9f9f
编写于
8月 16, 2022
作者:
X
xiaolei li
提交者:
GitHub
8月 16, 2022
浏览文件
操作
浏览文件
下载
差异文件
Merge branch '3.0' into docs/xiaolei/TD-18414-update-node-subscribe-doc
上级
093ff2bd
877945aa
变更
5
隐藏空白更改
内联
并排
Showing
5 changed file
with
100 addition
and
121 deletion
+100
-121
docs/examples/python/tmq_example.py
docs/examples/python/tmq_example.py
+5
-58
docs/zh/07-develop/07-tmq.mdx
docs/zh/07-develop/07-tmq.mdx
+80
-57
docs/zh/14-reference/03-connector/_linux_install.mdx
docs/zh/14-reference/03-connector/_linux_install.mdx
+2
-2
docs/zh/14-reference/03-connector/_windows_install.mdx
docs/zh/14-reference/03-connector/_windows_install.mdx
+1
-2
source/libs/function/src/builtinsimpl.c
source/libs/function/src/builtinsimpl.c
+12
-2
未找到文件。
docs/examples/python/tmq_example.py
浏览文件 @
1afc9f9f
import
taos
from
taos.tmq
import
*
conn
=
taos
.
connect
()
# create database
conn
.
execute
(
"drop database if exists py_tmq"
)
conn
.
execute
(
"create database if not exists py_tmq vgroups 2"
)
# create table and stables
conn
.
select_db
(
"py_tmq"
)
conn
.
execute
(
"create stable if not exists stb1 (ts timestamp, c1 int, c2 float, c3 binary(10)) tags(t1 int)"
)
conn
.
execute
(
"create table if not exists tb1 using stb1 tags(1)"
)
conn
.
execute
(
"create table if not exists tb2 using stb1 tags(2)"
)
conn
.
execute
(
"create table if not exists tb3 using stb1 tags(3)"
)
# create topic
conn
.
execute
(
"drop topic if exists topic_ctb_column"
)
conn
.
execute
(
"create topic if not exists topic_ctb_column as select ts, c1, c2, c3 from stb1"
)
# set consumer configure options
conf
=
TaosTmqConf
()
conf
.
set
(
"group.id"
,
"tg2"
)
conf
.
set
(
"td.connect.user"
,
"root"
)
conf
.
set
(
"td.connect.pass"
,
"taosdata"
)
conf
.
set
(
"enable.auto.commit"
,
"true"
)
conf
.
set
(
"msg.with.table.name"
,
"true"
)
def
tmq_commit_cb_print
(
tmq
,
resp
,
offset
,
param
=
None
):
print
(
f
"commit:
{
resp
}
, tmq:
{
tmq
}
, offset:
{
offset
}
, param:
{
param
}
"
)
conf
.
set_auto_commit_cb
(
tmq_commit_cb_print
,
None
)
# build consumer
tmq
=
conf
.
new_consumer
()
# build topic list
topic_list
=
TaosTmqList
()
topic_list
.
append
(
"topic_ctb_column"
)
# subscribe consumer
tmq
.
subscribe
(
topic_list
)
# check subscriptions
sub_list
=
tmq
.
subscription
()
print
(
"subscribed topics: "
,
sub_list
)
# start subscribe
while
1
:
res
=
tmq
.
poll
(
1000
)
if
res
:
topic
=
res
.
get_topic_name
()
vg
=
res
.
get_vgroup_id
()
db
=
res
.
get_db_name
()
print
(
f
"topic:
{
topic
}
\n
vgroup id:
{
vg
}
\n
db:
{
db
}
"
)
for
row
in
res
:
print
(
row
)
tb
=
res
.
get_table_name
()
print
(
f
"from table:
{
tb
}
"
)
from
taos.tmq
import
TaosConsumer
consumer
=
TaosConsumer
(
'topic_ctb_column'
,
group_id
=
'vg2'
)
for
msg
in
consumer
:
for
row
in
msg
:
print
(
row
)
docs/zh/07-develop/07-tmq.mdx
浏览文件 @
1afc9f9f
...
...
@@ -87,6 +87,27 @@ void commitSync() throws SQLException;
void close() throws SQLException;
```
</TabItem>
<TabItem value="Python" label="Python">
```python
class TaosConsumer():
def __init__(self, *topics, **configs)
def __iter__(self)
def __next__(self)
def sync_next(self)
def subscription(self)
def unsubscribe(self)
def close(self)
def __del__(self)
```
</TabItem>
<TabItem label="Go" value="Go">
...
...
@@ -275,6 +296,29 @@ public class MetersDeserializer extends ReferenceDeserializer<Meters> {
```
</TabItem>
<TabItem value="Python" label="Python">
Python 使用以下配置项创建一个 Consumer 实例。
| 参数名称 | 类型 | 参数说明 | 备注 |
| :----------------------------: | :-----: | -------------------------------------------------------- | ------------------------------------------- |
| `td_connect_ip` | string | 用于创建连接,同 `taos_connect` | |
| `td_connect_user` | string | 用于创建连接,同 `taos_connect` | |
| `td_connect_pass` | string | 用于创建连接,同 `taos_connect` | |
| `td_connect_port` | string | 用于创建连接,同 `taos_connect` | |
| `group_id` | string | 消费组 ID,同一消费组共享消费进度 | **必填项**。最大长度:192。 |
| `client_id` | string | 客户端 ID | 最大长度:192。 |
| `auto_offset_reset` | string | 消费组订阅的初始位置 | 可选:`earliest`, `latest`, `none`(default) |
| `enable_auto_commit` | string | 启用自动提交 | 合法值:`true`, `false`。 |
| `auto_commit_interval_ms` | string | 以毫秒为单位的自动提交时间间隔 | |
| `enable_heartbeat_background` | string | 启用后台心跳,启用后即使长时间不 poll 消息也不会造成离线 | 合法值:`true`, `false` |
| `experimental_snapshot_enable` | string | 从 WAL 开始消费,还是从 TSBS 开始消费 | 合法值:`true`, `false` |
| `msg_with_table_name` | string | 是否允许从消息中解析表名 | 合法值:`true`, `false` |
| `timeout` | int | 消费者拉去的超时时间 | |
</TabItem>
<TabItem label="Go" value="Go">
```go
...
...
@@ -397,6 +441,7 @@ if err != nil {
</TabItem>
<TabItem value="C#" label="C#">
```C#
...
...
@@ -409,6 +454,15 @@ consumer.Subscribe(topics);
</TabItem>
<TabItem value="Python" label="Python">
```python
consumer = TaosConsumer('topic_ctb_column', group_id='vg2')
```
</TabItem>
</Tabs>
## 消费
...
...
@@ -441,6 +495,16 @@ while(running){
```
</TabItem>
<TabItem value="Python" label="Python">
```python
for msg in consumer:
for row in msg:
print(row)
```
</TabItem>
<TabItem value="Go" label="Go">
```go
...
...
@@ -502,6 +566,15 @@ consumer.close();
```
</TabItem>
<TabItem value="Python" label="Python">
```python
/* 取消订阅 */
consumer.unsubscribe();
/* 关闭消费 */
consumer.close();
<TabItem value="Go" label="Go">
```go
...
...
@@ -867,64 +940,14 @@ int main(int argc, char* argv[]) {
```python
import taos
from taos.tmq import
*
from taos.tmq import
TaosConsumer
conn = taos.connect()
# create database
conn.execute("drop database if exists py_tmq")
conn.execute("create database if not exists py_tmq vgroups 2")
# create table and stables
conn.select_db("py_tmq")
conn.execute("create stable if not exists stb1 (ts timestamp, c1 int, c2 float, c3 binary(10)) tags(t1 int)")
conn.execute("create table if not exists tb1 using stb1 tags(1)")
conn.execute("create table if not exists tb2 using stb1 tags(2)")
conn.execute("create table if not exists tb3 using stb1 tags(3)")
# create topic
conn.execute("drop topic if exists topic_ctb_column")
conn.execute("create topic if not exists topic_ctb_column as select ts, c1, c2, c3 from stb1")
# set consumer configure options
conf = TaosTmqConf()
conf.set("group.id", "tg2")
conf.set("td.connect.user", "root")
conf.set("td.connect.pass", "taosdata")
conf.set("enable.auto.commit", "true")
conf.set("msg.with.table.name", "true")
def tmq_commit_cb_print(tmq, resp, offset, param=None):
print(f"commit: {resp}, tmq: {tmq}, offset: {offset}, param: {param}")
conf.set_auto_commit_cb(tmq_commit_cb_print, None)
# build consumer
tmq = conf.new_consumer()
# build topic list
topic_list = TaosTmqList()
topic_list.append("topic_ctb_column")
# subscribe consumer
tmq.subscribe(topic_list)
# check subscriptions
sub_list = tmq.subscription()
print("subscribed topics: ",sub_list)
# start subscribe
while 1:
res = tmq.poll(1000)
if res:
topic = res.get_topic_name()
vg = res.get_vgroup_id()
db = res.get_db_name()
print(f"topic: {topic}\nvgroup id: {vg}\ndb: {db}")
for row in res:
print(row)
tb = res.get_table_name()
print(f"from table: {tb}")
import taos
from taos.tmq import *
consumer = TaosConsumer('topic_ctb_column', group_id='vg2')
for msg in consumer:
for row in msg:
print(row)
```
...
...
docs/zh/14-reference/03-connector/_linux_install.mdx
浏览文件 @
1afc9f9f
...
...
@@ -2,9 +2,9 @@ import PkgListV3 from "/components/PkgListV3";
1. 下载客户端安装包
<PkgList type={1} sys="Linux" />
<PkgList
V3
type={1} sys="Linux" />
[所有下载](
https://www.taosdata.com/cn/all-downloads/
)
[所有下载](
../../releases
)
2. 解压缩软件包
...
...
docs/zh/14-reference/03-connector/_windows_install.mdx
浏览文件 @
1afc9f9f
...
...
@@ -4,8 +4,7 @@ import PkgListV3 from "/components/PkgListV3";
<PkgListV3 type={4} sys="Windows" />
[所有下载](https://www.taosdata.com/cn/all-downloads/)
[所有下载](../../releases)
2. 执行安装程序,按提示选择默认值,完成安装
3. 安装路径
...
...
source/libs/function/src/builtinsimpl.c
浏览文件 @
1afc9f9f
...
...
@@ -4918,6 +4918,16 @@ int32_t mavgFunction(SqlFunctionCtx* pCtx) {
return
numOfElems
;
}
static
SSampleInfo
*
getSampleOutputInfo
(
SqlFunctionCtx
*
pCtx
)
{
SResultRowEntryInfo
*
pResInfo
=
GET_RES_INFO
(
pCtx
);
SSampleInfo
*
pInfo
=
GET_ROWCELL_INTERBUF
(
pResInfo
);
pInfo
->
data
=
(
char
*
)
pInfo
+
sizeof
(
SSampleInfo
);
pInfo
->
tuplePos
=
(
STuplePos
*
)((
char
*
)
pInfo
+
sizeof
(
SSampleInfo
)
+
pInfo
->
samples
*
pInfo
->
colBytes
);
return
pInfo
;
}
bool
getSampleFuncEnv
(
SFunctionNode
*
pFunc
,
SFuncExecEnv
*
pEnv
)
{
SColumnNode
*
pCol
=
(
SColumnNode
*
)
nodesListGetNode
(
pFunc
->
pParameterList
,
0
);
SValueNode
*
pVal
=
(
SValueNode
*
)
nodesListGetNode
(
pFunc
->
pParameterList
,
1
);
...
...
@@ -4972,7 +4982,7 @@ static void doReservoirSample(SqlFunctionCtx* pCtx, SSampleInfo* pInfo, char* da
int32_t
sampleFunction
(
SqlFunctionCtx
*
pCtx
)
{
SResultRowEntryInfo
*
pResInfo
=
GET_RES_INFO
(
pCtx
);
SSampleInfo
*
pInfo
=
GET_ROWCELL_INTERBUF
(
pResInfo
);
SSampleInfo
*
pInfo
=
getSampleOutputInfo
(
pCtx
);
SInputColumnInfoData
*
pInput
=
&
pCtx
->
input
;
...
...
@@ -4998,7 +5008,7 @@ int32_t sampleFunction(SqlFunctionCtx* pCtx) {
int32_t
sampleFinalize
(
SqlFunctionCtx
*
pCtx
,
SSDataBlock
*
pBlock
)
{
SResultRowEntryInfo
*
pEntryInfo
=
GET_RES_INFO
(
pCtx
);
SSampleInfo
*
pInfo
=
GET_ROWCELL_INTERBUF
(
pEntryInfo
);
SSampleInfo
*
pInfo
=
getSampleOutputInfo
(
pCtx
);
pEntryInfo
->
complete
=
true
;
int32_t
slotId
=
pCtx
->
pExpr
->
base
.
resSchema
.
slotId
;
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录