connection.py 5.6 KB
Newer Older
1 2 3 4 5 6 7 8
# encoding:UTF-8
from types import FunctionType
from .cinterface import *
from .cursor import TaosCursor
from .subscription import TaosSubscription
from .statement import TaosStmt
from .stream import TaosStream
from .result import *
S
slguan 已提交
9

10

11 12
class TaosConnection(object):
    """TDengine connection object"""
13

S
slguan 已提交
14 15 16 17 18 19 20 21 22 23 24 25 26 27
    def __init__(self, *args, **kwargs):
        self._conn = None
        self._host = None
        self._user = "root"
        self._password = "taosdata"
        self._database = None
        self._port = 0
        self._config = None
        self._chandle = None

        self.config(**kwargs)

    def config(self, **kwargs):
        # host
28 29
        if "host" in kwargs:
            self._host = kwargs["host"]
S
slguan 已提交
30 31

        # user
32 33
        if "user" in kwargs:
            self._user = kwargs["user"]
S
slguan 已提交
34 35

        # password
36 37
        if "password" in kwargs:
            self._password = kwargs["password"]
38

S
slguan 已提交
39
        # database
40 41
        if "database" in kwargs:
            self._database = kwargs["database"]
S
slguan 已提交
42 43

        # port
44 45
        if "port" in kwargs:
            self._port = kwargs["port"]
S
slguan 已提交
46 47

        # config
48 49
        if "config" in kwargs:
            self._config = kwargs["config"]
S
slguan 已提交
50 51

        self._chandle = CTaosInterface(self._config)
52
        self._conn = self._chandle.connect(self._host, self._user, self._password, self._database, self._port)
S
slguan 已提交
53 54

    def close(self):
55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92
        """Close current connection."""
        if self._conn:
            taos_close(self._conn)
            self._conn = None

    @property
    def client_info(self):
        # type: () -> str
        return taos_get_client_info()

    @property
    def server_info(self):
        # type: () -> str
        return taos_get_server_info(self._conn)

    def select_db(self, database):
        # type: (str) -> None
        taos_select_db(self._conn, database)

    def execute(self, sql):
        # type: (str) -> None
        """Simplely execute sql ignoring the results"""
        res = taos_query(self._conn, sql)
        taos_free_result(res)

    def query(self, sql):
        # type: (str) -> TaosResult
        result = taos_query(self._conn, sql)
        return TaosResult(result, True, self)

    def query_a(self, sql, callback, param):
        # type: (str, async_query_callback_type, c_void_p) -> None
        """Asynchronously query a sql with callback function"""
        taos_query_a(self._conn, sql, callback, param)

    def subscribe(self, restart, topic, sql, interval, callback=None, param=None):
        # type: (bool, str, str, int, subscribe_callback_type, c_void_p) -> TaosSubscription
        """Create a subscription."""
weixin_48148422's avatar
weixin_48148422 已提交
93 94
        if self._conn is None:
            return None
95 96
        sub = taos_subscribe(self._conn, restart, topic, sql, interval, callback, param)
        return TaosSubscription(sub, callback != None)
weixin_48148422's avatar
weixin_48148422 已提交
97

98 99
    def statement(self, sql=None):
        # type: (str | None) -> TaosStmt
100 101
        if self._conn is None:
            return None
102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119
        stmt = taos_stmt_init(self._conn)
        if sql != None:
            taos_stmt_prepare(stmt, sql)

        return TaosStmt(stmt)

    def load_table_info(self, tables):
        # type: (str) -> None
        taos_load_table_info(self._conn, tables)

    def stream(self, sql, callback, stime=0, param=None, callback2=None):
        # type: (str, Callable[[Any, TaosResult, TaosRows], None], int, Any, c_void_p) -> TaosStream
        # cb = cast(callback, stream_callback_type)
        # ref = byref(cb)

        stream = taos_open_stream(self._conn, sql, callback, stime, param, callback2)
        return TaosStream(stream)

120
    def schemaless_insert(self, lines, protocol):
121
        # type: (list[str]) -> None
122 123
        """
        1.Line protocol and schemaless support
124 125 126 127 128 129 130 131 132 133 134

        ## Example

        ```python
        import taos
        conn = taos.connect()
        conn.exec("drop database if exists test")
        conn.select_db("test")
        lines = [
            'ste,t2=5,t3=L"ste" c1=true,c2=4,c3="string" 1626056811855516532',
        ]
135
        conn.schemaless_insert(lines, 0)
136 137
        ```

138
        2.OpenTSDB telnet style API format support
139 140

        ## Example
141 142 143 144 145 146 147 148
        import taos
        conn = taos.connect()
        conn.exec("drop database if exists test")
        conn.select_db("test")
        lines = [
            'cpu_load 1626056811855516532ns 2.0f32 id="tb1",host="host0",interface="eth0"',
        ]
        conn.schemaless_insert(lines, 1)
149 150


151
        3.OpenTSDB HTTP JSON format support
152 153

        ## Example
154 155 156 157 158 159
        import taos
        conn = taos.connect()
        conn.exec("drop database if exists test")
        conn.select_db("test")
        payload = ['''
        {
160 161 162 163 164 165 166 167 168
            "metric": "cpu_load_0",
            "timestamp": 1626006833610123,
            "value": 55.5,
            "tags":
                {
                    "host": "ubuntu",
                    "interface": "eth0",
                    "Id": "tb0"
                }
169 170 171
        }
        ''']
        conn.schemaless_insert(lines, 2)
172

173
        """
174
        return taos_schemaless_insert(self._conn, lines, protocol)
175

176

177 178 179 180
    def cursor(self):
        # type: () -> TaosCursor
        """Return a new Cursor object using the connection."""
        return TaosCursor(self)
S
slguan 已提交
181 182 183 184 185 186 187 188 189

    def commit(self):
        """Commit any pending transaction to the database.

        Since TDengine do not support transactions, the implement is void functionality.
        """
        pass

    def rollback(self):
190
        """Void functionality"""
S
slguan 已提交
191 192 193
        pass

    def clear_result_set(self):
194
        """Clear unused result set on this connection."""
195
        pass
S
slguan 已提交
196

197 198 199
    def __del__(self):
        self.close()

200

S
slguan 已提交
201
if __name__ == "__main__":
202
    conn = TaosConnection()
S
slguan 已提交
203
    conn.close()
204
    print("Hello world")