sql.py 16.7 KB
Newer Older
1
###################################################################
2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
#           Copyright (c) 2016 by TAOS Technologies, Inc.
#                     All rights reserved.
#
#  This file is proprietary and confidential to TAOS Technologies.
#  No part of this file may be reproduced, stored, transmitted,
#  disclosed or used in any form or by any means other than as
#  expressly provided by the written permission from Jianhui Tao
#
###################################################################

# -*- coding: utf-8 -*-

import sys
import os
import time
import datetime
S
Shuduo Sang 已提交
18
import inspect
L
liuyq-617 已提交
19 20
import psutil
import shutil
21
import pandas as pd
22 23
from util.log import *

24 25 26 27 28 29 30 31 32
def _parse_datetime(timestr):
    try:
        return datetime.datetime.strptime(timestr, '%Y-%m-%d %H:%M:%S.%f')
    except ValueError:
        pass
    try:
        return datetime.datetime.strptime(timestr, '%Y-%m-%d %H:%M:%S')
    except ValueError:
        pass
L
liuyq-617 已提交
33

34 35 36 37 38 39
class TDSql:
    def __init__(self):
        self.queryRows = 0
        self.queryCols = 0
        self.affectedRows = 0

40
    def init(self, cursor, log=False):
41 42
        self.cursor = cursor

S
Shuduo Sang 已提交
43
        if (log):
44 45
            caller = inspect.getframeinfo(inspect.stack()[1][0])
            self.cursor.log(caller.filename + ".sql")
S
Shuduo Sang 已提交
46

47 48 49 50 51
    def close(self):
        self.cursor.close()

    def prepare(self):
        tdLog.info("prepare database:db")
L
liu0x54 已提交
52 53 54 55 56 57 58 59
        s = 'reset query cache'
        self.cursor.execute(s)
        s = 'drop database if exists db'
        self.cursor.execute(s)
        s = 'create database db'
        self.cursor.execute(s)
        s = 'use db'
        self.cursor.execute(s)
60 61 62 63 64 65 66 67

    def error(self, sql):
        expectErrNotOccured = True
        try:
            self.cursor.execute(sql)
        except BaseException:
            expectErrNotOccured = False
        if expectErrNotOccured:
68 69
            caller = inspect.getframeinfo(inspect.stack()[1][0])
            tdLog.exit("%s(%d) failed: sql:%s, expect error not occured" % (caller.filename, caller.lineno, sql))
70
        else:
71 72 73
            self.queryRows = 0
            self.queryCols = 0
            self.queryResult = None
S
Shuduo Sang 已提交
74
            tdLog.info("sql:%s, expect error occured" % (sql))
75

J
jiajingbin 已提交
76
    def query(self, sql, row_tag=None):
77
        self.sql = sql
78 79 80 81 82 83 84 85
        try:
            self.cursor.execute(sql)
            self.queryResult = self.cursor.fetchall()
            self.queryRows = len(self.queryResult)
            self.queryCols = len(self.cursor.description)
        except Exception as e:
            caller = inspect.getframeinfo(inspect.stack()[1][0])
            args = (caller.filename, caller.lineno, sql, repr(e))
86 87
            tdLog.notice("%s(%d) failed: sql:%s, %s" % args)
            raise Exception(repr(e))
J
jiajingbin 已提交
88 89
        if row_tag:
            return self.queryResult
90 91
        return self.queryRows

92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107
    def getVariable(self, search_attr):
        '''
            get variable of search_attr access "show variables"
        '''
        try:
            sql = 'show variables'
            param_list = self.query(sql, row_tag=True)
            for param in param_list:
                if param[0] == search_attr:
                    return param[1], param_list
        except Exception as e:
            caller = inspect.getframeinfo(inspect.stack()[1][0])
            args = (caller.filename, caller.lineno, sql, repr(e))
            tdLog.notice("%s(%d) failed: sql:%s, %s" % args)
            raise Exception(repr(e))

J
jiajingbin 已提交
108
    def getColNameList(self, sql, col_tag=None):
109 110 111
        self.sql = sql
        try:
            col_name_list = []
J
jiajingbin 已提交
112
            col_type_list = []
113 114 115 116
            self.cursor.execute(sql)
            self.queryCols = self.cursor.description
            for query_col in self.queryCols:
                col_name_list.append(query_col[0])
J
jiajingbin 已提交
117
                col_type_list.append(query_col[1])
118 119 120 121 122
        except Exception as e:
            caller = inspect.getframeinfo(inspect.stack()[1][0])
            args = (caller.filename, caller.lineno, sql, repr(e))
            tdLog.notice("%s(%d) failed: sql:%s, %s" % args)
            raise Exception(repr(e))
J
jiajingbin 已提交
123 124
        if col_tag:
            return col_name_list, col_type_list
125 126
        return col_name_list

127 128 129 130 131 132 133 134 135
    def waitedQuery(self, sql, expectRows, timeout):
        tdLog.info("sql: %s, try to retrieve %d rows in %d seconds" % (sql, expectRows, timeout))
        self.sql = sql
        try:
            for i in range(timeout):
                self.cursor.execute(sql)
                self.queryResult = self.cursor.fetchall()
                self.queryRows = len(self.queryResult)
                self.queryCols = len(self.cursor.description)
D
dapan1121 已提交
136
                tdLog.info("sql: %s, try to retrieve %d rows,get %d rows" % (sql, expectRows, self.queryRows))
137 138 139 140 141 142
                if self.queryRows >= expectRows:
                    return (self.queryRows, i)
                time.sleep(1)
        except Exception as e:
            caller = inspect.getframeinfo(inspect.stack()[1][0])
            args = (caller.filename, caller.lineno, sql, repr(e))
143 144
            tdLog.notice("%s(%d) failed: sql:%s, %s" % args)
            raise Exception(repr(e))
145
        return (self.queryRows, timeout)
146

147 148 149 150 151 152 153
    def checkRows(self, expectRows):
        if self.queryRows == expectRows:
            tdLog.info("sql:%s, queryRows:%d == expect:%d" % (self.sql, self.queryRows, expectRows))
        else:
            caller = inspect.getframeinfo(inspect.stack()[1][0])
            args = (caller.filename, caller.lineno, self.sql, self.queryRows, expectRows)
            tdLog.exit("%s(%d) failed: sql:%s, queryRows:%d != expect:%d" % args)
154

155 156 157 158 159 160 161 162
    def checkCols(self, expectCols):
        if self.queryCols == expectCols:
            tdLog.info("sql:%s, queryCols:%d == expect:%d" % (self.sql, self.queryCols, expectCols))
        else:
            caller = inspect.getframeinfo(inspect.stack()[1][0])
            args = (caller.filename, caller.lineno, self.sql, self.queryCols, expectCols)
            tdLog.exit("%s(%d) failed: sql:%s, queryCols:%d != expect:%d" % args)

163 164
    def checkRowCol(self, row, col):
        caller = inspect.getframeinfo(inspect.stack()[2][0])
165
        if row < 0:
166 167
            args = (caller.filename, caller.lineno, self.sql, row)
            tdLog.exit("%s(%d) failed: sql:%s, row:%d is smaller than zero" % args)
168
        if col < 0:
169 170
            args = (caller.filename, caller.lineno, self.sql, row)
            tdLog.exit("%s(%d) failed: sql:%s, col:%d is smaller than zero" % args)
171
        if row > self.queryRows:
172 173
            args = (caller.filename, caller.lineno, self.sql, row, self.queryRows)
            tdLog.exit("%s(%d) failed: sql:%s, row:%d is larger than queryRows:%d" % args)
174
        if col > self.queryCols:
175 176
            args = (caller.filename, caller.lineno, self.sql, col, self.queryCols)
            tdLog.exit("%s(%d) failed: sql:%s, col:%d is larger than queryCols:%d" % args)
177

178 179
    def checkDataType(self, row, col, dataType):
        self.checkRowCol(row, col)
180 181
        return self.cursor.istype(col, dataType)

182
    def checkData(self, row, col, data):
183 184 185 186
        self.checkRowCol(row, col)
        if self.queryResult[row][col] != data:
            if self.cursor.istype(col, "TIMESTAMP"):
                # suppose user want to check nanosecond timestamp if a longer data passed
C
cpwu 已提交
187 188 189 190 191
                if isinstance(data, int) or isinstance(data, float):
                    if pd.to_datetime(self.queryResult[row][col]) == pd.to_datetime(data):
                        tdLog.info("sql:%s, row:%d col:%d data:%d == expect:%s" %
                            (self.sql, row, col, self.queryResult[row][col], data))
                elif (len(data) >= 28):
192 193 194 195
                    if pd.to_datetime(self.queryResult[row][col]) == pd.to_datetime(data):
                        tdLog.info("sql:%s, row:%d col:%d data:%d == expect:%s" %
                            (self.sql, row, col, self.queryResult[row][col], data))
                else:
196
                    if self.queryResult[row][col] == _parse_datetime(data):
197
                        tdLog.info("sql:%s, row:%d col:%d data:%s == expect:%s" %
198 199 200
                            (self.sql, row, col, self.queryResult[row][col], data))
                return

201
            if str(self.queryResult[row][col]) == str(data):
P
Ping Xiao 已提交
202
                tdLog.info("sql:%s, row:%d col:%d data:%s == expect:%s" %
203
                            (self.sql, row, col, self.queryResult[row][col], data))
P
Ping Xiao 已提交
204
                return
205
            elif isinstance(data, float) and abs(self.queryResult[row][col] - data) <= 0.000001:
P
Ping Xiao 已提交
206
                tdLog.info("sql:%s, row:%d col:%d data:%f == expect:%f" %
207
                            (self.sql, row, col, self.queryResult[row][col], data))
P
Ping Xiao 已提交
208 209 210 211
                return
            else:
                caller = inspect.getframeinfo(inspect.stack()[1][0])
                args = (caller.filename, caller.lineno, self.sql, row, col, self.queryResult[row][col], data)
212
                tdLog.exit("%s(%d) failed: sql:%s row:%d col:%d data:%s != expect:%s" % args)
sangshuduo's avatar
sangshuduo 已提交
213 214

        if data is None:
S
Shuduo Sang 已提交
215
            tdLog.info("sql:%s, row:%d col:%d data:%s == expect:%s" %
216
                       (self.sql, row, col, self.queryResult[row][col], data))
217
        elif isinstance(data, str):
S
Shuduo Sang 已提交
218
            tdLog.info("sql:%s, row:%d col:%d data:%s == expect:%s" %
219
                       (self.sql, row, col, self.queryResult[row][col], data))
sangshuduo's avatar
sangshuduo 已提交
220
        elif isinstance(data, datetime.date):
P
Ping Xiao 已提交
221
            tdLog.info("sql:%s, row:%d col:%d data:%s == expect:%s" %
222
                       (self.sql, row, col, self.queryResult[row][col], data))
P
Ping Xiao 已提交
223
        elif isinstance(data, float):
S
Shuduo Sang 已提交
224
            tdLog.info("sql:%s, row:%d col:%d data:%s == expect:%s" %
sangshuduo's avatar
sangshuduo 已提交
225
                       (self.sql, row, col, self.queryResult[row][col], data))
226
        else:
S
Shuduo Sang 已提交
227
            tdLog.info("sql:%s, row:%d col:%d data:%s == expect:%d" %
228
                       (self.sql, row, col, self.queryResult[row][col], data))
229

230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253
    def checkDeviaRation(self, row, col, data, deviation=0.001):
        self.checkRowCol(row, col)
        if data is None:
            self.checkData(row, col, None)
            return
        caller = inspect.getframeinfo(inspect.stack()[1][0])
        if data is not None and len(self.queryResult)==0:
            tdLog.exit(f"{caller.filename}({caller.lineno}) failed: sql:{self.sql}, data:{data}, "
                       f"expect result is not None but it is")
        args = (
            caller.filename, caller.lineno, self.sql, data, type(data),
            deviation, type(deviation), self.queryResult[row][col], type(self.queryResult[row][col])
        )

        if not(isinstance(data,int) or isinstance(data, float)):
            tdLog.exit(f"{args[0]}({args[1]}) failed: sql:{args[2]}, data:{args[3]}, "
                       f"expect type: int or float, actual type: {args[4]}")
        if not(isinstance(deviation,int) or isinstance(deviation, float)) or  type(data)==type(True):
            tdLog.exit(f"{args[0]}({args[1]}) failed: sql:{args[2]}, deviation:{args[5]}, "
                       f"expect type: int or float, actual type: {args[6]}")
        if not(isinstance(self.queryResult[row][col], int) or isinstance(self.queryResult[row][col], float)):
            tdLog.exit(f"{args[0]}({args[1]}) failed: sql:{args[2]}, result:{args[7]}, "
                       f"expect type: int or float, actual type: {args[8]}")

C
cpwu 已提交
254 255 256 257
        if data == 0:
            devia = abs(self.queryResult[row][col])
        else:
            devia = abs((data - self.queryResult[row][col])/data)
258
        if devia <= deviation:
C
cpwu 已提交
259
            tdLog.info(f"sql:{args[2]}, row:{row}, col:{col}, result data:{args[7]}, expect data:{args[3]}, "
260 261
                       f"actual deviation:{devia} <= expect deviation:{args[5]}")
        else:
C
cpwu 已提交
262 263
            tdLog.exit(f"{args[0]}({args[1]}) failed: sql:{args[2]}, row:{row}, col:{col}, "
                       f"result data:{args[7]}, expect data:{args[3]},"
C
cpwu 已提交
264
                       f"actual deviation:{devia} > expect deviation:{args[5]}")
265 266
        pass

267
    def getData(self, row, col):
268
        self.checkRowCol(row, col)
269 270
        return self.queryResult[row][col]

271 272 273 274 275 276 277 278 279 280 281 282 283
    def getResult(self, sql):
        self.sql = sql
        try:
            self.cursor.execute(sql)
            self.queryResult = self.cursor.fetchall()
        except Exception as e:
            caller = inspect.getframeinfo(inspect.stack()[1][0])
            args = (caller.filename, caller.lineno, sql, repr(e))
            tdLog.notice("%s(%d) failed: sql:%s, %s" % args)
            raise Exception(repr(e))
        return self.queryResult

        
284 285 286 287 288 289 290 291 292 293
    def executeTimes(self, sql, times):
        for i in range(times):
            try:
                return self.cursor.execute(sql)
            except BaseException:
                time.sleep(1)
                continue

    def execute(self, sql):
        self.sql = sql
294 295 296 297 298
        try:
            self.affectedRows = self.cursor.execute(sql)
        except Exception as e:
            caller = inspect.getframeinfo(inspect.stack()[1][0])
            args = (caller.filename, caller.lineno, sql, repr(e))
299 300
            tdLog.notice("%s(%d) failed: sql:%s, %s" % args)
            raise Exception(repr(e))
301 302 303 304
        return self.affectedRows

    def checkAffectedRows(self, expectAffectedRows):
        if self.affectedRows != expectAffectedRows:
305 306 307 308 309
            caller = inspect.getframeinfo(inspect.stack()[1][0])
            args = (caller.filename, caller.lineno, self.sql, self.affectedRows, expectAffectedRows)
            tdLog.exit("%s(%d) failed: sql:%s, affectedRows:%d != expect:%d" % args)

        tdLog.info("sql:%s, affectedRows:%d == expect:%d" % (self.sql, self.affectedRows, expectAffectedRows))
310

311 312 313 314 315 316 317 318
    def checkColNameList(self, col_name_list, expect_col_name_list):
        if col_name_list == expect_col_name_list:
            tdLog.info("sql:%s, col_name_list:%s == expect_col_name_list:%s" % (self.sql, col_name_list, expect_col_name_list))
        else:
            caller = inspect.getframeinfo(inspect.stack()[1][0])
            args = (caller.filename, caller.lineno, self.sql, col_name_list, expect_col_name_list)
            tdLog.exit("%s(%d) failed: sql:%s, col_name_list:%s != expect_col_name_list:%s" % args)

J
jiajingbin 已提交
319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334
    def checkEqual(self, elm, expect_elm):
        if elm == expect_elm:
            tdLog.info("sql:%s, elm:%s == expect_elm:%s" % (self.sql, elm, expect_elm))
        else:
            caller = inspect.getframeinfo(inspect.stack()[1][0])
            args = (caller.filename, caller.lineno, self.sql, elm, expect_elm)
            tdLog.exit("%s(%d) failed: sql:%s, elm:%s != expect_elm:%s" % args)

    def checkNotEqual(self, elm, expect_elm):
        if elm != expect_elm:
            tdLog.info("sql:%s, elm:%s != expect_elm:%s" % (self.sql, elm, expect_elm))
        else:
            caller = inspect.getframeinfo(inspect.stack()[1][0])
            args = (caller.filename, caller.lineno, self.sql, elm, expect_elm)
            tdLog.exit("%s(%d) failed: sql:%s, elm:%s == expect_elm:%s" % args)

J
save  
jiajingbin 已提交
335 336 337 338 339 340 341 342
    def checkIn(self, sub, res):
        if sub in res:
            tdLog.info("sql:%s, sub:%s in result:%s" % (self.sql, sub, res))
        else:
            caller = inspect.getframeinfo(inspect.stack()[1][0])
            args = (caller.filename, caller.lineno, self.sql, sub, res)
            tdLog.exit("%s(%d) failed: sql:%s, sub:%s not in result:%s" % args)

L
liuyq-617 已提交
343
    def taosdStatus(self, state):
L
liuyq-617 已提交
344
        tdLog.sleep(5)
L
liuyq-617 已提交
345 346
        pstate = 0
        for i in range(30):
L
liuyq-617 已提交
347
            pstate = 0
L
liuyq-617 已提交
348 349
            pl = psutil.pids()
            for pid in pl:
L
liuyq-617 已提交
350 351 352 353 354 355 356
                try:
                    if psutil.Process(pid).name() == 'taosd':
                        print('have already started')
                        pstate = 1
                        break
                except psutil.NoSuchProcess:
                    pass
357 358 359
            if pstate == state :break
            if state or pstate:
                tdLog.sleep(1)
L
liuyq-617 已提交
360 361 362
                continue
            pstate = 0
            break
363

L
liuyq-617 已提交
364
        args=(pstate,state)
L
liuyq-617 已提交
365
        if pstate == state:
L
liuyq-617 已提交
366
            tdLog.info("taosd state is %d == expect:%d" %args)
L
liuyq-617 已提交
367
        else:
L
liuyq-617 已提交
368
            tdLog.exit("taosd state is %d != expect:%d" %args)
L
liuyq-617 已提交
369 370 371 372 373 374
        pass

    def haveFile(self, dir, state):
        if os.path.exists(dir) and os.path.isdir(dir):
            if not os.listdir(dir):
                if state :
L
liuyq-617 已提交
375
                    tdLog.exit("dir: %s is empty, expect: not empty" %dir)
L
liuyq-617 已提交
376
                else:
L
liuyq-617 已提交
377
                    tdLog.info("dir: %s is empty, expect: empty" %dir)
378
            else:
L
liuyq-617 已提交
379
                if state :
380
                    tdLog.info("dir: %s is not empty, expect: not empty" %dir)
L
liuyq-617 已提交
381
                else:
382
                    tdLog.exit("dir: %s is not empty, expect: empty" %dir)
L
liuyq-617 已提交
383
        else:
L
liuyq-617 已提交
384
            tdLog.exit("dir: %s doesn't exist" %dir)
L
liuyq-617 已提交
385 386 387
    def createDir(self, dir):
        if os.path.exists(dir):
            shutil.rmtree(dir)
L
liuyq-617 已提交
388
            tdLog.info("dir: %s is removed" %dir)
L
liuyq-617 已提交
389
        os.makedirs( dir, 755 )
L
liuyq-617 已提交
390
        tdLog.info("dir: %s is created" %dir)
L
liuyq-617 已提交
391
        pass
392

393
tdSql = TDSql()