subscription.py 1.4 KB
Newer Older
weixin_48148422's avatar
weixin_48148422 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
from .cinterface import CTaosInterface
from .error import *

class TDengineSubscription(object):
    """TDengine subscription object
    """
    def __init__(self, sub):
        self._sub = sub
    

    def consume(self):
        """Consume rows of a subscription
        """
        if self._sub is None:
            raise OperationalError("Invalid use of consume")
        
        result, fields = CTaosInterface.consume(self._sub)
        buffer = [[] for i in range(len(fields))]
        while True:
            block, num_of_fields = CTaosInterface.fetchBlock(result, fields)
            if num_of_fields == 0: break
            for i in range(len(fields)):
                buffer[i].extend(block[i])

25
        self.fields = fields
weixin_48148422's avatar
weixin_48148422 已提交
26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52
        return list(map(tuple, zip(*buffer)))


    def close(self, keepProgress = True):
        """Close the Subscription.
        """
        if self._sub is None:
            return False
        
        CTaosInterface.unsubscribe(self._sub, keepProgress)
        return True


if __name__ == '__main__':
    from .connection import TDengineConnection
    conn = TDengineConnection(host="127.0.0.1", user="root", password="taosdata", database="test")

    # Generate a cursor object to run SQL commands
    sub = conn.subscribe(True, "test", "select * from meters;", 1000)

    for i in range(0,10):
        data = sub.consume()
        for d in data:
            print(d)

    sub.close()
    conn.close()