connection.py 5.5 KB
Newer Older
1 2 3 4 5 6 7 8
# encoding:UTF-8
from types import FunctionType
from .cinterface import *
from .cursor import TaosCursor
from .subscription import TaosSubscription
from .statement import TaosStmt
from .stream import TaosStream
from .result import *
S
slguan 已提交
9

10

11 12
class TaosConnection(object):
    """TDengine connection object"""
13

S
slguan 已提交
14 15 16 17 18 19 20 21 22 23 24 25 26 27
    def __init__(self, *args, **kwargs):
        self._conn = None
        self._host = None
        self._user = "root"
        self._password = "taosdata"
        self._database = None
        self._port = 0
        self._config = None
        self._chandle = None

        self.config(**kwargs)

    def config(self, **kwargs):
        # host
28 29
        if "host" in kwargs:
            self._host = kwargs["host"]
S
slguan 已提交
30 31

        # user
32 33
        if "user" in kwargs:
            self._user = kwargs["user"]
S
slguan 已提交
34 35

        # password
36 37
        if "password" in kwargs:
            self._password = kwargs["password"]
38

S
slguan 已提交
39
        # database
40 41
        if "database" in kwargs:
            self._database = kwargs["database"]
S
slguan 已提交
42 43

        # port
44 45
        if "port" in kwargs:
            self._port = kwargs["port"]
S
slguan 已提交
46 47

        # config
48 49
        if "config" in kwargs:
            self._config = kwargs["config"]
S
slguan 已提交
50 51

        self._chandle = CTaosInterface(self._config)
52
        self._conn = self._chandle.connect(self._host, self._user, self._password, self._database, self._port)
S
slguan 已提交
53 54

    def close(self):
55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92
        """Close current connection."""
        if self._conn:
            taos_close(self._conn)
            self._conn = None

    @property
    def client_info(self):
        # type: () -> str
        return taos_get_client_info()

    @property
    def server_info(self):
        # type: () -> str
        return taos_get_server_info(self._conn)

    def select_db(self, database):
        # type: (str) -> None
        taos_select_db(self._conn, database)

    def execute(self, sql):
        # type: (str) -> None
        """Simplely execute sql ignoring the results"""
        res = taos_query(self._conn, sql)
        taos_free_result(res)

    def query(self, sql):
        # type: (str) -> TaosResult
        result = taos_query(self._conn, sql)
        return TaosResult(result, True, self)

    def query_a(self, sql, callback, param):
        # type: (str, async_query_callback_type, c_void_p) -> None
        """Asynchronously query a sql with callback function"""
        taos_query_a(self._conn, sql, callback, param)

    def subscribe(self, restart, topic, sql, interval, callback=None, param=None):
        # type: (bool, str, str, int, subscribe_callback_type, c_void_p) -> TaosSubscription
        """Create a subscription."""
weixin_48148422's avatar
weixin_48148422 已提交
93 94
        if self._conn is None:
            return None
95 96
        sub = taos_subscribe(self._conn, restart, topic, sql, interval, callback, param)
        return TaosSubscription(sub, callback != None)
weixin_48148422's avatar
weixin_48148422 已提交
97

98 99
    def statement(self, sql=None):
        # type: (str | None) -> TaosStmt
100 101
        if self._conn is None:
            return None
102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144
        stmt = taos_stmt_init(self._conn)
        if sql != None:
            taos_stmt_prepare(stmt, sql)

        return TaosStmt(stmt)

    def load_table_info(self, tables):
        # type: (str) -> None
        taos_load_table_info(self._conn, tables)

    def stream(self, sql, callback, stime=0, param=None, callback2=None):
        # type: (str, Callable[[Any, TaosResult, TaosRows], None], int, Any, c_void_p) -> TaosStream
        # cb = cast(callback, stream_callback_type)
        # ref = byref(cb)

        stream = taos_open_stream(self._conn, sql, callback, stime, param, callback2)
        return TaosStream(stream)

    def insert_lines(self, lines):
        # type: (list[str]) -> None
        """Line protocol and schemaless support

        ## Example

        ```python
        import taos
        conn = taos.connect()
        conn.exec("drop database if exists test")
        conn.select_db("test")
        lines = [
            'ste,t2=5,t3=L"ste" c1=true,c2=4,c3="string" 1626056811855516532',
        ]
        conn.insert_lines(lines)
        ```

        ## Exception

        ```python
        try:
            conn.insert_lines(lines)
        except SchemalessError as err:
            print(err)
        ```
S
slguan 已提交
145
        """
146 147
        return taos_insert_lines(self._conn, lines)

148 149 150 151 152 153 154 155 156
    def insert_telnet_lines(self, lines):
        """OpenTSDB telnet style API format support

        ## Example
        cpu_load 1626056811855516532ns 2.0f32 id="tb1",host="host0",interface="eth0"

        """
        return taos_insert_telnet_lines(self._conn, lines)

157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175
    def insert_json_payload(self, payload):
        """OpenTSDB HTTP JSON format support

        ## Example
        "{
            "metric": "cpu_load_0",
            "timestamp": 1626006833610123,
            "value": 55.5,
            "tags":
                {
                    "host": "ubuntu",
                    "interface": "eth0",
                    "Id": "tb0"
                }
        }"

        """
        return taos_insert_json_payload(self._conn, payload)

176 177 178 179
    def cursor(self):
        # type: () -> TaosCursor
        """Return a new Cursor object using the connection."""
        return TaosCursor(self)
S
slguan 已提交
180 181 182 183 184 185 186 187 188

    def commit(self):
        """Commit any pending transaction to the database.

        Since TDengine do not support transactions, the implement is void functionality.
        """
        pass

    def rollback(self):
189
        """Void functionality"""
S
slguan 已提交
190 191 192
        pass

    def clear_result_set(self):
193
        """Clear unused result set on this connection."""
194
        pass
S
slguan 已提交
195

196 197 198
    def __del__(self):
        self.close()

199

S
slguan 已提交
200
if __name__ == "__main__":
201
    conn = TaosConnection()
S
slguan 已提交
202
    conn.close()
203
    print("Hello world")