performance_schema.py 12.1 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77
###################################################################
#           Copyright (c) 2016 by TAOS Technologies, Inc.
#                     All rights reserved.
#
#  This file is proprietary and confidential to TAOS Technologies.
#  No part of this file may be reproduced, stored, transmitted,
#  disclosed or used in any form or by any means other than as
#  expressly provided by the written permission from Jianhui Tao
#
###################################################################

# -*- coding: utf-8 -*-


from util.log import *
from util.cases import *
from util.sql import *
from util.common import *
from util.sqlset import *
import threading
from taos.tmq import Consumer

class TDTestCase:
    def init(self, conn, logSql, replicaVar=1):
        self.replicaVar = int(replicaVar)
        tdLog.debug("start to execute %s" % __file__)
        tdSql.init(conn.cursor())
        self.conn=conn
        self.setsql = TDSetSql()
        self.dbname = 'db'
        self.stbname = 'stb'
        self.binary_length = 20 # the length of binary for column_dict
        self.nchar_length = 20  # the length of nchar for column_dict
        self.ts = 1537146000000
        self.user = "root"
        self.groupName = "tg2"
        self.topicName = "tmq_test_topic"
        self.appName = "python3.10" #todo need to set to fixed name
        self.ipAdd= "127.0.0.1";
        self.transTestDBName = "dbwithvgroup"
        self.lastTranAction = "action:40 code:0x111(Action in progress) msgType:create-vnode numOfEps:1 inUse:0 ep:0-localhost:6030 "
        self.column_dict = {
            'ts'  : 'timestamp',                        #8
            'col1': 'tinyint',                          #1
            'col2': 'smallint',                         #2
            'col3': 'int',                              #4                 
            'col4': 'bigint',                           #8
            'col5': 'tinyint unsigned',                 #1
            'col6': 'smallint unsigned',                #2
            'col7': 'int unsigned',                     #4
            'col8': 'bigint unsigned',                  #8
            'col9': 'float',                            #4
            'col10': 'double',                          #8
            'col11': 'bool',                            #1
            'col12': f'binary({self.binary_length})',   #20 + 2
            'col13': f'nchar({self.nchar_length})'      #20 + 2
        }                                               #95
        self.tbnum = 20
        self.rowNum = 10
        self.tag_dict = {
            't0':'int'                                  #4
        }
        self.tag_values = [
            f'1'
            ]
        self.binary_str = 'taosdata'
        self.nchar_str = '涛思数据'
    
    def insert_data(self,column_dict,tbname,row_num):
        insert_sql = self.setsql.set_insertsql(column_dict,tbname,self.binary_str,self.nchar_str)
        for i in range(row_num):
            insert_list = []
            self.setsql.insert_values(column_dict,i,insert_sql,insert_list,self.ts)

    def prepare_data(self):
        tdSql.execute(f"create database if not exists {self.dbname} vgroups 2")                                         #1 query
        tdSql.execute(f'use {self.dbname}')                                                                             #1 query
78
        tdsql.execute(f"alter database {self.dbname} wal_retention_period 3600")
79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94
        tdSql.execute(self.setsql.set_create_stable_sql(self.stbname,self.column_dict,self.tag_dict))                   #1 query
        
        for i in range(self.tbnum):                                                                                     #self.tbnum query
            tdSql.execute(f"create table {self.stbname}_{i} using {self.stbname} tags({self.tag_values[0]})")           #self.tbnum query
            self.insert_data(self.column_dict,f'{self.stbname}_{i}',self.rowNum)                                        #self.stbname*self.rowNum query
        
        for i in range(self.tbnum):
            tdSql.execute(f"select * from {self.stbname}_{i}")                                                          #self.tbnum query

    class myThread (threading.Thread):
        def __init__(self, obj):
            threading.Thread.__init__(self)
            self.obj = obj
        def run(self):
            tdSqlTran = TDSql()
            tdSqlTran.init(self.obj.conn.cursor())
95
            tdSqlTran.execute(f"create database if not exists %s vgroups 20 wal_retention_period 3600"%(self.obj.transTestDBName))
96 97 98 99
            tdSqlTran.execute(f"DROP DATABASE %s"%(self.obj.transTestDBName))

    def init_tmq_env(self, db, topic):
        self.conn.execute("drop topic if exists {}".format(topic))
100
        self.conn.execute("create database if not exists {} wal_retention_period 3600".format(db))
101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213
        self.conn.select_db(db)
        self.conn.execute(
            "create stable if not exists stb_sub (ts timestamp, c1 int, c2 float, c3 varchar(16)) tags(t1 int, t3 varchar(16))")
        self.conn.execute("create topic if not exists {} as select ts, c1, c2, c3 from stb_sub".format(topic))

    def count_check(self):
        sleep(1) #performance table delay
        tdSql.query('select * from performance_schema.perf_apps')                                                            #1 query
        rowIndex = 0 #for debug
        tdSql.checkRows(1)
        tdSql.checkNotEqual(tdSql.queryResult[rowIndex][0], 0)                              #column 0:app_id
        tdSql.checkEqual(tdSql.queryResult[rowIndex][1],self.ipAdd)                         #column 1:ip
        tdSql.checkNotEqual(tdSql.queryResult[rowIndex][2], 0)                              #column 2:pid               # show child process pid of this running python program, this is maybe related to test framework 
        P = psutil.Process()
        tdLog.info(P)
        chi = P.children()                                           
        tdLog.info(chi)                                                                                                 # child process pid is not able to get, chi is empty here
        tdSql.checkEqual(tdSql.queryResult[rowIndex][3],self.appName)                       #column 3:name              
        tdSql.checkNotEqual(tdSql.queryResult[rowIndex][4], 0)                              #column 4:start_time
        tdSql.checkEqual(tdSql.queryResult[rowIndex][5], 0)                                 #column 5:insert_req       # zero ???
        tdSql.checkEqual(tdSql.queryResult[rowIndex][6],self.tbnum * self.rowNum)           #column 6:insert_row
        tdSql.checkEqual(tdSql.queryResult[rowIndex][7], 0)                                 #column 7:insert_time       #zeor???  
        tdSql.checkEqual(tdSql.queryResult[rowIndex][8],self.tbnum * self.rowNum * 134)     #column 8:insert_bytes     # 134 bytes ???
        tdSql.checkEqual(tdSql.queryResult[rowIndex][9], 0)                                 #column 9:fetch_bytes       # zero ???
        tdSql.checkNotEqual(tdSql.queryResult[rowIndex][10], 0)                             #column 10:query_time       
        tdSql.checkEqual(tdSql.queryResult[rowIndex][11],0)                                 #column 11:slow_query       
        tdSql.checkEqual(tdSql.queryResult[rowIndex][12],self.tbnum * self.rowNum + self.tbnum * 2 + 4)                               #column 11:total_req
        tdSql.checkEqual(tdSql.queryResult[rowIndex][13], 1)                                #column 13:current_req
        tdSql.checkNotEqual(tdSql.queryResult[rowIndex][14], 0)                             #column 14:last_access


        tdSql.query('select * from performance_schema.perf_connections')
        tdSql.checkRows(2)
        for i in range(2):
            if tdSql.queryResult[i][2]== self.appName :
                tdSql.checkNotEqual(tdSql.queryResult[i][0],0)                  #column 0:conn_id
                tdSql.checkEqual(tdSql.queryResult[i][1],self.user)             #column 1:user
                tdSql.checkEqual(tdSql.queryResult[i][2],self.appName)          #column 2:app
                tdSql.checkNotEqual(tdSql.queryResult[i][3],0)                  #column 3:pid
                tdSql.checkNotEqual(tdSql.queryResult[i][4],0)                  #column 4:end_point
                tdSql.checkNotEqual(tdSql.queryResult[i][5],0)                  #column 5:login_time
                tdSql.checkNotEqual(tdSql.queryResult[i][6],0)                  #column 6:last_access
                self.connId = tdSql.queryResult[i][0]
            
        self.init_tmq_env(self.dbname, self.topicName)

        consumer = Consumer(
            {
                "group.id": self.groupName,
                "td.connect.user": self.user,
                "td.connect.pass": "taosdata",
                "td.connect.ip": self.ipAdd,
            }
        )
        consumer.subscribe([self.topicName])
        
        tdSql.query('select * from performance_schema.perf_consumers')
        tdSql.checkRows(1)
        tdSql.checkNotEqual(tdSql.queryResult[0][0],0)                  #consumer_id
        tdSql.checkEqual(tdSql.queryResult[0][1],self.groupName)        #consumer_group
        tdSql.checkNotEqual(tdSql.queryResult[0][2],0)                  #client_id
        tdSql.checkEqual(tdSql.queryResult[0][3],"ready")               #status
        tdSql.checkEqual(tdSql.queryResult[0][4],self.topicName)        #topics
        tdSql.checkNotEqual(tdSql.queryResult[0][5],0)                  #up_time
        tdSql.checkNotEqual(tdSql.queryResult[0][6],0)                  #subscribe_time
        tdSql.checkNotEqual(tdSql.queryResult[0][7],0)                  #rebalance_time

        sleep(3) #performance_schema delay, wait for last query

        tdSql.query('select * from performance_schema.perf_queries')
        tdSql.checkEqual(tdSql.queryResult[0][12],"select * from performance_schema.perf_consumers") #sql
        tdSql.checkNotEqual(tdSql.queryResult[0][0],0)                  #kill_id
        tdSql.checkNotEqual(tdSql.queryResult[0][1],0)                  #query_id
        tdSql.checkEqual(tdSql.queryResult[0][2],self.connId)           #conn_id
        tdSql.checkEqual(tdSql.queryResult[0][3],self.appName)          #app
        tdSql.checkNotEqual(tdSql.queryResult[0][4],0)                  #pid
        tdSql.checkEqual(tdSql.queryResult[0][5],self.user)             #user
        tdSql.checkNotEqual(tdSql.queryResult[0][6],0)                  #end_point
        tdSql.checkNotEqual(tdSql.queryResult[0][7],0)                  #create_time
        tdSql.checkNotEqual(tdSql.queryResult[0][8],0)                  #exec_usec
        tdSql.checkEqual(tdSql.queryResult[0][9],0)                     #stable_query
        tdSql.checkEqual(tdSql.queryResult[0][10],1)                    #sub_num
        tdSql.checkEqual(tdSql.queryResult[0][11],"243:SUCCEED")        #sub_status   
        
        t1 = self.myThread(self) 
        t1.start()                           
        
        sleep(0.5) # there is delay
        tdSql.query('select * from performance_schema.perf_trans')
        tdSql.checkNotEqual(tdSql.queryResult[0][0],0)                  #id
        tdSql.checkNotEqual(tdSql.queryResult[0][1],0)                  #create_time
        tdSql.checkEqual(tdSql.queryResult[0][2],"redoAction")          #stage
        tdSql.checkEqual(tdSql.queryResult[0][3],"create-db")           #oper
        tdSql.checkEqual(tdSql.queryResult[0][4],self.transTestDBName)  #db
        tdSql.checkNotEqual(tdSql.queryResult[0][5],0)                  #stable
        tdSql.checkEqual(tdSql.queryResult[0][6],0)                     #failed_times
        tdSql.checkNotEqual(tdSql.queryResult[0][7],0)                  #last_exec_time
        tdSql.checkEqual(tdSql.queryResult[0][8],self.lastTranAction)   #last_action_info

        t1.join()

        tdSql.query('select * from performance_schema.perf_apps')
        tdSql.checkNotEqual(tdSql.queryResult[rowIndex][11],0)             #column 11:slow_query  at least one slow query: create db.

    def run(self):
        self.prepare_data()
        self.count_check()

    def stop(self):
        tdSql.close()
        tdLog.success("%s successfully executed" % __file__)

tdCases.addWindows(__file__, TDTestCase())
214
tdCases.addLinux(__file__, TDTestCase())