1.4 KB
Newer Older
weixin_48148422's avatar
weixin_48148422 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52
from .cinterface import CTaosInterface
from .error import *

class TDengineSubscription(object):
    """TDengine subscription object
    def __init__(self, sub):
        self._sub = sub

    def consume(self):
        """Consume rows of a subscription
        if self._sub is None:
            raise OperationalError("Invalid use of consume")
        result, fields = CTaosInterface.consume(self._sub)
        buffer = [[] for i in range(len(fields))]
        while True:
            block, num_of_fields = CTaosInterface.fetchBlock(result, fields)
            if num_of_fields == 0: break
            for i in range(len(fields)):

        return list(map(tuple, zip(*buffer)))

    def close(self, keepProgress = True):
        """Close the Subscription.
        if self._sub is None:
            return False
        CTaosInterface.unsubscribe(self._sub, keepProgress)
        return True

if __name__ == '__main__':
    from .connection import TDengineConnection
    conn = TDengineConnection(host="", user="root", password="taosdata", database="test")

    # Generate a cursor object to run SQL commands
    sub = conn.subscribe(True, "test", "select * from meters;", 1000)

    for i in range(0,10):
        data = sub.consume()
        for d in data:
