提交 9a9afc06 编写于 作者: Z zhaoyanggh

docs: refine python tmq doc

上级 6a9efd2a
import taos
from taos.tmq import *
conn = taos.connect()
# create database
conn.execute("drop database if exists py_tmq")
conn.execute("create database if not exists py_tmq vgroups 2")
# create table and stables
conn.select_db("py_tmq")
conn.execute("create stable if not exists stb1 (ts timestamp, c1 int, c2 float, c3 binary(10)) tags(t1 int)")
conn.execute("create table if not exists tb1 using stb1 tags(1)")
conn.execute("create table if not exists tb2 using stb1 tags(2)")
conn.execute("create table if not exists tb3 using stb1 tags(3)")
# create topic
conn.execute("drop topic if exists topic_ctb_column")
conn.execute("create topic if not exists topic_ctb_column as select ts, c1, c2, c3 from stb1")
# set consumer configure options
conf = TaosTmqConf()
conf.set("group.id", "tg2")
conf.set("td.connect.user", "root")
conf.set("td.connect.pass", "taosdata")
conf.set("enable.auto.commit", "true")
conf.set("msg.with.table.name", "true")
def tmq_commit_cb_print(tmq, resp, offset, param=None):
print(f"commit: {resp}, tmq: {tmq}, offset: {offset}, param: {param}")
conf.set_auto_commit_cb(tmq_commit_cb_print, None)
# build consumer
tmq = conf.new_consumer()
# build topic list
topic_list = TaosTmqList()
topic_list.append("topic_ctb_column")
# subscribe consumer
tmq.subscribe(topic_list)
# check subscriptions
sub_list = tmq.subscription()
print("subscribed topics: ",sub_list)
# start subscribe
while 1:
res = tmq.poll(1000)
if res:
topic = res.get_topic_name()
vg = res.get_vgroup_id()
db = res.get_db_name()
print(f"topic: {topic}\nvgroup id: {vg}\ndb: {db}")
for row in res:
print(row)
tb = res.get_table_name()
print(f"from table: {tb}")
from taos.tmq import TaosConsumer
consumer = TaosConsumer('topic_ctb_column', group_id='vg2')
for msg in consumer:
for row in msg:
print(row)
......@@ -88,6 +88,110 @@ void close() throws SQLException;
```
</TabItem>
</TabItem>
<TabItem value="python" label="python">
```python
class TaosConsumer():
DEFAULT_CONFIG = {
'group.id',
'client.id',
'enable.auto.commit',
'auto.commit.interval.ms',
'auto.offset.reset',
'msg.with.table.name',
'experimental.snapshot.enable',
'enable.heartbeat.background',
'experimental.snapshot.batch.size',
'td.connect.ip',
'td.connect.user',
'td.connect.pass',
'td.connect.port',
'td.connect.db',
'timeout'
}
def __init__(self, *topics, **configs):
self._closed = True
self._conf = None
self._list = None
self._tmq = None
keys = list(configs.keys())
for k in keys:
configs.update({k.replace('_','.'): configs.pop(k)})
extra_configs = set(configs).difference(self.DEFAULT_CONFIG)
if extra_configs:
raise TmqError("Unrecognized configs: %s" % (extra_configs,))
self._conf = tmq_conf_new()
self._list = tmq_list_new()
# set poll timeout
if 'timeout' in configs:
self._timeout = configs['timeout']
del configs['timeout']
else:
self._timeout = 0
# check if group id is set
if 'group.id' not in configs:
raise TmqError("missing group.id in consumer config setting")
for key, value in configs.items():
tmq_conf_set(self._conf, key, value)
self._tmq = tmq_consumer_new(self._conf)
if not topics:
raise TmqError("Unset topic for Consumer")
for topic in topics:
tmq_list_append(self._list, topic)
tmq_subscribe(self._tmq, self._list)
def __iter__(self):
return self
def __next__(self):
if not self._tmq:
raise StopIteration('TaosConsumer closed')
return next(self.sync_next())
def sync_next(self):
while 1:
res = tmq_consumer_poll(self._tmq, self._timeout)
if res:
break
yield TaosResult(res)
def subscription(self):
if self._tmq is None:
return None
return tmq_subscription(self._tmq)
def unsubscribe(self):
tmq_unsubscribe(self._tmq)
def close(self):
if self._tmq:
tmq_consumer_close(self._tmq)
self._tmq = None
def __del__(self):
if self._conf:
tmq_conf_destroy(self._conf)
if self._list:
tmq_list_destroy(self._list)
if self._tmq:
tmq_consumer_close(self._tmq)
```
</TabItem>
</Tabs>
## 写入数据
......@@ -230,6 +334,27 @@ public class MetersDeserializer extends ReferenceDeserializer<Meters> {
```
</TabItem>
</TabItem>
<TabItem value="python" label="Python">
| 参数名称 | 类型 | 参数说明 | 备注 |
| :----------------------------: | :-----: | -------------------------------------------------------- | ------------------------------------------- |
| `td_connect_ip` | string | 用于创建连接,同 `taos_connect` | |
| `td_connect_user` | string | 用于创建连接,同 `taos_connect` | |
| `td_connect_pass` | string | 用于创建连接,同 `taos_connect` | |
| `td_connect_port` | string | 用于创建连接,同 `taos_connect` | |
| `group_id` | string | 消费组 ID,同一消费组共享消费进度 | **必填项**。最大长度:192。 |
| `client_id` | string | 客户端 ID | 最大长度:192。 |
| `auto_offset_reset` | string | 消费组订阅的初始位置 | 可选:`earliest`, `latest`, `none`(default) |
| `enable_auto_commit` | string | 启用自动提交 | 合法值:`true`, `false`。 |
| `auto_commit_interval_ms` | string | 以毫秒为单位的自动提交时间间隔 | |
| `enable_heartbeat_background` | string | 启用后台心跳,启用后即使长时间不 poll 消息也不会造成离线 | 合法值:`true`, `false` |
| `experimental_snapshot_enable` | string | 从 WAL 开始消费,还是从 TSBS 开始消费 | 合法值:`true`, `false` |
| `msg_with_table_name` | string | 是否允许从消息中解析表名 | 合法值:`true`, `false` |
</TabItem>
</Tabs>
上述配置中包括 consumer group ID,如果多个 consumer 指定的 consumer group ID 一样,则自动形成一个 consumer group,共享消费进度。
......@@ -262,6 +387,14 @@ consumer.subscribe(topics);
</TabItem>
</TabItem>
<TabItem value="python" label="Python">
```python
consumer = TaosConsumer('topic_ctb_column', group_id='vg2')
```
</TabItem>
</Tabs>
## 消费
......@@ -294,6 +427,17 @@ while(running){
```
</TabItem>
</TabItem>
<TabItem value="python" label="Python">
```python
for msg in consumer:
for row in msg:
print(row)
```
</TabItem>
</Tabs>
## 结束消费
......@@ -322,6 +466,19 @@ consumer.unsubscribe();
consumer.close();
```
</TabItem>
</TabItem>
<TabItem value="python" label="Python">
```python
/* 取消订阅 */
consumer.unsubscribe();
/* 关闭消费 */
consumer.close();
```
</TabItem>
</Tabs>
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册