# subscription.py
from .cinterface import CTaosInterface
from .error import *

4

5 6 7
class TDengineSubscription(object):
    """TDengine subscription object.

    Wraps a subscription handle and forwards ``consume`` / ``close`` to
    ``CTaosInterface``.
    """

    def __init__(self, sub):
        """Wrap a subscription handle.

        :param sub: handle that is later passed to ``CTaosInterface.consume``
            and ``CTaosInterface.unsubscribe``. ``None`` marks the
            subscription as unusable.
        """
        self._sub = sub
        # Set by consume(); defined up front so that reading it before the
        # first consume() yields None instead of raising AttributeError.
        self.fields = None

    def consume(self):
        """Consume rows of a subscription.

        Fetches blocks until ``CTaosInterface.fetchBlock`` reports a count of
        zero, and stores the field descriptions on ``self.fields``.

        :return: list of tuples, each holding one value per field.
        :raises OperationalError: if the subscription has no handle (it was
            created with ``None`` or has already been closed).
        """
        if self._sub is None:
            raise OperationalError("Invalid use of consume")

        result, fields = CTaosInterface.consume(self._sub)
        # One accumulator list per field (column-wise storage).
        columns = [[] for _ in range(len(fields))]
        while True:
            block, count = CTaosInterface.fetchBlock(result, fields)
            # A zero count is the end-of-data signal.
            if count == 0:
                break
            for i, column in enumerate(columns):
                column.extend(block[i])

        self.fields = fields
        # Transpose the per-field lists into per-row tuples.
        return list(zip(*columns))

    def close(self, keepProgress=True):
        """Close the Subscription.

        :param keepProgress: forwarded unchanged to
            ``CTaosInterface.unsubscribe`` (presumably controls whether the
            consumption progress is retained -- confirm against the C API).
        :return: ``True`` if an open subscription was closed, ``False`` if
            there was nothing to close.
        """
        if self._sub is None:
            return False

        CTaosInterface.unsubscribe(self._sub, keepProgress)
        # Drop the handle so a repeated close() or a later consume() cannot
        # hand an already-unsubscribed handle back to the C interface.
        self._sub = None
        return True


if __name__ == '__main__':
    # Demo: subscribe to a query and print what ten consume() calls return.
    # The relative import only resolves when this file is run as a module
    # of its package (``python -m <package>.subscription``).
    from .connection import TDengineConnection

    conn = TDengineConnection(
        host="127.0.0.1",
        user="root",
        password="taosdata",
        database="test")

    try:
        # Create a subscription object for the query
        sub = conn.subscribe(True, "test", "select * from meters;", 1000)
        try:
            for _ in range(10):
                for row in sub.consume():
                    print(row)
        finally:
            # Always release the subscription, even if consume() failed.
            sub.close()
    finally:
        # Always release the connection, even if subscribing failed.
        conn.close()