tmq_example.py 1.7 KB
Newer Older
Z
zhaoyanggh 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59
import taos
from taos.tmq import *

conn = taos.connect()

# create database
conn.execute("drop database if exists py_tmq")
conn.execute("create database if not exists py_tmq vgroups 2")

# create table and stables
conn.select_db("py_tmq")
conn.execute("create stable if not exists stb1 (ts timestamp, c1 int, c2 float, c3 binary(10)) tags(t1 int)")
conn.execute("create table if not exists tb1 using stb1 tags(1)")
conn.execute("create table if not exists tb2 using stb1 tags(2)")
conn.execute("create table if not exists tb3 using stb1 tags(3)")

# create topic
conn.execute("drop topic if exists topic_ctb_column")
conn.execute("create topic if not exists topic_ctb_column as select ts, c1, c2, c3 from stb1")

# set consumer configure options
conf = TaosTmqConf()
conf.set("group.id", "tg2")
conf.set("td.connect.user", "root")
conf.set("td.connect.pass", "taosdata")
conf.set("enable.auto.commit", "true")
conf.set("msg.with.table.name", "true")

def tmq_commit_cb_print(tmq, resp, offset, param=None):
    print(f"commit: {resp}, tmq: {tmq}, offset: {offset}, param: {param}")
    
conf.set_auto_commit_cb(tmq_commit_cb_print, None)

# build consumer
tmq = conf.new_consumer()

# build topic list
topic_list = TaosTmqList()
topic_list.append("topic_ctb_column")

# subscribe consumer
tmq.subscribe(topic_list)

# check subscriptions
sub_list = tmq.subscription()
print("subscribed topics: ",sub_list)

# start subscribe
while 1:
    res = tmq.poll(1000)
    if res:
        topic = res.get_topic_name()
        vg = res.get_vgroup_id()
        db = res.get_db_name()
        print(f"topic: {topic}\nvgroup id: {vg}\ndb: {db}")
        for row in res:
            print(row)
            tb = res.get_table_name()
            print(f"from table: {tb}")