cursor.py 9.7 KB
Newer Older
S
slguan 已提交
1 2
from .cinterface import CTaosInterface
from .error import *
3
from .constants import FieldType
4
import threading
S
slguan 已提交
5

6
# querySeqNum = 0
7

S
slguan 已提交
8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
class TDengineCursor(object):
    """Database cursor which is used to manage the context of a fetch operation.

    Attributes:
        .description: Read-only attribute consists of 7-item sequences:

            > name (mondatory)
            > type_code (mondatory)
            > display_size
            > internal_size
            > precision
            > scale
            > null_ok

            This attribute will be None for operations that do not return rows or
            if the cursor has not had an operation invoked via the .execute*() method yet.

        .rowcount:This read-only attribute specifies the number of rows that the last
26
            .execute*() produced (for DQL statements like SELECT) or affected
S
slguan 已提交
27 28 29
    """

    def __init__(self, connection=None):
30
        self._description = []
S
slguan 已提交
31 32 33 34 35 36 37
        self._rowcount = -1
        self._connection = None
        self._result = None
        self._fields = None
        self._block = None
        self._block_rows = -1
        self._block_iter = 0
38
        self._affected_rows = 0
S
Shuduo Sang 已提交
39
        self._logfile = ""
40
        self._threadId = threading.get_ident()
S
slguan 已提交
41 42 43 44 45 46 47 48 49 50 51 52

        if connection is not None:
            self._connection = connection

    def __iter__(self):
        return self

    def __next__(self):
        if self._result is None or self._fields is None:
            raise OperationalError("Invalid use of fetch iterator")

        if self._block_rows <= self._block_iter:
53
            block, self._block_rows = CTaosInterface.fetchRow(
54
                self._result, self._fields)
S
slguan 已提交
55 56 57 58 59
            if self._block_rows == 0:
                raise StopIteration
            self._block = list(map(tuple, zip(*block)))
            self._block_iter = 0

60
        data = self._block[self._block_iter]
S
slguan 已提交
61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76
        self._block_iter += 1

        return data

    @property
    def description(self):
        """Return the description of the object.
        """
        return self._description

    @property
    def rowcount(self):
        """Return the rowcount of the object
        """
        return self._rowcount

77 78 79 80 81 82
    @property
    def affected_rows(self):
        """Return the rowcount of insertion
        """
        return self._affected_rows

S
slguan 已提交
83 84 85 86 87 88 89
    def callproc(self, procname, *args):
        """Call a stored database procedure with the given name.

        Void functionality since no stored procedures.
        """
        pass

S
Shuduo Sang 已提交
90 91 92
    def log(self, logfile):
        self._logfile = logfile

S
slguan 已提交
93 94 95 96 97
    def close(self):
        """Close the cursor.
        """
        if self._connection is None:
            return False
98

S
slguan 已提交
99 100 101 102 103 104 105 106
        self._reset_result()
        self._connection = None

        return True

    def execute(self, operation, params=None):
        """Prepare and execute a database operation (query or command).
        """
107 108 109 110 111
        # if threading.get_ident() != self._threadId:
        #     info ="Cursor execute:Thread ID not match,creater:"+str(self._threadId)+" caller:"+str(threading.get_ident())
        #     raise OperationalError(info)
            # print(info)
            # return None
112

S
slguan 已提交
113 114 115 116 117 118
        if not operation:
            return None

        if not self._connection:
            # TODO : change the exception raised here
            raise ProgrammingError("Cursor is not connected")
119

S
slguan 已提交
120 121 122 123 124
        self._reset_result()

        stmt = operation
        if params is not None:
            pass
125 126 127 128 129

        # global querySeqNum
        # querySeqNum += 1
        # localSeqNum = querySeqNum # avoid raice condition
        # print("   >> Exec Query ({}): {}".format(localSeqNum, str(stmt)))
130
        self._result = CTaosInterface.query(self._connection._conn, stmt)
131
        # print("   << Query ({}) Exec Done".format(localSeqNum))
S
Shuduo Sang 已提交
132 133 134 135
        if (self._logfile):
            with open(self._logfile, "a") as logfile:
                logfile.write("%s;\n" % operation)

T
Tao Liu 已提交
136 137
        errno = CTaosInterface.libtaos.taos_errno(self._result)
        if errno == 0:
138
            if CTaosInterface.fieldsCount(self._result) == 0:
139
                self._affected_rows += CTaosInterface.affectedRows(
140 141
                    self._result )
                return CTaosInterface.affectedRows(self._result )
S
slguan 已提交
142
            else:
143 144
                self._fields = CTaosInterface.useResult(
                    self._result)
S
slguan 已提交
145 146
                return self._handle_result()
        else:
147 148
            raise ProgrammingError(
                CTaosInterface.errStr(
149
                    self._result), errno)
S
slguan 已提交
150 151 152 153 154 155 156 157 158 159 160 161 162 163

    def executemany(self, operation, seq_of_parameters):
        """Prepare a database operation (query or command) and then execute it against all parameter sequences or mappings found in the sequence seq_of_parameters.
        """
        pass

    def fetchone(self):
        """Fetch the next row of a query result set, returning a single sequence, or None when no more data is available.
        """
        pass

    def fetchmany(self):
        pass

164 165 166 167 168 169 170
    def istype(self, col, dataType):
        if (dataType.upper() == "BOOL"):
            if (self._description[col][1] == FieldType.C_BOOL):
                return True
        if (dataType.upper() == "TINYINT"):
            if (self._description[col][1] == FieldType.C_TINYINT):
                return True
171 172 173 174 175 176 177 178 179
        if (dataType.upper() == "TINYINT UNSIGNED"):
            if (self._description[col][1] == FieldType.C_TINYINT_UNSIGNED):
                return True
        if (dataType.upper() == "SMALLINT"):
            if (self._description[col][1] == FieldType.C_SMALLINT):
                return True
        if (dataType.upper() == "SMALLINT UNSIGNED"):
            if (self._description[col][1] == FieldType.C_SMALLINT_UNSIGNED):
                return True
180 181 182
        if (dataType.upper() == "INT"):
            if (self._description[col][1] == FieldType.C_INT):
                return True
183 184 185
        if (dataType.upper() == "INT UNSIGNED"):
            if (self._description[col][1] == FieldType.C_INT_UNSIGNED):
                return True
186
        if (dataType.upper() == "BIGINT"):
187 188 189 190
            if (self._description[col][1] == FieldType.C_BIGINT):
                return True
        if (dataType.upper() == "BIGINT UNSIGNED"):
            if (self._description[col][1] == FieldType.C_BIGINT_UNSIGNED):
191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209
                return True
        if (dataType.upper() == "FLOAT"):
            if (self._description[col][1] == FieldType.C_FLOAT):
                return True
        if (dataType.upper() == "DOUBLE"):
            if (self._description[col][1] == FieldType.C_DOUBLE):
                return True
        if (dataType.upper() == "BINARY"):
            if (self._description[col][1] == FieldType.C_BINARY):
                return True
        if (dataType.upper() == "TIMESTAMP"):
            if (self._description[col][1] == FieldType.C_TIMESTAMP):
                return True
        if (dataType.upper() == "NCHAR"):
            if (self._description[col][1] == FieldType.C_NCHAR):
                return True

        return False

210
    def fetchall_row(self):
S
slguan 已提交
211 212 213 214
        """Fetch all (remaining) rows of a query result, returning them as a sequence of sequences (e.g. a list of tuples). Note that the cursor's arraysize attribute can affect the performance of this operation.
        """
        if self._result is None or self._fields is None:
            raise OperationalError("Invalid use of fetchall")
215

S
slguan 已提交
216 217 218
        buffer = [[] for i in range(len(self._fields))]
        self._rowcount = 0
        while True:
219
            block, num_of_fields = CTaosInterface.fetchRow(self._result, self._fields)
B
Bomin Zhang 已提交
220 221 222
            errno = CTaosInterface.libtaos.taos_errno(self._result)
            if errno != 0:
                raise ProgrammingError(CTaosInterface.errStr(self._result), errno)
223 224
            if num_of_fields == 0:
                break
S
slguan 已提交
225 226 227
            self._rowcount += num_of_fields
            for i in range(len(self._fields)):
                buffer[i].extend(block[i])
228
        return list(map(tuple, zip(*buffer)))
S
slguan 已提交
229

230
    def fetchall(self):
231 232 233 234 235 236 237 238 239 240 241 242 243 244 245
        if self._result is None or self._fields is None:
            raise OperationalError("Invalid use of fetchall")

        buffer = [[] for i in range(len(self._fields))]
        self._rowcount = 0
        while True:
            block, num_of_fields = CTaosInterface.fetchBlock(self._result, self._fields)
            errno = CTaosInterface.libtaos.taos_errno(self._result)
            if errno != 0:
                raise ProgrammingError(CTaosInterface.errStr(self._result), errno)
            if num_of_fields == 0: break
            self._rowcount += num_of_fields
            for i in range(len(self._fields)):
                buffer[i].extend(block[i])
        return list(map(tuple, zip(*buffer)))
S
slguan 已提交
246 247 248 249 250 251 252 253 254 255 256 257 258 259
    def nextset(self):
        """
        """
        pass

    def setinputsize(self, sizes):
        pass

    def setutputsize(self, size, column=None):
        pass

    def _reset_result(self):
        """Reset the result to unused version.
        """
260
        self._description = []
S
slguan 已提交
261
        self._rowcount = -1
T
Tao Liu 已提交
262 263
        if self._result is not None:
            CTaosInterface.freeResult(self._result)
S
slguan 已提交
264 265 266 267 268
        self._result = None
        self._fields = None
        self._block = None
        self._block_rows = -1
        self._block_iter = 0
269
        self._affected_rows = 0
270

S
slguan 已提交
271 272 273
    def _handle_result(self):
        """Handle the return result from query.
        """
274 275 276 277 278
        # if threading.get_ident() != self._threadId:
        #     info = "Cursor handleresult:Thread ID not match,creater:"+str(self._threadId)+" caller:"+str(threading.get_ident())
        #     raise OperationalError(info)
            # print(info)
            # return None
279

S
slguan 已提交
280 281
        self._description = []
        for ele in self._fields:
282 283 284
            self._description.append(
                (ele['name'], ele['type'], None, None, None, None, False))

285
        return self._result
T
Tao Liu 已提交
286