tmqAlterSchema.py 12.2 KB
Newer Older
P
plum-lihui 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34

from ntpath import join
import taos
import sys
import time
import socket
import os
import threading

from util.log import *
from util.sql import *
from util.cases import *
from util.dnodes import *
from util.common import *
from util.cluster import *

sys.path.append("./7-tmq")
from tmqCommon import *

class TDTestCase:
    def __init__(self):
        self.dnodes = 5
        self.mnodes = 3
        self.idIndex = 0
        self.roleIndex = 2
        self.mnodeStatusIndex = 3
        self.mnodeEpIndex = 1
        self.dnodeStatusIndex = 4
        self.mnodeCheckCnt    = 10
        self.host = socket.gethostname()
        self.startPort = 6030
        self.portStep = 100
        self.dnodeOfLeader = 0

35
    def init(self, conn, logSql, replicaVar=1):
36
        self.replicaVar = int(replicaVar)
P
plum-lihui 已提交
37 38 39
        tdLog.debug(f"start to excute {__file__}")
        tdSql.init(conn.cursor())
        #tdSql.init(conn.cursor(), logSql)  # output sql.txt file
G
Ganlin Zhao 已提交
40

P
plum-lihui 已提交
41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64
    def tmqCase1(self):
        tdLog.printNoPrefix("======== test case 1: topic: select * from stb, while consume, add column int-A/bianry-B/float-C, and then modify B, drop C")
        tdLog.printNoPrefix("add tag int-A/bianry-B/float-C, and then rename A, modify B, drop C, set t2")
        paraDict = {'dbName':     'db1',
                    'dropFlag':   1,
                    'event':      '',
                    'vgroups':    4,
                    'stbName':    'stb',
                    'colPrefix':  'c',
                    'tagPrefix':  't',
                    'colSchema':   [{'type': 'INT', 'count':2}, {'type': 'binary', 'len':20, 'count':1}, {'type': 'TIMESTAMP', 'count':1}],
                    'tagSchema':   [{'type': 'INT', 'count':1}, {'type': 'binary', 'len':20, 'count':1}],
                    'ctbPrefix':  'ctb',
                    'ctbStartIdx': 0,
                    'ctbNum':     1,
                    'rowsPerTbl': 10000,
                    'batchNum':   10,
                    'startTs':    1640966400000,  # 2022-01-01 00:00:00.000
                    'pollDelay':  10,
                    'showMsg':    1,
                    'showRow':    1}

        topicNameList = ['topic1']
        expectRowsList = []
G
Ganlin Zhao 已提交
65
        queryStringList = []
P
plum-lihui 已提交
66 67
        tmqCom.initConsumerTable()
        tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], vgroups=4,replica=1)
68
        tdSql.execute("alter database %s wal_retention_period 3600" %(paraDict['dbName']))
P
plum-lihui 已提交
69 70 71 72 73 74 75
        tdLog.info("create stb")
        tdCom.create_stable(tdSql, dbname=paraDict["dbName"],stbname=paraDict["stbName"], column_elm_list=paraDict['colSchema'], tag_elm_list=paraDict['tagSchema'])
        tdLog.info("create ctb")
        tdCom.create_ctable(tdSql, dbname=paraDict["dbName"],stbname=paraDict["stbName"],tag_elm_list=paraDict['tagSchema'],count=paraDict["ctbNum"], default_ctbname_prefix=paraDict['ctbPrefix'])
        # tdLog.info("async insert data")
        # pThread = tmqCom.asyncInsertData(paraDict)
        tmqCom.insert_data_2(tdSql,paraDict["dbName"],paraDict["ctbPrefix"],paraDict["ctbNum"],paraDict["rowsPerTbl"],paraDict["batchNum"],paraDict["startTs"],paraDict["ctbStartIdx"])
G
Ganlin Zhao 已提交
76

P
plum-lihui 已提交
77 78 79 80 81
        tdLog.info("create topics from stb with filter")
        queryStringList.append("select * from %s.%s" %(paraDict['dbName'], paraDict['stbName']))
        sqlString = "create topic %s as %s" %(topicNameList[0], queryStringList[0])
        tdLog.info("create topic sql: %s"%sqlString)
        tdSql.execute(sqlString)
G
Ganlin Zhao 已提交
82 83 84
        tdSql.query(queryStringList[0])
        expectRowsList.append(tdSql.getRows())

P
plum-lihui 已提交
85 86 87 88 89 90 91 92 93 94 95
        # init consume info, and start tmq_sim, then check consume result
        tdLog.info("insert consume info to consume processor")
        consumerId   = 0
        expectrowcnt = paraDict["rowsPerTbl"] * paraDict["ctbNum"]
        topicList    = topicNameList[0]
        ifcheckdata  = 1
        ifManualCommit = 1
        keyList      = 'group.id:cgrp1, enable.auto.commit:true, auto.commit.interval.ms:6000, auto.offset.reset:earliest'
        tmqCom.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit)

        dstFile = tmqCom.getResultFileByTaosShell(consumerId, queryStringList[0])
G
Ganlin Zhao 已提交
96

P
plum-lihui 已提交
97 98 99 100 101
        tdLog.info("start consume processor")
        tmqCom.startTmqSimProcess(paraDict['pollDelay'],paraDict["dbName"],paraDict['showMsg'], paraDict['showRow'])

        tdLog.info("wait the notify info of start consume, then alter schema")
        tmqCom.getStartConsumeNotifyFromTmqsim()
G
Ganlin Zhao 已提交
102 103

        # add column double-A/bianry-B/double-C, and then modify B, drop C
P
plum-lihui 已提交
104 105 106 107 108 109 110 111 112
        sqlString = "alter table %s.%s add column newc1 double"%(paraDict["dbName"],paraDict['stbName'])
        tdSql.execute(sqlString)
        sqlString = "alter table %s.%s add column newc2 binary(16)"%(paraDict["dbName"],paraDict['stbName'])
        tdSql.execute(sqlString)
        sqlString = "alter table %s.%s add column newc3 double"%(paraDict["dbName"],paraDict['stbName'])
        tdSql.execute(sqlString)
        sqlString = "alter table %s.%s modify column newc2 binary(32)"%(paraDict["dbName"],paraDict['stbName'])
        tdSql.execute(sqlString)
        sqlString = "alter table %s.%s drop column newc3"%(paraDict["dbName"],paraDict['stbName'])
G
Ganlin Zhao 已提交
113 114
        tdSql.execute(sqlString)
        # add tag double-A/bianry-B/double-C, and then rename A, modify B, drop C, set t1
P
plum-lihui 已提交
115 116 117 118 119 120 121 122 123 124 125 126 127 128 129
        sqlString = "alter table %s.%s add tag newt1 double"%(paraDict["dbName"],paraDict['stbName'])
        tdSql.execute(sqlString)
        sqlString = "alter table %s.%s add tag newt2 binary(16)"%(paraDict["dbName"],paraDict['stbName'])
        tdSql.execute(sqlString)
        sqlString = "alter table %s.%s add tag newt3 double"%(paraDict["dbName"],paraDict['stbName'])
        tdSql.execute(sqlString)
        sqlString = "alter table %s.%s rename tag newt1 newt1n"%(paraDict["dbName"],paraDict['stbName'])
        tdSql.execute(sqlString)
        sqlString = "alter table %s.%s modify tag newt2 binary(32)"%(paraDict["dbName"],paraDict['stbName'])
        tdSql.execute(sqlString)
        sqlString = "alter table %s.%s drop tag newt3"%(paraDict["dbName"],paraDict['stbName'])
        tdSql.execute(sqlString)
        sqlString = "alter table %s.%s0 set tag newt2='new tag'"%(paraDict["dbName"],paraDict['ctbPrefix'])
        tdSql.execute(sqlString)

G
Ganlin Zhao 已提交
130 131
        tdLog.info("check the consume result")
        tdSql.query(queryStringList[0])
P
plum-lihui 已提交
132 133 134 135
        expectRowsList.append(tdSql.getRows())

        expectRows = 1
        resultList = tmqCom.selectConsumeResult(expectRows)
G
Ganlin Zhao 已提交
136

P
plum-lihui 已提交
137 138
        tdLog.info("expect consume rows: %d"%(expectRowsList[0]))
        tdLog.info("act consume rows: %d"%(resultList[0]))
G
Ganlin Zhao 已提交
139

P
plum-lihui 已提交
140 141 142 143
        if expectRowsList[0] != resultList[0]:
            tdLog.exit("0 tmq consume rows error!")

        tmqCom.checkTmqConsumeFileContent(consumerId, dstFile)
G
Ganlin Zhao 已提交
144

P
plum-lihui 已提交
145 146 147 148 149
        time.sleep(10)
        for i in range(len(topicNameList)):
            tdSql.query("drop topic %s"%topicNameList[i])

        tdLog.printNoPrefix("======== test case 1 end ...... ")
G
Ganlin Zhao 已提交
150

P
plum-lihui 已提交
151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170
    def tmqCase2(self):
        tdLog.printNoPrefix("======== test case 2: topic: select * from ntb, while consume, add column int-A/bianry-B/float-C, and then rename A, modify B, drop C")
        paraDict = {'dbName':     'db1',
                    'dropFlag':   1,
                    'event':      '',
                    'vgroups':    4,
                    'stbName':    'stb',
                    'colPrefix':  'c',
                    'tagPrefix':  't',
                    'colSchema':   [{'type': 'INT', 'count':2}, {'type': 'binary', 'len':20, 'count':2}],
                    'tagSchema':   [{'type': 'INT', 'count':1}, {'type': 'binary', 'len':20, 'count':1}],
                    'ctbPrefix':  'ctb',
                    'ctbStartIdx': 0,
                    'ctbNum':     1,
                    'rowsPerTbl': 10000,
                    'batchNum':   10,
                    'startTs':    1640966400000,  # 2022-01-01 00:00:00.000
                    'pollDelay':  10,
                    'showMsg':    1,
                    'showRow':    1}
G
Ganlin Zhao 已提交
171 172

        ntbName = 'ntb'
P
plum-lihui 已提交
173 174 175

        topicNameList = ['topic1']
        expectRowsList = []
G
Ganlin Zhao 已提交
176
        queryStringList = []
P
plum-lihui 已提交
177 178
        tmqCom.initConsumerTable()
        tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], vgroups=4,replica=1)
179
        tdSql.execute("alter database %s wal_retention_period 3600" %(paraDict['dbName']))
P
plum-lihui 已提交
180 181 182 183 184 185 186 187
        tdLog.info("create stb")
        tdCom.create_stable(tdSql, dbname=paraDict["dbName"],stbname=paraDict["stbName"], column_elm_list=paraDict['colSchema'], tag_elm_list=paraDict['tagSchema'])
        tdLog.info("create ntb")
        tdCom.create_table(tdSql, dbname=paraDict["dbName"],tbname=ntbName,column_elm_list=paraDict['colSchema'],count=1)
        tdLog.info("start insert data ....")
        # pThread = tmqCom.asyncInsertData(paraDict)
        tdCom.insert_rows(tdSql, dbname=paraDict["dbName"], tbname=ntbName, column_ele_list=paraDict['colSchema'], start_ts_value=paraDict["startTs"], count=paraDict["rowsPerTbl"])
        tdLog.info("insert data end")
G
Ganlin Zhao 已提交
188

P
plum-lihui 已提交
189 190 191 192 193
        tdLog.info("create topics from ntb with filter")
        queryStringList.append("select * from %s.%s" %(paraDict['dbName'], ntbName))
        sqlString = "create topic %s as %s" %(topicNameList[0], queryStringList[0])
        tdLog.info("create topic sql: %s"%sqlString)
        tdSql.execute(sqlString)
G
Ganlin Zhao 已提交
194 195 196
        tdSql.query(queryStringList[0])
        expectRowsList.append(tdSql.getRows())

P
plum-lihui 已提交
197 198 199 200 201 202 203 204 205 206 207
        # init consume info, and start tmq_sim, then check consume result
        tdLog.info("insert consume info to consume processor")
        consumerId   = 0
        expectrowcnt = paraDict["rowsPerTbl"] * paraDict["ctbNum"]
        topicList    = topicNameList[0]
        ifcheckdata  = 1
        ifManualCommit = 1
        keyList      = 'group.id:cgrp1, enable.auto.commit:true, auto.commit.interval.ms:6000, auto.offset.reset:earliest'
        tmqCom.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit)

        dstFile = tmqCom.getResultFileByTaosShell(consumerId, queryStringList[0])
G
Ganlin Zhao 已提交
208

P
plum-lihui 已提交
209 210 211 212 213
        tdLog.info("start consume processor")
        tmqCom.startTmqSimProcess(paraDict['pollDelay'],paraDict["dbName"],paraDict['showMsg'], paraDict['showRow'])

        tdLog.info("wait the notify info of start consume, then alter schema")
        tmqCom.getStartConsumeNotifyFromTmqsim()
G
Ganlin Zhao 已提交
214

P
plum-lihui 已提交
215 216 217 218 219 220 221 222 223 224 225 226 227 228
        # add column double-A/bianry-B/double-C, and then rename A, modify B, drop C
        sqlString = "alter table %s.%s add column newc1 double"%(paraDict["dbName"],ntbName)
        tdSql.execute(sqlString)
        sqlString = "alter table %s.%s add column newc2 binary(16)"%(paraDict["dbName"],ntbName)
        tdSql.execute(sqlString)
        sqlString = "alter table %s.%s add column newc3 double"%(paraDict["dbName"],ntbName)
        tdSql.execute(sqlString)
        sqlString = "alter table %s.%s rename column newc1 newc1n"%(paraDict["dbName"],ntbName)
        tdSql.execute(sqlString)
        sqlString = "alter table %s.%s modify column newc2 binary(32)"%(paraDict["dbName"],ntbName)
        tdSql.execute(sqlString)
        sqlString = "alter table %s.%s drop column newc3"%(paraDict["dbName"],ntbName)
        tdSql.execute(sqlString)

G
Ganlin Zhao 已提交
229 230
        tdLog.info("check the consume result")
        tdSql.query(queryStringList[0])
P
plum-lihui 已提交
231 232 233 234
        expectRowsList.append(tdSql.getRows())

        expectRows = 1
        resultList = tmqCom.selectConsumeResult(expectRows)
G
Ganlin Zhao 已提交
235

P
plum-lihui 已提交
236 237
        tdLog.info("expect consume rows: %d"%(expectRowsList[0]))
        tdLog.info("act consume rows: %d"%(resultList[0]))
G
Ganlin Zhao 已提交
238

P
plum-lihui 已提交
239 240 241 242
        if expectRowsList[0] != resultList[0]:
            tdLog.exit("0 tmq consume rows error!")

        tmqCom.checkTmqConsumeFileContent(consumerId, dstFile)
G
Ganlin Zhao 已提交
243

P
plum-lihui 已提交
244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262
        time.sleep(10)
        for i in range(len(topicNameList)):
            tdSql.query("drop topic %s"%topicNameList[i])

        tdLog.printNoPrefix("======== test case 2 end ...... ")

    def run(self):
        self.tmqCase1()
        self.tmqCase2()


    def stop(self):
        tdSql.close()
        tdLog.success(f"{__file__} successfully executed")

event = threading.Event()

tdCases.addLinux(__file__, TDTestCase())
tdCases.addWindows(__file__, TDTestCase())