提交 b17fa86e 编写于 作者: L Liu Jicong

Merge branch '3.0' into feature/stream

...@@ -371,7 +371,7 @@ class TMQCom: ...@@ -371,7 +371,7 @@ class TMQCom:
elif (i % 3 == 0): elif (i % 3 == 0):
tagBinaryValue = 'changsha' tagBinaryValue = 'changsha'
sql += " %s.%s_%d using %s.%s tags (%d, %d, %d, '%s', '%s') values "%(dbName,ctbPrefix,i+ctbStartIdx,dbName,stbName,i+ctbStartIdx,i+ctbStartIdx,i+ctbStartIdx,tagBinaryValue,tagBinaryValue) sql += " %s.%s%d using %s.%s tags (%d, %d, %d, '%s', '%s') values "%(dbName,ctbPrefix,i+ctbStartIdx,dbName,stbName,i+ctbStartIdx,i+ctbStartIdx,i+ctbStartIdx,tagBinaryValue,tagBinaryValue)
for j in range(rowsPerTbl): for j in range(rowsPerTbl):
sql += "(%d, %d, %d, %d, 'binary_%d', 'nchar_%d', now) "%(startTs+j, j,j, j,i+ctbStartIdx,rowsBatched) sql += "(%d, %d, %d, %d, 'binary_%d', 'nchar_%d', now) "%(startTs+j, j,j, j,i+ctbStartIdx,rowsBatched)
rowsBatched += 1 rowsBatched += 1
...@@ -379,7 +379,7 @@ class TMQCom: ...@@ -379,7 +379,7 @@ class TMQCom:
tsql.execute(sql) tsql.execute(sql)
rowsBatched = 0 rowsBatched = 0
if j < rowsPerTbl - 1: if j < rowsPerTbl - 1:
sql = "insert into %s.%s_%d using %s.%s tags (%d, %d, %d, '%s', '%s') values " %(dbName,ctbPrefix,i+ctbStartIdx,dbName,stbName,i+ctbStartIdx,i+ctbStartIdx,i+ctbStartIdx,tagBinaryValue,tagBinaryValue) sql = "insert into %s.%s%d using %s.%s tags (%d, %d, %d, '%s', '%s') values " %(dbName,ctbPrefix,i+ctbStartIdx,dbName,stbName,i+ctbStartIdx,i+ctbStartIdx,i+ctbStartIdx,tagBinaryValue,tagBinaryValue)
else: else:
sql = "insert into " sql = "insert into "
#end sql #end sql
......
...@@ -17,7 +17,7 @@ from tmqCommon import * ...@@ -17,7 +17,7 @@ from tmqCommon import *
class TDTestCase: class TDTestCase:
def __init__(self): def __init__(self):
self.snapshot = 0 self.snapshot = 0
self.vgroups = 2 self.vgroups = 4
self.ctbNum = 1 self.ctbNum = 1
self.rowsPerTbl = 100000 self.rowsPerTbl = 100000
...@@ -235,18 +235,18 @@ class TDTestCase: ...@@ -235,18 +235,18 @@ class TDTestCase:
def run(self): def run(self):
tdSql.prepare() tdSql.prepare()
self.prepareTestEnv() # self.prepareTestEnv()
tdLog.printNoPrefix("=============================================") # tdLog.printNoPrefix("=============================================")
tdLog.printNoPrefix("======== snapshot is 0: only consume from wal") # tdLog.printNoPrefix("======== snapshot is 0: only consume from wal")
self.tmqCase1() # self.tmqCase1()
self.tmqCase2() # self.tmqCase2()
self.prepareTestEnv() self.prepareTestEnv()
tdLog.printNoPrefix("====================================================================") tdLog.printNoPrefix("====================================================================")
tdLog.printNoPrefix("======== snapshot is 1: firstly consume from tsbs, and then from wal") tdLog.printNoPrefix("======== snapshot is 1: firstly consume from tsbs, and then from wal")
self.snapshot = 1 self.snapshot = 1
self.tmqCase1() self.tmqCase1()
self.tmqCase2() # self.tmqCase2()
def stop(self): def stop(self):
......
...@@ -17,9 +17,10 @@ from tmqCommon import * ...@@ -17,9 +17,10 @@ from tmqCommon import *
class TDTestCase: class TDTestCase:
def __init__(self): def __init__(self):
self.snapshot = 0 self.snapshot = 0
self.vgroups = 2 self.vgroups = 4
self.ctbNum = 1 self.ctbNum = 100
self.rowsPerTbl = 100000 self.rowsPerTbl = 1000
self.autoCtbPrefix = 'aCtb'
def init(self, conn, logSql): def init(self, conn, logSql):
tdLog.debug(f"start to excute {__file__}") tdLog.debug(f"start to excute {__file__}")
...@@ -38,9 +39,9 @@ class TDTestCase: ...@@ -38,9 +39,9 @@ class TDTestCase:
'tagSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1}], 'tagSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1}],
'ctbPrefix': 'ctb', 'ctbPrefix': 'ctb',
'ctbStartIdx': 0, 'ctbStartIdx': 0,
'ctbNum': 1, 'ctbNum': 1000,
'rowsPerTbl': 100000, 'rowsPerTbl': 1000,
'batchNum': 1200, 'batchNum': 10000,
'startTs': 1640966400000, # 2022-01-01 00:00:00.000 'startTs': 1640966400000, # 2022-01-01 00:00:00.000
'pollDelay': 3, 'pollDelay': 3,
'showMsg': 1, 'showMsg': 1,
...@@ -62,9 +63,9 @@ class TDTestCase: ...@@ -62,9 +63,9 @@ class TDTestCase:
tmqCom.insert_data_interlaceByMultiTbl(tsql=tdSql,dbName=paraDict["dbName"],ctbPrefix=paraDict["ctbPrefix"], tmqCom.insert_data_interlaceByMultiTbl(tsql=tdSql,dbName=paraDict["dbName"],ctbPrefix=paraDict["ctbPrefix"],
ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"], ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"],
startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx']) startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx'])
# tmqCom.insert_data_with_autoCreateTbl(tsql=tdSql,dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix="ctbx", tmqCom.insert_data_with_autoCreateTbl(tsql=tdSql,dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix=self.autoCtbPrefix,
# ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"], ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"],
# startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx']) startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx'])
# tdLog.info("restart taosd to ensure that the data falls into the disk") # tdLog.info("restart taosd to ensure that the data falls into the disk")
# tdSql.query("flush database %s"%(paraDict['dbName'])) # tdSql.query("flush database %s"%(paraDict['dbName']))
...@@ -84,9 +85,9 @@ class TDTestCase: ...@@ -84,9 +85,9 @@ class TDTestCase:
'tagSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1}], 'tagSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1}],
'ctbPrefix': 'ctb', 'ctbPrefix': 'ctb',
'ctbStartIdx': 0, 'ctbStartIdx': 0,
'ctbNum': 1, 'ctbNum': 1000,
'rowsPerTbl': 100000, 'rowsPerTbl': 1000,
'batchNum': 3000, 'batchNum': 10000,
'startTs': 1640966400000, # 2022-01-01 00:00:00.000 'startTs': 1640966400000, # 2022-01-01 00:00:00.000
'pollDelay': 5, 'pollDelay': 5,
'showMsg': 1, 'showMsg': 1,
...@@ -98,10 +99,11 @@ class TDTestCase: ...@@ -98,10 +99,11 @@ class TDTestCase:
paraDict['rowsPerTbl'] = self.rowsPerTbl paraDict['rowsPerTbl'] = self.rowsPerTbl
# update to half tables # update to half tables
paraDict['ctbNum'] = int(self.ctbNum/2)
paraDict['rowsPerTbl'] = int(self.rowsPerTbl / 2) paraDict['rowsPerTbl'] = int(self.rowsPerTbl / 2)
# tmqCom.insert_data_with_autoCreateTbl(tsql=tdSql,dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix="ctbx", tmqCom.insert_data_with_autoCreateTbl(tsql=tdSql,dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix=self.autoCtbPrefix,
# ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"], ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"],
# startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx']) startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx'])
tmqCom.insert_data_interlaceByMultiTbl(tsql=tdSql,dbName=paraDict["dbName"],ctbPrefix=paraDict["ctbPrefix"], tmqCom.insert_data_interlaceByMultiTbl(tsql=tdSql,dbName=paraDict["dbName"],ctbPrefix=paraDict["ctbPrefix"],
ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"], ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"],
startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx']) startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx'])
...@@ -113,10 +115,14 @@ class TDTestCase: ...@@ -113,10 +115,14 @@ class TDTestCase:
tdLog.info("create topic sql: %s"%sqlString) tdLog.info("create topic sql: %s"%sqlString)
tdSql.execute(sqlString) tdSql.execute(sqlString)
# paraDict['ctbNum'] = self.ctbNum paraDict['ctbNum'] = self.ctbNum
paraDict['rowsPerTbl'] = self.rowsPerTbl paraDict['rowsPerTbl'] = self.rowsPerTbl
consumerId = 0 consumerId = 0
expectrowcnt = int(paraDict["rowsPerTbl"] * paraDict["ctbNum"] * 3/2) if self.snapshot == 0:
expectrowcnt = int(paraDict["rowsPerTbl"] * paraDict["ctbNum"] * (2 + 1/2*1/2*2))
elif self.snapshot == 1:
expectrowcnt = int(paraDict["rowsPerTbl"] * paraDict["ctbNum"] * (2))
topicList = topicFromStb1 topicList = topicFromStb1
ifcheckdata = 1 ifcheckdata = 1
ifManualCommit = 1 ifManualCommit = 1
...@@ -129,7 +135,7 @@ class TDTestCase: ...@@ -129,7 +135,7 @@ class TDTestCase:
tdLog.info("start consume processor") tdLog.info("start consume processor")
tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot']) tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot'])
tdLog.info("insert process end, and start to check consume result") tdLog.info("start to check consume result")
expectRows = 1 expectRows = 1
resultList = tmqCom.selectConsumeResult(expectRows) resultList = tmqCom.selectConsumeResult(expectRows)
totalConsumeRows = 0 totalConsumeRows = 0
...@@ -143,7 +149,7 @@ class TDTestCase: ...@@ -143,7 +149,7 @@ class TDTestCase:
if totalConsumeRows != expectrowcnt: if totalConsumeRows != expectrowcnt:
tdLog.exit("tmq consume rows error!") tdLog.exit("tmq consume rows error!")
tmqCom.checkFileContent(consumerId, queryString) # tmqCom.checkFileContent(consumerId, queryString)
tdSql.query("drop topic %s"%topicFromStb1) tdSql.query("drop topic %s"%topicFromStb1)
tdLog.printNoPrefix("======== test case 1 end ...... ") tdLog.printNoPrefix("======== test case 1 end ...... ")
...@@ -161,9 +167,9 @@ class TDTestCase: ...@@ -161,9 +167,9 @@ class TDTestCase:
'tagSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1}], 'tagSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1}],
'ctbPrefix': 'ctb', 'ctbPrefix': 'ctb',
'ctbStartIdx': 0, 'ctbStartIdx': 0,
'ctbNum': 1, 'ctbNum': 1000,
'rowsPerTbl': 10000, 'rowsPerTbl': 1000,
'batchNum': 5000, 'batchNum': 10000,
'startTs': 1640966400000, # 2022-01-01 00:00:00.000 'startTs': 1640966400000, # 2022-01-01 00:00:00.000
'pollDelay': 5, 'pollDelay': 5,
'showMsg': 1, 'showMsg': 1,
...@@ -179,14 +185,20 @@ class TDTestCase: ...@@ -179,14 +185,20 @@ class TDTestCase:
tdSql.query("flush database %s"%(paraDict['dbName'])) tdSql.query("flush database %s"%(paraDict['dbName']))
# update to half tables # update to half tables
paraDict['startTs'] = paraDict['startTs'] + int(self.rowsPerTbl / 2) paraDict['ctbNum'] = int(self.ctbNum/2)
paraDict['rowsPerTbl'] = int(self.rowsPerTbl / 2) paraDict['rowsPerTbl'] = int(self.rowsPerTbl / 2)
tmqCom.insert_data_with_autoCreateTbl(tsql=tdSql,dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix=paraDict["ctbPrefix"], paraDict['startTs'] = paraDict['startTs'] + int(self.rowsPerTbl / 2)
tmqCom.insert_data_with_autoCreateTbl(tsql=tdSql,dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix=self.autoCtbPrefix,
ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"], ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"],
startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx']) startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx']+int(self.ctbNum/2))
# tmqCom.insert_data_interlaceByMultiTbl(tsql=tdSql,dbName=paraDict["dbName"],ctbPrefix=paraDict["ctbPrefix"],
# ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"], tmqCom.insert_data_with_autoCreateTbl(tsql=tdSql,dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix="aCtby",
# startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx']) ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"],
startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx']+int(self.ctbNum/2))
tmqCom.insert_data_interlaceByMultiTbl(tsql=tdSql,dbName=paraDict["dbName"],ctbPrefix=paraDict["ctbPrefix"],
ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"],
startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx']+int(self.ctbNum/2))
tmqCom.initConsumerTable() tmqCom.initConsumerTable()
tdLog.info("create topics from stb1") tdLog.info("create topics from stb1")
...@@ -197,9 +209,14 @@ class TDTestCase: ...@@ -197,9 +209,14 @@ class TDTestCase:
tdSql.execute(sqlString) tdSql.execute(sqlString)
# paraDict['ctbNum'] = self.ctbNum # paraDict['ctbNum'] = self.ctbNum
paraDict['ctbNum'] = self.ctbNum
paraDict['rowsPerTbl'] = self.rowsPerTbl paraDict['rowsPerTbl'] = self.rowsPerTbl
consumerId = 1 consumerId = 1
expectrowcnt = int(paraDict["rowsPerTbl"] * paraDict["ctbNum"] * 2) if self.snapshot == 0:
expectrowcnt = int(paraDict["rowsPerTbl"] * paraDict["ctbNum"] * (2 + 1/2*1/2*2 + 1/2*1/2))
elif self.snapshot == 1:
expectrowcnt = int(paraDict["rowsPerTbl"] * paraDict["ctbNum"] * (2 + 1/2*1/2))
topicList = topicFromStb1 topicList = topicFromStb1
ifcheckdata = 1 ifcheckdata = 1
ifManualCommit = 1 ifManualCommit = 1
...@@ -212,7 +229,7 @@ class TDTestCase: ...@@ -212,7 +229,7 @@ class TDTestCase:
tdLog.info("start consume processor") tdLog.info("start consume processor")
tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot']) tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot'])
tdLog.info("insert process end, and start to check consume result") tdLog.info("start to check consume result")
expectRows = 1 expectRows = 1
resultList = tmqCom.selectConsumeResult(expectRows) resultList = tmqCom.selectConsumeResult(expectRows)
totalConsumeRows = 0 totalConsumeRows = 0
......
...@@ -178,6 +178,8 @@ python3 ./test.py -f 7-tmq/tmqConsFromTsdb1-mutilVg-mutilCtb-funcNFilter.py ...@@ -178,6 +178,8 @@ python3 ./test.py -f 7-tmq/tmqConsFromTsdb1-mutilVg-mutilCtb-funcNFilter.py
python3 ./test.py -f 7-tmq/tmqConsFromTsdb1-mutilVg-mutilCtb.py python3 ./test.py -f 7-tmq/tmqConsFromTsdb1-mutilVg-mutilCtb.py
python3 ./test.py -f 7-tmq/tmqAutoCreateTbl.py python3 ./test.py -f 7-tmq/tmqAutoCreateTbl.py
#python3 ./test.py -f 7-tmq/tmqDnodeRestart.py #python3 ./test.py -f 7-tmq/tmqDnodeRestart.py
#python3 ./test.py -f 7-tmq/tmqUpdate-1ctb.py
python3 ./test.py -f 7-tmq/tmqUpdate-multiCtb.py
#------------querPolicy 2----------- #------------querPolicy 2-----------
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册