from taos.tmq import Consumer import taos def init_tmq_env(db, topic): conn = taos.connect() conn.execute("drop topic if exists {}".format(topic)) conn.execute("drop database if exists {}".format(db)) conn.execute("create database if not exists {}".format(db)) conn.select_db(db) conn.execute( "create stable if not exists stb1 (ts timestamp, c1 int, c2 float, c3 varchar(16)) tags(t1 int, t3 varchar(16))") conn.execute("create table if not exists tb1 using stb1 tags(1, 't1')") conn.execute("create table if not exists tb2 using stb1 tags(2, 't2')") conn.execute("create table if not exists tb3 using stb1 tags(3, 't3')") conn.execute("create topic if not exists {} as select ts, c1, c2, c3 from stb1".format(topic)) conn.execute("insert into tb1 values (now, 1, 1.0, 'tmq test')") conn.execute("insert into tb2 values (now, 2, 2.0, 'tmq test')") conn.execute("insert into tb3 values (now, 3, 3.0, 'tmq test')") def cleanup(db, topic): conn = taos.connect() conn.execute("drop topic if exists {}".format(topic)) conn.execute("drop database if exists {}".format(db)) if __name__ == '__main__': init_tmq_env("tmq_test", "tmq_test_topic") # init env consumer = Consumer( { "group.id": "tg2", "td.connect.user": "root", "td.connect.pass": "taosdata", "enable.auto.commit": "true", } ) consumer.subscribe(["tmq_test_topic"]) try: while True: res = consumer.poll(1) if not res: break err = res.error() if err is not None: raise err val = res.value() for block in val: print(block.fetchall()) finally: consumer.unsubscribe() consumer.close() cleanup("tmq_test", "tmq_test_topic")