diff --git a/tests/system-test/7-tmq/stbFilter.py b/tests/system-test/7-tmq/stbFilter.py index 7383d5b692d5cc07fda5c7cabc2b615a6c45fe8e..7ad3cc99e78c56803f76f864fa718a18f32547f0 100644 --- a/tests/system-test/7-tmq/stbFilter.py +++ b/tests/system-test/7-tmq/stbFilter.py @@ -20,18 +20,54 @@ class TDTestCase: tdSql.init(conn.cursor()) #tdSql.init(conn.cursor(), logSql) # output sql.txt file + def prepareTestEnv(self): + tdLog.printNoPrefix("======== prepare test env include database, stable, ctables, and insert data: ") + paraDict = {'dbName': 'dbt', + 'dropFlag': 1, + 'event': '', + 'vgroups': 4, + 'replica': 1, + 'stbName': 'stb', + 'colPrefix': 'c', + 'tagPrefix': 't', + 'colSchema': [{'type': 'INT', 'count':2}, {'type': 'binary', 'len':20, 'count':1}], + 'tagSchema': [{'type': 'INT', 'count':1}, {'type': 'binary', 'len':20, 'count':1}], + 'ctbPrefix': 'ctb', + 'ctbStartIdx': 0, + 'ctbNum': 10, + 'rowsPerTbl': 10000, + 'batchNum': 100, + 'startTs': 1640966400000, # 2022-01-01 00:00:00.000 + 'pollDelay': 10, + 'showMsg': 1, + 'showRow': 1} + + tmqCom.initConsumerTable() + tmqCom.create_database(tsql=tdSql, dbName=paraDict["dbName"],dropFlag=paraDict["dropFlag"], vgroups=paraDict['vgroups'],replica=paraDict['replica']) + tdLog.info("create stb") + tmqCom.create_stable(tdSql, dbName=paraDict["dbName"],stbName=paraDict["stbName"]) + tdLog.info("create ctb") + tmqCom.create_ctable(tdSql, dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix=paraDict['ctbPrefix'], ctbNum=paraDict['ctbNum']) + tdLog.info("insert data") + tmqCom.insert_data_interlaceByMultiTbl(tsql=tdSql,dbName=paraDict["dbName"],ctbPrefix=paraDict["ctbPrefix"], + ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"], + startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx']) + return + def tmqCase1(self): tdLog.printNoPrefix("======== test case 1: ") - paraDict = {'dbName': 'db1', + paraDict = {'dbName': 'dbt', 'dropFlag': 1, 'event': '', 'vgroups': 4, + 'replica': 1, 'stbName': 'stb', 'colPrefix': 'c', 'tagPrefix': 't', - 'colSchema': [{'type': 'INT', 'count':1}, {'type': 'binary', 'len':20, 'count':1}], + 'colSchema': [{'type': 'INT', 'count':2}, {'type': 'binary', 'len':20, 'count':1}], 'tagSchema': [{'type': 'INT', 'count':1}, {'type': 'binary', 'len':20, 'count':1}], 'ctbPrefix': 'ctb', + 'ctbStartIdx': 0, 'ctbNum': 10, 'rowsPerTbl': 10000, 'batchNum': 100, @@ -43,13 +79,6 @@ class TDTestCase: topicNameList = ['topic1', 'topic2', 'topic3'] expectRowsList = [] tmqCom.initConsumerTable() - tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], vgroups=4,replica=1) - tdLog.info("create stb") - tdCom.create_stable(tdSql, dbname=paraDict["dbName"],stbname=paraDict["stbName"], column_elm_list=paraDict['colSchema'], tag_elm_list=paraDict['tagSchema']) - tdLog.info("create ctb") - tdCom.create_ctable(tdSql, dbname=paraDict["dbName"],stbname=paraDict["stbName"],tag_elm_list=paraDict['tagSchema'],count=paraDict["ctbNum"], default_ctbname_prefix=paraDict['ctbPrefix']) - tdLog.info("insert data") - tmqCom.insert_data(tdSql,paraDict["dbName"],paraDict["ctbPrefix"],paraDict["ctbNum"],paraDict["rowsPerTbl"],paraDict["batchNum"],paraDict["startTs"]) tdLog.info("create topics from stb with filter") queryString = "select ts, log(c1), ceil(pow(c1,3)) from %s.%s where c1 %% 4 == 0" %(paraDict['dbName'], paraDict['stbName']) @@ -134,16 +163,18 @@ class TDTestCase: def tmqCase2(self): tdLog.printNoPrefix("======== test case 2: ") - paraDict = {'dbName': 'db2', + paraDict = {'dbName': 'dbt', 'dropFlag': 1, 'event': '', 'vgroups': 4, + 'replica': 1, 'stbName': 'stb', 'colPrefix': 'c', 'tagPrefix': 't', 'colSchema': [{'type': 'INT', 'count':2}, {'type': 'binary', 'len':20, 'count':1}], 'tagSchema': [{'type': 'INT', 'count':1}, {'type': 'binary', 'len':20, 'count':1}], 'ctbPrefix': 'ctb', + 'ctbStartIdx': 0, 'ctbNum': 10, 'rowsPerTbl': 10000, 'batchNum': 100, @@ -155,13 +186,6 @@ class TDTestCase: topicNameList = ['topic1', 'topic2', 'topic3'] expectRowsList = [] tmqCom.initConsumerTable() - tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], vgroups=4,replica=1) - tdLog.info("create stb") - tdCom.create_stable(tdSql, dbname=paraDict["dbName"],stbname=paraDict["stbName"], column_elm_list=paraDict['colSchema'], tag_elm_list=paraDict['tagSchema']) - tdLog.info("create ctb") - tmqCom.create_ctable(tdSql, dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix=paraDict['ctbPrefix'], ctbNum=paraDict['ctbNum']) - tdLog.info("insert data") - tmqCom.insert_data_1(tdSql,paraDict["dbName"],paraDict["ctbPrefix"],paraDict["ctbNum"],paraDict["rowsPerTbl"],paraDict["batchNum"],paraDict["startTs"]) tdLog.info("create topics from stb with filter") # sqlString = "create topic %s as select ts, sin(c1), pow(c2,3) from %s.%s where c2 >= 0" %(topicNameList[0], paraDict['dbName'], paraDict['stbName']) @@ -247,6 +271,7 @@ class TDTestCase: def run(self): tdSql.prepare() + self.prepareTestEnv() self.tmqCase1() self.tmqCase2() diff --git a/tests/system-test/7-tmq/tmqCommon.py b/tests/system-test/7-tmq/tmqCommon.py index f8488aee49cf1472f5a3196c2b256f84a5dc901c..607fc73ffff9a46eac6be0ac8edfa11eb29d90ed 100644 --- a/tests/system-test/7-tmq/tmqCommon.py +++ b/tests/system-test/7-tmq/tmqCommon.py @@ -170,33 +170,42 @@ class TMQCom: tdLog.debug("complete to create database %s"%(dbName)) return + # self.create_stable() and self.create_ctable() and self.insert_data_interlaceByMultiTbl() : The three functions are matched + # schema: (ts timestamp, c1 int, c2 bigint, c3 double, c4 binary(32), c5 nchar(32), c6 timestamp) tags (t1 int, t2 bigint, t3 double, t4 binary(32), t5 nchar(32)) def create_stable(self,tsql, dbName,stbName): - tsql.execute("create table if not exists %s.%s (ts timestamp, c1 int, c2 int, c3 binary(16)) tags(t1 int, t2 binary(32))"%(dbName, stbName)) + schemaString = "(ts timestamp, c1 int, c2 bigint, c3 double, c4 binary(32), c5 nchar(32), c6 timestamp) tags (t1 int, t2 bigint, t3 double, t4 binary(32), t5 nchar(32))" + tsql.execute("create table if not exists %s.%s %s"%(dbName, stbName, schemaString)) tdLog.debug("complete to create %s.%s" %(dbName, stbName)) return def create_ctable(self,tsql=None, dbName='dbx',stbName='stb',ctbPrefix='ctb',ctbNum=1,ctbStartIdx=0): - tsql.execute("use %s" %dbName) + # tsql.execute("use %s" %dbName) pre_create = "create table" sql = pre_create #tdLog.debug("doing create one stable %s and %d child table in %s ..." %(stbname, count ,dbname)) + batchNum = 10 + tblBatched = 0 for i in range(ctbNum): - tagValue = 'beijing' + tagBinaryValue = 'beijing' if (i % 2 == 0): - tagValue = 'shanghai' + tagBinaryValue = 'shanghai' elif (i % 3 == 0): - tagValue = 'changsha' + tagBinaryValue = 'changsha' - sql += " %s%d using %s tags(%d, '%s')"%(ctbPrefix,i+ctbStartIdx,stbName,i+ctbStartIdx+1, tagValue) - if (i > 0) and (i%100 == 0): + sql += " %s.%s%d using %s.%s tags(%d, %d, %d, '%s', '%s')"%(dbName,ctbPrefix,i+ctbStartIdx,dbName,stbName,i+ctbStartIdx,i+ctbStartIdx,i+ctbStartIdx,tagBinaryValue,tagBinaryValue) + tblBatched += 1 + if (i == ctbNum-1 ) or (tblBatched == batchNum): tsql.execute(sql) + tblBatched = 0 sql = pre_create + if sql != pre_create: tsql.execute(sql) - tdLog.debug("complete to create %d child tables in %s.%s" %(ctbNum, dbName, stbName)) - return + tdLog.debug("complete to create %d child tables by %s.%s" %(ctbNum, dbName, stbName)) + return + # schema: (ts timestamp, c1 int, c2 binary(16)) def insert_data(self,tsql,dbName,stbName,ctbNum,rowsPerTbl,batchNum,startTs=None): tdLog.debug("start to insert data ............") tsql.execute("use %s" %dbName) @@ -208,11 +217,14 @@ class TMQCom: startTs = int(round(t * 1000)) #tdLog.debug("doing insert data into stable:%s rows:%d ..."%(stbName, allRows)) for i in range(ctbNum): + rowsBatched = 0 sql += " %s%d values "%(stbName,i) for j in range(rowsPerTbl): sql += "(%d, %d, 'tmqrow_%d') "%(startTs + j, j, j) - if (j > 0) and ((j%batchNum == 0) or (j == rowsPerTbl - 1)): + rowsBatched += 1 + if ((rowsBatched == batchNum) or (j == rowsPerTbl - 1)): tsql.execute(sql) + rowsBatched = 0 if j < rowsPerTbl - 1: sql = "insert into %s%d values " %(stbName,i) else: @@ -224,6 +236,7 @@ class TMQCom: tdLog.debug("insert data ............ [OK]") return + # schema: (ts timestamp, c1 int, c2 int, c3 binary(16)) def insert_data_1(self,tsql,dbName,ctbPrefix,ctbNum,rowsPerTbl,batchNum,startTs): tdLog.debug("start to insert data ............") tsql.execute("use %s" %dbName) @@ -234,14 +247,17 @@ class TMQCom: startTs = int(round(t * 1000)) #tdLog.debug("doing insert data into stable:%s rows:%d ..."%(stbName, allRows)) for i in range(ctbNum): + rowsBatched = 0 sql += " %s%d values "%(ctbPrefix,i) for j in range(rowsPerTbl): if (j % 2 == 0): sql += "(%d, %d, %d, 'tmqrow_%d') "%(startTs + j, j, j, j) else: sql += "(%d, %d, %d, 'tmqrow_%d') "%(startTs + j, j, -j, j) - if (j > 0) and ((j%batchNum == 0) or (j == rowsPerTbl - 1)): + rowsBatched += 1 + if ((rowsBatched == batchNum) or (j == rowsPerTbl - 1)): tsql.execute(sql) + rowsBatched = 0 if j < rowsPerTbl - 1: sql = "insert into %s%d values " %(ctbPrefix,i) else: @@ -253,6 +269,7 @@ class TMQCom: tdLog.debug("insert data ............ [OK]") return + # schema: (ts timestamp, c1 int, c2 int, c3 binary(16), c4 timestamp) def insert_data_2(self,tsql,dbName,ctbPrefix,ctbNum,rowsPerTbl,batchNum,startTs,ctbStartIdx=0): tdLog.debug("start to insert data ............") tsql.execute("use %s" %dbName) @@ -263,14 +280,17 @@ class TMQCom: startTs = int(round(t * 1000)) #tdLog.debug("doing insert data into stable:%s rows:%d ..."%(stbName, allRows)) for i in range(ctbNum): + rowsBatched = 0 sql += " %s%d values "%(ctbPrefix,i+ctbStartIdx) for j in range(rowsPerTbl): if (j % 2 == 0): sql += "(%d, %d, %d, 'tmqrow_%d', now) "%(startTs + j, j, j, j) else: sql += "(%d, %d, %d, 'tmqrow_%d', now) "%(startTs + j, j, -j, j) - if (j > 0) and ((j%batchNum == 0) or (j == rowsPerTbl - 1)): + rowsBatched += 1 + if (rowsBatched == batchNum) or (j == rowsPerTbl - 1): tsql.execute(sql) + rowsBatched = 0 if j < rowsPerTbl - 1: sql = "insert into %s%d values " %(ctbPrefix,i+ctbStartIdx) else: @@ -282,7 +302,8 @@ class TMQCom: tdLog.debug("insert data ............ [OK]") return - def insert_data_interlaceByMultiTbl(self,tsql,dbName,ctbPrefix,ctbNum,rowsPerTbl,batchNum,startTs=0): + # schema: (ts timestamp, c1 int, c2 bigint, c3 double, c4 binary(32), c5 nchar(32), c6 timestamp) tags (t1 int, t2 bigint, t3 double, t4 binary(32), t5 nchar(32)) + def insert_data_interlaceByMultiTbl(self,tsql,dbName,ctbPrefix,ctbNum,rowsPerTbl,batchNum,startTs=0,ctbStartIdx=0): tdLog.debug("start to insert data ............") tsql.execute("use %s" %dbName) pre_insert = "insert into " @@ -297,15 +318,22 @@ class TMQCom: ctbDict[i] = 0 #tdLog.debug("doing insert data into stable:%s rows:%d ..."%(stbName, allRows)) - rowsOfCtb = 0 + rowsOfCtb = 0 while rowsOfCtb < rowsPerTbl: for i in range(ctbNum): - sql += " %s.%s_%d values "%(dbName,ctbPrefix,i) + sql += " %s.%s%d values "%(dbName,ctbPrefix,i+ctbStartIdx) + rowsBatched = 0 for k in range(batchNum): - sql += "(%d, %d, 'tmqrow_%d') "%(startTs + ctbDict[i], ctbDict[i], ctbDict[i]) + if (k % 2 == 0): + sql += "(%d, %d, %d, %d, 'binary_%d', 'nchar_%d', now) "%(startTs+ctbDict[i], ctbDict[i],ctbDict[i], ctbDict[i],ctbDict[i],ctbDict[i]) + else: + sql += "(%d, %d, %d, %d, 'binary_%d', 'nchar_%d', now) "%(startTs+ctbDict[i],-ctbDict[i],ctbDict[i],-ctbDict[i],ctbDict[i],ctbDict[i]) + + rowsBatched += 1 ctbDict[i] += 1 - if (0 == ctbDict[i]%batchNum) or (ctbDict[i] == rowsPerTbl): + if (rowsBatched == batchNum) or (ctbDict[i] == rowsPerTbl): tsql.execute(sql) + rowsBatched = 0 sql = "insert into " break rowsOfCtb = ctbDict[0] @@ -313,7 +341,18 @@ class TMQCom: tdLog.debug("insert data ............ [OK]") return - def insert_data_with_autoCreateTbl(self,tsql,dbName,stbName,ctbPrefix,ctbNum,rowsPerTbl,batchNum,startTs=0): + def threadFunctionForInsertByInterlace(self, **paraDict): + # create new connector for new tdSql instance in my thread + newTdSql = tdCom.newTdSql() + self.insert_data_interlaceByMultiTbl(newTdSql,paraDict["dbName"],paraDict["ctbPrefix"],paraDict["ctbNum"],paraDict["rowsPerTbl"],paraDict["batchNum"],paraDict["startTs"],paraDict["ctbStartIdx"]) + return + + def asyncInsertDataByInterlace(self, paraDict): + pThread = threading.Thread(target=self.threadFunctionForInsertByInterlace, kwargs=paraDict) + pThread.start() + return pThread + + def insert_data_with_autoCreateTbl(self,tsql,dbName,stbName,ctbPrefix,ctbNum,rowsPerTbl,batchNum,startTs=0,ctbStartIdx=0): tdLog.debug("start to insert data wiht auto create child table ............") tsql.execute("use %s" %dbName) pre_insert = "insert into " @@ -324,17 +363,17 @@ class TMQCom: startTs = int(round(t * 1000)) #tdLog.debug("doing insert data into stable:%s rows:%d ..."%(stbName, allRows)) - rowsOfSql = 0 + rowsBatched = 0 for i in range(ctbNum): - sql += " %s.%s_%d using %s.%s tags (%d) values "%(dbName,ctbPrefix,i,dbName,stbName,i) + sql += " %s.%s_%d using %s.%s tags (%d) values "%(dbName,ctbPrefix,i+ctbStartIdx,dbName,stbName,i) for j in range(rowsPerTbl): sql += "(%d, %d, 'tmqrow_%d') "%(startTs + j, j, j) - rowsOfSql += 1 - if (j > 0) and ((rowsOfSql == batchNum) or (j == rowsPerTbl - 1)): + rowsBatched += 1 + if ((rowsBatched == batchNum) or (j == rowsPerTbl - 1)): tsql.execute(sql) - rowsOfSql = 0 + rowsBatched = 0 if j < rowsPerTbl - 1: - sql = "insert into %s.%s_%d using %s.%s tags (%d) values " %(dbName,ctbPrefix,i,dbName,stbName,i) + sql = "insert into %s.%s_%d using %s.%s tags (%d) values " %(dbName,ctbPrefix,i+ctbStartIdx,dbName,stbName,i) else: sql = "insert into " #end sql diff --git a/tests/system-test/7-tmq/tmqConsFromTsdb.py b/tests/system-test/7-tmq/tmqConsFromTsdb.py index 92f1da03298bede0f38687ee938bb6e0a42f6622..c18474dcc3c26d7a5f27ac6e255cccaa74175768 100644 --- a/tests/system-test/7-tmq/tmqConsFromTsdb.py +++ b/tests/system-test/7-tmq/tmqConsFromTsdb.py @@ -26,7 +26,7 @@ class TDTestCase: tdSql.init(conn.cursor(), False) def prepareTestEnv(self): - tdLog.printNoPrefix("======== test case 1: ") + tdLog.printNoPrefix("======== prepare test env include database, stable, ctables, and insert data: ") paraDict = {'dbName': 'dbt', 'dropFlag': 1, 'event': '', diff --git a/tests/system-test/7-tmq/tmqConsFromTsdb1.py b/tests/system-test/7-tmq/tmqConsFromTsdb1.py index d6ef441047a21b00508083a01eae399ba2a0fab5..af06c774d6adbf8fdf3ce2db95239d9c62858289 100644 --- a/tests/system-test/7-tmq/tmqConsFromTsdb1.py +++ b/tests/system-test/7-tmq/tmqConsFromTsdb1.py @@ -26,7 +26,7 @@ class TDTestCase: tdSql.init(conn.cursor(), False) def prepareTestEnv(self): - tdLog.printNoPrefix("======== test case 1: ") + tdLog.printNoPrefix("======== prepare test env include database, stable, ctables, and insert data: ") paraDict = {'dbName': 'dbt', 'dropFlag': 1, 'event': '', diff --git a/tests/system-test/fulltest.sh b/tests/system-test/fulltest.sh index 4e76ee66f4274f55ee007ac40aa79e2a1f84fb14..15a6db21718bb92aefa491ff39aab753a656b6fb 100755 --- a/tests/system-test/fulltest.sh +++ b/tests/system-test/fulltest.sh @@ -154,3 +154,5 @@ python3 ./test.py -f 7-tmq/tmqUdf.py python3 ./test.py -f 7-tmq/tmqConsumerGroup.py python3 ./test.py -f 7-tmq/tmqShow.py python3 ./test.py -f 7-tmq/tmqAlterSchema.py +python3 ./test.py -f 7-tmq/tmqConsFromTsdb.py +python3 ./test.py -f 7-tmq/tmqConsFromTsdb1.py