subscription.py 1.3 KB
Newer Older
1 2
from taos.result import TaosResult
from .cinterface import *
weixin_48148422's avatar
weixin_48148422 已提交
3 4
from .error import *

5

6 7
class TaosSubscription(object):
    """TDengine subscription object"""
8

9
    def __init__(self, sub, with_callback = False):
weixin_48148422's avatar
weixin_48148422 已提交
10
        self._sub = sub
11
        self._with_callback = with_callback
weixin_48148422's avatar
weixin_48148422 已提交
12 13

    def consume(self):
14
        """Consume rows of a subscription"""
weixin_48148422's avatar
weixin_48148422 已提交
15 16
        if self._sub is None:
            raise OperationalError("Invalid use of consume")
17 18 19 20
        if self._with_callback:
            raise OperationalError("DONOT use consume method in an subscription with callback")
        result = taos_consume(self._sub)
        return TaosResult(result)
weixin_48148422's avatar
weixin_48148422 已提交
21

22
    def close(self, keepProgress=True):
23
        """Close the Subscription."""
weixin_48148422's avatar
weixin_48148422 已提交
24 25
        if self._sub is None:
            return False
26

27 28
        taos_unsubscribe(self._sub, keepProgress)
        self._sub = None
weixin_48148422's avatar
weixin_48148422 已提交
29
        return True
30 31 32 33
    
    def __del__(self):
        self.close()

weixin_48148422's avatar
weixin_48148422 已提交
34

35 36
if __name__ == "__main__":
    from .connection import TaosConnection
weixin_48148422's avatar
weixin_48148422 已提交
37

38
    conn = TaosConnection(host="127.0.0.1", user="root", password="taosdata", database="test")
weixin_48148422's avatar
weixin_48148422 已提交
39 40 41 42

    # Generate a cursor object to run SQL commands
    sub = conn.subscribe(True, "test", "select * from meters;", 1000)

43
    for i in range(0, 10):
weixin_48148422's avatar
weixin_48148422 已提交
44 45 46 47 48
        data = sub.consume()
        for d in data:
            print(d)

    sub.close()
49
    conn.close()