TD-15517.py 15.3 KB
Newer Older
P
plum-lihui 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36

import taos
import sys
import time
import socket
import os
import threading

from util.log import *
from util.sql import *
from util.cases import *
from util.dnodes import *

class TDTestCase:
    hostname = socket.gethostname()
    #rpcDebugFlagVal = '143'
    #clientCfgDict = {'serverPort': '', 'firstEp': '', 'secondEp':'', 'rpcDebugFlag':'135', 'fqdn':''}
    #clientCfgDict["rpcDebugFlag"]  = rpcDebugFlagVal
    #updatecfgDict = {'clientCfg': {}, 'serverPort': '', 'firstEp': '', 'secondEp':'', 'rpcDebugFlag':'135', 'fqdn':''}
    #updatecfgDict["rpcDebugFlag"] = rpcDebugFlagVal
    #print ("===================: ", updatecfgDict)

    def init(self, conn, logSql):
        tdLog.debug(f"start to excute {__file__}")
        #tdSql.init(conn.cursor())
        tdSql.init(conn.cursor(), logSql)  # output sql.txt file

    def getBuildPath(self):
        selfPath = os.path.dirname(os.path.realpath(__file__))

        if ("community" in selfPath):
            projPath = selfPath[:selfPath.find("community")]
        else:
            projPath = selfPath[:selfPath.find("tests")]

        for root, dirs, files in os.walk(projPath):
wafwerar's avatar
wafwerar 已提交
37
            if ("taosd" in files or "taosd.exe" in files):
P
plum-lihui 已提交
38 39 40 41 42 43 44 45 46 47 48 49 50 51 52
                rootRealPath = os.path.dirname(os.path.realpath(root))
                if ("packaging" not in rootRealPath):
                    buildPath = root[:len(root) - len("/build/bin")]
                    break
        return buildPath

    def newcur(self,cfg,host,port):
        user = "root"
        password = "taosdata"
        con=taos.connect(host=host, user=user, password=password, config=cfg ,port=port)
        cur=con.cursor()
        print(cur)
        return cur

    def create_tables(self,tsql, dbName,vgroups,stbName,ctbNum,rowsPerTbl):
G
Ganlin Zhao 已提交
53
        tsql.execute("create database if not exists %s vgroups %d"%(dbName, vgroups))
P
plum-lihui 已提交
54 55 56 57 58 59 60 61 62 63 64 65
        tsql.execute("use %s" %dbName)
        tsql.execute("create table %s (ts timestamp, c1 bigint, c2 binary(16)) tags(t1 int)"%stbName)
        pre_create = "create table"
        sql = pre_create
        #tdLog.debug("doing create one  stable %s and %d  child table in %s  ..." %(stbname, count ,dbname))
        for i in range(ctbNum):
            sql += " %s_%d using %s tags(%d)"%(stbName,i,stbName,i+1)
            if (i > 0) and (i%100 == 0):
                tsql.execute(sql)
                sql = pre_create
        if sql != pre_create:
            tsql.execute(sql)
G
Ganlin Zhao 已提交
66

P
plum-lihui 已提交
67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92
        tdLog.debug("complete to create database[%s], stable[%s] and %d child tables" %(dbName, stbName, ctbNum))
        return

    def insert_data(self,tsql,dbName,stbName,ctbNum,rowsPerTbl,batchNum,startTs):
        tdLog.debug("start to insert data ............")
        tsql.execute("use %s" %dbName)
        pre_insert = "insert into "
        sql = pre_insert

        #tdLog.debug("doing insert data into stable:%s rows:%d ..."%(stbName, allRows))
        for i in range(ctbNum):
            sql += " %s_%d values "%(stbName,i)
            for j in range(rowsPerTbl):
                sql += "(%d, %d, 'tmqrow_%d') "%(startTs + j, j, j)
                if (j > 0) and ((j%batchNum == 0) or (j == rowsPerTbl - 1)):
                    tsql.execute(sql)
                    if j < rowsPerTbl - 1:
                        sql = "insert into %s_%d values " %(stbName,i)
                    else:
                        sql = "insert into "
        #end sql
        if sql != pre_insert:
            #print("insert sql:%s"%sql)
            tsql.execute(sql)
        tdLog.debug("insert data ............ [OK]")
        return
G
Ganlin Zhao 已提交
93

P
plum-lihui 已提交
94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111
    def prepareEnv(self, **parameterDict):
        print ("input parameters:")
        print (parameterDict)
        # create new connector for my thread
        tsql=self.newcur(parameterDict['cfg'], 'localhost', 6030)
        self.create_tables(tsql,\
                           parameterDict["dbName"],\
                           parameterDict["vgroups"],\
                           parameterDict["stbName"],\
                           parameterDict["ctbNum"],\
                           parameterDict["rowsPerTbl"])

        self.insert_data(tsql,\
                         parameterDict["dbName"],\
                         parameterDict["stbName"],\
                         parameterDict["ctbNum"],\
                         parameterDict["rowsPerTbl"],\
                         parameterDict["batchNum"],\
G
Ganlin Zhao 已提交
112 113 114
                         parameterDict["startTs"])
        return

P
plum-lihui 已提交
115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133
    def run(self):
        tdSql.prepare()

        buildPath = self.getBuildPath()
        if (buildPath == ""):
            tdLog.exit("taosd not found!")
        else:
            tdLog.info("taosd found in %s" % buildPath)
        cfgPath = buildPath + "/../sim/psim/cfg"
        tdLog.info("cfgPath: %s" % cfgPath)

        tdLog.printNoPrefix("======== test scenario 1: ")
        tdLog.info("step 1: create database, stb, ctb and insert data")
        # create and start thread
        parameterDict = {'cfg':        '',       \
                         'dbName':     'db',     \
                         'vgroups':    1,        \
                         'stbName':    'stb',    \
                         'ctbNum':     10,       \
P
plum-lihui 已提交
134
                         'rowsPerTbl': 100,   \
P
plum-lihui 已提交
135 136 137 138 139 140
                         'batchNum':   10,       \
                         'startTs':    1640966400000}  # 2022-01-01 00:00:00.000
        parameterDict['cfg'] = cfgPath
        prepareEnvThread = threading.Thread(target=self.prepareEnv, kwargs=parameterDict)
        prepareEnvThread.start()
        time.sleep(2)
G
Ganlin Zhao 已提交
141

P
plum-lihui 已提交
142 143
        # wait stb ready
        while 1:
G
Ganlin Zhao 已提交
144 145
            tdSql.query("show %s.stables"%parameterDict['dbName'])
            if tdSql.getRows() == 1:
P
plum-lihui 已提交
146 147 148 149 150 151
                break
            else:
                time.sleep(1)

        tdLog.info("create topics from super table")
        topicFromStb = 'topic_stb_column'
G
Ganlin Zhao 已提交
152 153
        topicFromCtb = 'topic_ctb_column'

P
plum-lihui 已提交
154 155
        tdSql.execute("create topic %s as select ts, c1, c2 from %s.%s" %(topicFromStb, parameterDict['dbName'], parameterDict['stbName']))
        tdSql.execute("create topic %s as select ts, c1, c2 from %s.%s_0" %(topicFromCtb, parameterDict['dbName'], parameterDict['stbName']))
G
Ganlin Zhao 已提交
156

P
plum-lihui 已提交
157 158 159 160 161 162 163
        time.sleep(1)
        tdSql.query("show topics")
        #tdSql.checkRows(2)
        topic1 = tdSql.getData(0 , 0)
        topic2 = tdSql.getData(1 , 0)
        print (topic1)
        print (topic2)
G
Ganlin Zhao 已提交
164

P
plum-lihui 已提交
165 166 167 168
        print (topicFromStb)
        print (topicFromCtb)
        #tdLog.info("show topics: %s, %s"%topic1, topic2)
        #if topic1 != topicFromStb or topic1 != topicFromCtb:
G
Ganlin Zhao 已提交
169
        #    tdLog.exit("topic error1")
P
plum-lihui 已提交
170
        #if topic2 != topicFromStb or topic2 != topicFromCtb:
G
Ganlin Zhao 已提交
171 172
        #    tdLog.exit("topic error2")

P
plum-lihui 已提交
173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189
        tdLog.info("create consume info table and consume result table")
        cdbName = parameterDict["dbName"]
        tdSql.query("create table consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int)")
        tdSql.query("create table consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)")

        consumerId   = 0
        expectmsgcnt = (parameterDict["rowsPerTbl"] / parameterDict["batchNum"] ) * parameterDict["ctbNum"]
        expectmsgcnt1 = expectmsgcnt + parameterDict["ctbNum"]
        topicList    = topicFromStb
        ifcheckdata  = 0
        keyList      = 'group.id:cgrp1,\
                        enable.auto.commit:false,\
                        auto.commit.interval.ms:6000,\
                        auto.offset.reset:earliest'
        sql = "insert into consumeinfo values "
        sql += "(now, %d, '%s', '%s', %d, %d)"%(consumerId, topicList, keyList, expectmsgcnt1, ifcheckdata)
        tdSql.query(sql)
G
Ganlin Zhao 已提交
190

P
plum-lihui 已提交
191 192 193 194 195 196 197 198 199 200
        tdLog.info("check stb if there are data")
        while 1:
            tdSql.query("select count(*) from %s"%parameterDict["stbName"])
            #tdLog.info("row: %d, %l64d, %l64d"%(tdSql.getData(0, 1),tdSql.getData(0, 2),tdSql.getData(0, 3))
            countOfStb = tdSql.getData(0, 0)
            if countOfStb != 0:
                tdLog.info("count from stb: %d"%countOfStb)
                break
            else:
                time.sleep(1)
G
Ganlin Zhao 已提交
201

P
plum-lihui 已提交
202 203 204 205
        tdLog.info("start consume processor")
        pollDelay = 5
        showMsg   = 1
        showRow   = 1
G
Ganlin Zhao 已提交
206

P
plum-lihui 已提交
207
        shellCmd = 'nohup ' + buildPath + '/build/bin/tmq_sim -c ' + cfgPath
G
Ganlin Zhao 已提交
208 209
        shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, parameterDict["dbName"], showMsg, showRow, cdbName)
        shellCmd += "> /dev/null 2>&1 &"
P
plum-lihui 已提交
210
        tdLog.info(shellCmd)
G
Ganlin Zhao 已提交
211
        os.system(shellCmd)
P
plum-lihui 已提交
212 213 214

        # wait for data ready
        prepareEnvThread.join()
G
Ganlin Zhao 已提交
215

P
plum-lihui 已提交
216 217 218 219 220 221 222 223 224 225
        tdLog.info("insert process end, and start to check consume result")
        while 1:
            tdSql.query("select * from consumeresult")
            #tdLog.info("row: %d, %l64d, %l64d"%(tdSql.getData(0, 1),tdSql.getData(0, 2),tdSql.getData(0, 3))
            if tdSql.getRows() == 1:
                break
            else:
                time.sleep(5)

        expectrowcnt = parameterDict["rowsPerTbl"]  * parameterDict["ctbNum"]
G
Ganlin Zhao 已提交
226

P
plum-lihui 已提交
227 228 229 230 231 232
        tdSql.checkData(0 , 1, consumerId)
        tdSql.checkData(0 , 2, expectmsgcnt)
        tdSql.checkData(0 , 3, expectrowcnt)

        tdSql.query("drop topic %s"%topicFromStb)
        tdSql.query("drop topic %s"%topicFromCtb)
G
Ganlin Zhao 已提交
233

P
plum-lihui 已提交
234 235
        # ==============================================================================
        tdLog.printNoPrefix("======== test scenario 2: add child table with consuming ")
G
Ganlin Zhao 已提交
236
        tdLog.info(" clean database")
P
plum-lihui 已提交
237 238 239 240 241 242 243 244 245 246 247 248 249
        # create and start thread
        parameterDict = {'cfg':        '',       \
                         'dbName':     'db2',    \
                         'vgroups':    1,        \
                         'stbName':    'stb',    \
                         'ctbNum':     10,       \
                         'rowsPerTbl': 10000,   \
                         'batchNum':   100,       \
                         'startTs':    1640966400000}  # 2022-01-01 00:00:00.000
        parameterDict['cfg'] = cfgPath

        prepareEnvThread = threading.Thread(target=self.prepareEnv, kwargs=parameterDict)
        prepareEnvThread.start()
G
Ganlin Zhao 已提交
250

P
plum-lihui 已提交
251 252 253
        # wait db ready
        while 1:
            tdSql.query("show databases")
G
Ganlin Zhao 已提交
254 255
            if tdSql.getRows() == 4:
                print (tdSql.getData(0,0), tdSql.getData(1,0),tdSql.getData(2,0),)
P
plum-lihui 已提交
256 257 258
                break
            else:
                time.sleep(1)
G
Ganlin Zhao 已提交
259

P
plum-lihui 已提交
260 261 262 263
        tdSql.query("use %s"%parameterDict['dbName'])
        # wait stb ready
        while 1:
            tdSql.query("show %s.stables"%parameterDict['dbName'])
G
Ganlin Zhao 已提交
264
            if tdSql.getRows() == 1:
P
plum-lihui 已提交
265 266 267 268 269 270 271
                break
            else:
                time.sleep(1)

        tdLog.info("create topics from super table")
        topicFromStb = 'topic_stb_column2'
        topicFromCtb = 'topic_ctb_column2'
G
Ganlin Zhao 已提交
272

P
plum-lihui 已提交
273 274
        tdSql.execute("create topic %s as select ts, c1, c2 from %s.%s" %(topicFromStb, parameterDict['dbName'], parameterDict['stbName']))
        tdSql.execute("create topic %s as select ts, c1, c2 from %s.%s_0" %(topicFromCtb, parameterDict['dbName'], parameterDict['stbName']))
G
Ganlin Zhao 已提交
275

P
plum-lihui 已提交
276 277 278 279 280 281
        time.sleep(1)
        tdSql.query("show topics")
        topic1 = tdSql.getData(0 , 0)
        topic2 = tdSql.getData(1 , 0)
        print (topic1)
        print (topic2)
G
Ganlin Zhao 已提交
282

P
plum-lihui 已提交
283 284 285 286
        print (topicFromStb)
        print (topicFromCtb)
        #tdLog.info("show topics: %s, %s"%topic1, topic2)
        #if topic1 != topicFromStb or topic1 != topicFromCtb:
G
Ganlin Zhao 已提交
287
        #    tdLog.exit("topic error1")
P
plum-lihui 已提交
288
        #if topic2 != topicFromStb or topic2 != topicFromCtb:
G
Ganlin Zhao 已提交
289 290
        #    tdLog.exit("topic error2")

P
plum-lihui 已提交
291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307
        tdLog.info("create consume info table and consume result table")
        cdbName = parameterDict["dbName"]
        tdSql.query("create table %s.consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int)"%cdbName)
        tdSql.query("create table %s.consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)"%cdbName)

        consumerId   = 0
        expectmsgcnt = (parameterDict["rowsPerTbl"] / parameterDict["batchNum"] ) * parameterDict["ctbNum"]
        expectmsgcnt1 = expectmsgcnt + parameterDict["ctbNum"]
        topicList    = topicFromStb
        ifcheckdata  = 0
        keyList      = 'group.id:cgrp1,\
                        enable.auto.commit:false,\
                        auto.commit.interval.ms:6000,\
                        auto.offset.reset:earliest'
        sql = "insert into consumeinfo values "
        sql += "(now, %d, '%s', '%s', %d, %d)"%(consumerId, topicList, keyList, expectmsgcnt1, ifcheckdata)
        tdSql.query(sql)
G
Ganlin Zhao 已提交
308

P
plum-lihui 已提交
309 310 311 312 313 314 315 316 317 318
        tdLog.info("check stb if there are data")
        while 1:
            tdSql.query("select count(*) from %s"%parameterDict["stbName"])
            #tdLog.info("row: %d, %l64d, %l64d"%(tdSql.getData(0, 1),tdSql.getData(0, 2),tdSql.getData(0, 3))
            countOfStb = tdSql.getData(0, 0)
            if countOfStb != 0:
                tdLog.info("count from stb: %d"%countOfStb)
                break
            else:
                time.sleep(1)
G
Ganlin Zhao 已提交
319

P
plum-lihui 已提交
320 321 322 323
        tdLog.info("start consume processor")
        pollDelay = 5
        showMsg   = 1
        showRow   = 1
G
Ganlin Zhao 已提交
324

P
plum-lihui 已提交
325
        shellCmd = 'nohup ' + buildPath + '/build/bin/tmq_sim -c ' + cfgPath
G
Ganlin Zhao 已提交
326 327
        shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, parameterDict["dbName"], showMsg, showRow, cdbName)
        shellCmd += "> /dev/null 2>&1 &"
P
plum-lihui 已提交
328
        tdLog.info(shellCmd)
G
Ganlin Zhao 已提交
329
        os.system(shellCmd)
P
plum-lihui 已提交
330 331 332 333 334 335 336 337 338 339 340 341 342

        # create new child table and insert data
        newCtbName = 'newctb'
        rowsOfNewCtb = 1000
        tdSql.query("create table %s.%s using %s.%s tags(9999)"%(parameterDict["dbName"], newCtbName, parameterDict["dbName"], parameterDict["stbName"]))
        startTs = parameterDict["startTs"]
        for j in range(rowsOfNewCtb):
            sql = "insert into %s.%s values (%d, %d, 'tmqrow_%d') "%(parameterDict["dbName"], newCtbName, startTs + j, j, j)
            tdSql.execute(sql)
        tdLog.debug("insert data into new child table ............ [OK]")

        # wait for data ready
        prepareEnvThread.join()
G
Ganlin Zhao 已提交
343

P
plum-lihui 已提交
344 345 346 347 348 349 350 351 352 353 354
        tdLog.info("insert process end, and start to check consume result")
        while 1:
            tdSql.query("select * from consumeresult")
            #tdLog.info("row: %d, %l64d, %l64d"%(tdSql.getData(0, 1),tdSql.getData(0, 2),tdSql.getData(0, 3))
            if tdSql.getRows() == 1:
                break
            else:
                time.sleep(5)

        expectmsgcnt += rowsOfNewCtb
        expectrowcnt = parameterDict["rowsPerTbl"]  * parameterDict["ctbNum"] + rowsOfNewCtb
G
Ganlin Zhao 已提交
355

P
plum-lihui 已提交
356 357 358 359
        tdSql.checkData(0 , 1, consumerId)
        tdSql.checkData(0 , 2, expectmsgcnt)
        tdSql.checkData(0 , 3, expectrowcnt)

G
Ganlin Zhao 已提交
360

P
plum-lihui 已提交
361 362 363 364 365 366 367 368 369 370 371 372
        # ==============================================================================
        tdLog.printNoPrefix("======== test scenario 3: ")

        #os.system('pkill tmq_sim')


    def stop(self):
        tdSql.close()
        tdLog.success(f"{__file__} successfully executed")

tdCases.addLinux(__file__, TDTestCase())
tdCases.addWindows(__file__, TDTestCase())