tmq_example.py 1.9 KB
Newer Older
1
from taos.tmq import Consumer
Z
zhaoyanggh 已提交
2
import taos
3 4


5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
def init_tmq_env(db, topic):
    conn = taos.connect()
    conn.execute("drop topic if exists {}".format(topic))
    conn.execute("drop database if exists {}".format(db))
    conn.execute("create database if not exists {}".format(db))
    conn.select_db(db)
    conn.execute(
        "create stable if not exists stb1 (ts timestamp, c1 int, c2 float, c3 varchar(16)) tags(t1 int, t3 varchar(16))")
    conn.execute("create table if not exists tb1 using stb1 tags(1, 't1')")
    conn.execute("create table if not exists tb2 using stb1 tags(2, 't2')")
    conn.execute("create table if not exists tb3 using stb1 tags(3, 't3')")
    conn.execute("create topic if not exists {} as select ts, c1, c2, c3 from stb1".format(topic))
    conn.execute("insert into tb1 values (now, 1, 1.0, 'tmq test')")
    conn.execute("insert into tb2 values (now, 2, 2.0, 'tmq test')")
    conn.execute("insert into tb3 values (now, 3, 3.0, 'tmq test')")


S
sunpeng 已提交
22 23 24 25 26 27
def cleanup(db, topic):
    conn = taos.connect()
    conn.execute("drop topic if exists {}".format(topic))
    conn.execute("drop database if exists {}".format(db))


28
if __name__ == '__main__':
S
sunpeng 已提交
29
    init_tmq_env("tmq_test", "tmq_test_topic")  # init env
30 31 32 33 34 35 36 37 38 39 40 41
    consumer = Consumer(
        {
            "group.id": "tg2",
            "td.connect.user": "root",
            "td.connect.pass": "taosdata",
            "enable.auto.commit": "true",
        }
    )
    consumer.subscribe(["tmq_test_topic"])

    try:
        while True:
S
sunpeng 已提交
42
            res = consumer.poll(1)
43
            if not res:
S
sunpeng 已提交
44
                break
45 46 47 48 49 50 51 52 53 54
            err = res.error()
            if err is not None:
                raise err
            val = res.value()

            for block in val:
                print(block.fetchall())
    finally:
        consumer.unsubscribe()
        consumer.close()
S
sunpeng 已提交
55
        cleanup("tmq_test", "tmq_test_topic")