未验证 提交 2cf7763b 编写于 作者: H Haojun Liao 提交者: GitHub

Merge pull request #21046 from taosdata/fix/TD-22671

opti:test cases for tmq
......@@ -94,26 +94,26 @@ class ClusterComCreate:
tdLog.info(shellCmd)
os.system(shellCmd)
def getStartConsumeNotifyFromTmqsim(self,cdbName='cdb'):
while 1:
tdSql.query("select * from %s.notifyinfo"%cdbName)
#tdLog.info("row: %d, %l64d, %l64d"%(tdSql.getData(0, 1),tdSql.getData(0, 2),tdSql.getData(0, 3))
if (tdSql.getRows() == 1) and (tdSql.getData(0, 1) == 0):
break
else:
time.sleep(0.1)
return
def getStartCommitNotifyFromTmqsim(self,cdbName='cdb'):
while 1:
tdSql.query("select * from %s.notifyinfo"%cdbName)
#tdLog.info("row: %d, %l64d, %l64d"%(tdSql.getData(0, 1),tdSql.getData(0, 2),tdSql.getData(0, 3))
if tdSql.getRows() == 2 :
print(tdSql.getData(0, 1), tdSql.getData(1, 1))
if tdSql.getData(1, 1) == 1:
break
time.sleep(0.1)
return
# def getStartConsumeNotifyFromTmqsim(self,cdbName='cdb'):
# while 1:
# tdSql.query("select * from %s.notifyinfo"%cdbName)
# #tdLog.info("row: %d, %l64d, %l64d"%(tdSql.getData(0, 1),tdSql.getData(0, 2),tdSql.getData(0, 3))
# if (tdSql.getRows() == 1) and (tdSql.getData(0, 1) == 0):
# break
# else:
# time.sleep(0.1)
# return
#
# def getStartCommitNotifyFromTmqsim(self,cdbName='cdb'):
# while 1:
# tdSql.query("select * from %s.notifyinfo"%cdbName)
# #tdLog.info("row: %d, %l64d, %l64d"%(tdSql.getData(0, 1),tdSql.getData(0, 2),tdSql.getData(0, 3))
# if tdSql.getRows() == 2 :
# print(tdSql.getData(0, 1), tdSql.getData(1, 1))
# if tdSql.getData(1, 1) == 1:
# break
# time.sleep(0.1)
# return
def create_database(self,tsql, dbName,dropFlag=1,vgroups=4,replica=1):
if dropFlag == 1:
......
......@@ -10,6 +10,8 @@ from util.log import *
from util.sql import *
from util.cases import *
from util.dnodes import *
sys.path.append("./7-tmq")
from tmqCommon import *
class TDTestCase:
hostname = socket.gethostname()
......@@ -67,26 +69,26 @@ class TDTestCase:
tdLog.info("consume info sql: %s"%sql)
tdSql.query(sql)
def getStartConsumeNotifyFromTmqsim(self,cdbName='cdb'):
while 1:
tdSql.query("select * from %s.notifyinfo"%cdbName)
#tdLog.info("row: %d, %l64d, %l64d"%(tdSql.getData(0, 1),tdSql.getData(0, 2),tdSql.getData(0, 3))
if (tdSql.getRows() == 1) and (tdSql.getData(0, 1) == 0):
break
else:
time.sleep(0.1)
return
def getStartCommitNotifyFromTmqsim(self,cdbName='cdb'):
while 1:
tdSql.query("select * from %s.notifyinfo"%cdbName)
#tdLog.info("row: %d, %l64d, %l64d"%(tdSql.getData(0, 1),tdSql.getData(0, 2),tdSql.getData(0, 3))
if tdSql.getRows() == 2 :
tdLog.info("row[0][1]: %d, row[1][1]: %d"%(tdSql.getData(0, 1), tdSql.getData(1, 1)))
if tdSql.getData(1, 1) == 1:
break
time.sleep(0.1)
return
# def getStartConsumeNotifyFromTmqsim(self,cdbName='cdb'):
# while 1:
# tdSql.query("select * from %s.notifyinfo"%cdbName)
# #tdLog.info("row: %d, %l64d, %l64d"%(tdSql.getData(0, 1),tdSql.getData(0, 2),tdSql.getData(0, 3))
# if (tdSql.getRows() == 1) and (tdSql.getData(0, 1) == 0):
# break
# else:
# time.sleep(0.1)
# return
#
# def getStartCommitNotifyFromTmqsim(self,cdbName='cdb'):
# while 1:
# tdSql.query("select * from %s.notifyinfo"%cdbName)
# #tdLog.info("row: %d, %l64d, %l64d"%(tdSql.getData(0, 1),tdSql.getData(0, 2),tdSql.getData(0, 3))
# if tdSql.getRows() == 2 :
# tdLog.info("row[0][1]: %d, row[1][1]: %d"%(tdSql.getData(0, 1), tdSql.getData(1, 1)))
# if tdSql.getData(1, 1) == 1:
# break
# time.sleep(0.1)
# return
def selectConsumeResult(self,expectRows,cdbName='cdb'):
resultList=[]
......@@ -233,7 +235,7 @@ class TDTestCase:
self.startTmqSimProcess(buildPath,cfgPath,pollDelay,parameterDict["dbName"],showMsg, showRow)
tdLog.info("wait the notify info of start consume")
self.getStartConsumeNotifyFromTmqsim()
tmqCom.getStartConsumeNotifyFromTmqsim()
tdLog.info("pkill consume processor")
if (platform.system().lower() == 'windows'):
......@@ -311,7 +313,7 @@ class TDTestCase:
# time.sleep(6)
tdLog.info("start to wait commit notify")
self.getStartCommitNotifyFromTmqsim()
tmqCom.getStartCommitNotifyFromTmqsim()
tdLog.info("pkill consume processor")
if (platform.system().lower() == 'windows'):
......
......@@ -145,13 +145,12 @@ class TMQCom:
processID = subprocess.check_output(psCmd, shell=True).decode("utf-8")
tdLog.debug("%s is stopped by kill -INT" % (processorName))
def getStartConsumeNotifyFromTmqsim(self,cdbName='cdb',rows=1):
def getStartConsumeNotifyFromTmqsim(self,cdbName='cdb'):
loopFlag = 1
while loopFlag:
tdSql.query("select * from %s.notifyinfo"%cdbName)
#tdLog.info("row: %d, %l64d, %l64d"%(tdSql.getData(0, 1),tdSql.getData(0, 2),tdSql.getData(0, 3))
actRows = tdSql.getRows()
if (actRows >= rows):
for i in range(actRows):
if tdSql.getData(i, 1) == 0:
loopFlag = 0
......@@ -159,18 +158,17 @@ class TMQCom:
time.sleep(0.02)
return
def getStartCommitNotifyFromTmqsim(self,cdbName='cdb',rows=2):
def getStartCommitNotifyFromTmqsim(self,cdbName='cdb'):
loopFlag = 1
while loopFlag:
tdSql.query("select * from %s.notifyinfo"%cdbName)
#tdLog.info("row: %d, %l64d, %l64d"%(tdSql.getData(0, 1),tdSql.getData(0, 2),tdSql.getData(0, 3))
actRows = tdSql.getRows()
if (actRows >= rows):
for i in range(actRows):
if tdSql.getData(i, 1) == 1:
loopFlag = 0
break
time.sleep(0.10)
time.sleep(0.02)
return
def create_database(self,tsql, dbName,dropFlag=1,vgroups=4,replica=1):
......
......@@ -100,7 +100,7 @@ class TDTestCase:
tdLog.info("wait consumer commit notify")
# tmqCom.getStartCommitNotifyFromTmqsim(rows=4)
tmqCom.getStartConsumeNotifyFromTmqsim(rows=2)
tmqCom.getStartConsumeNotifyFromTmqsim()
tdLog.info("pkill one consume processor")
tmqCom.stopTmqSimProcess('tmq_sim_new')
......
......@@ -121,7 +121,7 @@ class TDTestCase:
tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot'])
# time.sleep(3)
tmqCom.getStartCommitNotifyFromTmqsim('cdb',1)
tmqCom.getStartCommitNotifyFromTmqsim()
tdLog.info("create some new child table and insert data for latest mode")
paraDict["batchNum"] = 100
......@@ -205,7 +205,7 @@ class TDTestCase:
tdLog.info("start consume processor")
tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot'])
tmqCom.getStartCommitNotifyFromTmqsim('cdb',1)
tmqCom.getStartCommitNotifyFromTmqsim()
tdLog.info("create some new child table and insert data for latest mode")
paraDict["batchNum"] = 10
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册