未验证 提交 b04e7c93 编写于 作者: H Hui Li 提交者: GitHub

Merge pull request #14063 from taosdata/test3.0/lihui

test: add test case for tmq
class DataBoundary:
def __init__(self):
self.TINYINT_BOUNDARY = [-128, 127]
self.SMALLINT_BOUNDARY = [-32768, 32767]
self.INT_BOUNDARY = [-2147483648, 2147483647]
self.BIGINT_BOUNDARY = [-9223372036854775808, 9223372036854775807]
self.UTINYINT_BOUNDARY = [0, 255]
self.USMALLINT_BOUNDARY = [0, 65535]
self.UINT_BOUNDARY = [0, 4294967295]
self.UBIGINT_BOUNDARY = [0, 18446744073709551615]
self.FLOAT_BOUNDARY = [-3.40E+38, 3.40E+38]
self.DOUBLE_BOUNDARY = [-1.7e+308, 1.7e+308]
self.BOOL_BOUNDARY = [True, False]
self.BINARY_MAX_LENGTH = 16374
self.NCHAR_MAX_LENGTH = 4093
self.DBNAME_MAX_LENGTH = 64
self.STBNAME_MAX_LENGTH = 192
self.TBNAME_MAX_LENGTH = 192
self.CHILD_TBNAME_MAX_LENGTH = 192
self.TAG_KEY_MAX_LENGTH = 64
self.COL_KEY_MAX_LENGTH = 64
self.MAX_TAG_COUNT = 128
self.MAX_TAG_COL_COUNT = 4096
self.mnodeShmSize = [6292480, 2147483647]
self.mnodeShmSize_default = 6292480
self.vnodeShmSize = [6292480, 2147483647]
self.vnodeShmSize_default = 31458304
self.DB_PARAM_BUFFER_CONFIG = {"create_name": "buffer", "query_name": "buffer", "vnode_json_key": "szBuf", "boundary": [3, 16384], "default": 96}
self.DB_PARAM_CACHELAST_CONFIG = {"create_name": "cachelast", "query_name": "cache_model", "vnode_json_key": "", "boundary": [0, 1, 2, 3], "default": 0}
self.DB_PARAM_COMP_CONFIG = {"create_name": "comp", "query_name": "compression", "vnode_json_key": "", "boundary": [0, 1, 2], "default": 2}
self.DB_PARAM_DURATION_CONFIG = {"create_name": "duration", "query_name": "duration", "vnode_json_key": "daysPerFile", "boundary": [1, 3650, '60m', '5256000m', '1h', '87600h', '1d', '3650d'], "default": "14400m"}
self.DB_PARAM_FSYNC_CONFIG = {"create_name": "fsync", "query_name": "fsync", "vnode_json_key": "", "boundary": [0, 180000], "default": 3000}
self.DB_PARAM_KEEP_CONFIG = {"create_name": "keep", "query_name": "fsync", "vnode_json_key": "", "boundary": [1, 365000,'1440m','525600000m','24h','8760000h','1d','365000d'], "default": "5256000m,5256000m,5256000m"}
self.DB_PARAM_MAXROWS_CONFIG = {"create_name": "maxrows", "query_name": "maxrows", "vnode_json_key": "maxRows", "boundary": [200, 10000], "default": 4096}
self.DB_PARAM_MINROWS_CONFIG = {"create_name": "minrows", "query_name": "minrows", "vnode_json_key": "minRows", "boundary": [10, 1000], "default": 100}
self.DB_PARAM_NTABLES_CONFIG = {"create_name": "ntables", "query_name": "ntables", "vnode_json_key": "", "boundary": 0, "default": 0}
self.DB_PARAM_PAGES_CONFIG = {"create_name": "pages", "query_name": "pages", "vnode_json_key": "szCache", "boundary": [64], "default": 256}
self.DB_PARAM_PAGESIZE_CONFIG = {"create_name": "pagesize", "query_name": "pagesize", "vnode_json_key": "szPage", "boundary": [1, 16384], "default": 4}
self.DB_PARAM_PRECISION_CONFIG = {"create_name": "precision", "query_name": "precision", "vnode_json_key": "", "boundary": ['ms', 'us', 'ns'], "default": "ms"}
self.DB_PARAM_REPLICA_CONFIG = {"create_name": "replica", "query_name": "replica", "vnode_json_key": "", "boundary": [1], "default": 1}
self.DB_PARAM_SINGLE_STABLE_CONFIG = {"create_name": "single_stable", "query_name": "single_stable_model", "vnode_json_key": "", "boundary": [0, 1], "default": 0}
self.DB_PARAM_STRICT_CONFIG = {"create_name": "strict", "query_name": "strict", "vnode_json_key": "", "boundary": {"no_strict": 0, "strict": 1}, "default": "no_strict"}
self.DB_PARAM_VGROUPS_CONFIG = {"create_name": "vgroups", "query_name": "vgroups", "vnode_json_key": "", "boundary": [1, 32], "default": 2}
self.DB_PARAM_WAL_CONFIG = {"create_name": "wal", "query_name": "wal", "vnode_json_key": "", "boundary": [1, 2], "default": 1}
\ No newline at end of file
此差异已折叠。
import taos
import sys
import time
import socket
import os
import threading
from util.log import *
from util.sql import *
from util.cases import *
from util.dnodes import *
from util.common import *
sys.path.append("./7-tmq")
from tmqCommon import *
class TDTestCase:
def init(self, conn, logSql):
tdLog.debug(f"start to excute {__file__}")
tdSql.init(conn.cursor())
#tdSql.init(conn.cursor(), logSql) # output sql.txt file
def tmqCase1(self):
tdLog.printNoPrefix("======== test case 1: ")
paraDict = {'dbName': 'db1',
'dropFlag': 1,
'event': '',
'vgroups': 4,
'stbName': 'stb',
'colPrefix': 'c',
'tagPrefix': 't',
'colSchema': [{'type': 'INT', 'count':1}, {'type': 'binary', 'len':20, 'count':1}],
'tagSchema': [{'type': 'INT', 'count':1}, {'type': 'binary', 'len':20, 'count':1}],
'ctbPrefix': 'ctb',
'ctbNum': 10,
'rowsPerTbl': 10000,
'batchNum': 100,
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
'pollDelay': 10,
'showMsg': 1,
'showRow': 1}
tmqCom.initConsumerTable()
tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"])
tdLog.info("create stb")
tdCom.create_stable(tdSql, dbname=paraDict["dbName"],stbname=paraDict["stbName"], column_elm_list=paraDict['colSchema'], tag_elm_list=paraDict['tagSchema'])
tdLog.info("create ctb")
tdCom.create_ctable(tdSql, dbname=paraDict["dbName"],stbname=paraDict["stbName"],tag_elm_list=paraDict['tagSchema'],count=paraDict["ctbNum"], default_ctbname_prefix=paraDict['ctbPrefix'])
tdLog.info("insert data")
tmqCom.insert_data(tdSql,paraDict["dbName"],paraDict["ctbPrefix"],paraDict["ctbNum"],paraDict["rowsPerTbl"],paraDict["batchNum"],paraDict["startTs"])
tdLog.info("create topics from db")
topicName = 'topic_%s_%s'%(paraDict['dbName'], paraDict['stbName'])
tdSql.execute("create topic %s as select ts, c1, c2 from %s.%s where c1 %% 4 == 0" %(topicName, paraDict['dbName'], paraDict['stbName']))
tdLog.info("insert consume info to consume processor")
consumerId = 0
expectrowcnt = paraDict["rowsPerTbl"] * paraDict["ctbNum"]
topicList = topicName
ifcheckdata = 0
ifManualCommit = 1
keyList = 'group.id:cgrp1, enable.auto.commit:false, auto.commit.interval.ms:6000, auto.offset.reset:earliest'
tmqCom.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit)
tdLog.info("start consume processor")
tmqCom.startTmqSimProcess(paraDict['pollDelay'],paraDict["dbName"],paraDict['showMsg'], paraDict['showRow'])
tdLog.info("wait the consume result")
expectRows = 1
resultList = tmqCom.selectConsumeResult(expectRows)
totalConsumeRows = 0
for i in range(expectRows):
totalConsumeRows += resultList[i]
if totalConsumeRows != expectrowcnt / 4:
tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt / 4))
tdLog.exit("tmq consume rows error!")
time.sleep(10)
tdSql.query("drop topic %s"%topicName)
tdLog.printNoPrefix("======== test case 1 end ...... ")
def run(self):
tdSql.prepare()
self.tmqCase1()
def stop(self):
tdSql.close()
tdLog.success(f"{__file__} successfully executed")
event = threading.Event()
tdCases.addLinux(__file__, TDTestCase())
tdCases.addWindows(__file__, TDTestCase())
......@@ -92,13 +92,65 @@ class TMQCom:
tdLog.info(shellCmd)
os.system(shellCmd)
def getStartConsumeNotifyFromTmqsim(self,cdbName='cdb'):
while 1:
tdSql.query("select * from %s.notifyinfo"%cdbName)
#tdLog.info("row: %d, %l64d, %l64d"%(tdSql.getData(0, 1),tdSql.getData(0, 2),tdSql.getData(0, 3))
if (tdSql.getRows() == 1) and (tdSql.getData(0, 1) == 0):
break
else:
time.sleep(0.1)
return
def getStartCommitNotifyFromTmqsim(self,cdbName='cdb'):
while 1:
tdSql.query("select * from %s.notifyinfo"%cdbName)
#tdLog.info("row: %d, %l64d, %l64d"%(tdSql.getData(0, 1),tdSql.getData(0, 2),tdSql.getData(0, 3))
if tdSql.getRows() == 2 :
print(tdSql.getData(0, 1), tdSql.getData(1, 1))
if tdSql.getData(1, 1) == 1:
break
time.sleep(0.1)
return
def insert_data(self,tsql,dbName,stbName,ctbNum,rowsPerTbl,batchNum,startTs):
tdLog.debug("start to insert data ............")
tsql.execute("use %s" %dbName)
pre_insert = "insert into "
sql = pre_insert
t = time.time()
startTs = int(round(t * 1000))
#tdLog.debug("doing insert data into stable:%s rows:%d ..."%(stbName, allRows))
for i in range(ctbNum):
sql += " %s%d values "%(stbName,i)
for j in range(rowsPerTbl):
sql += "(%d, %d, 'tmqrow_%d') "%(startTs + j, j, j)
if (j > 0) and ((j%batchNum == 0) or (j == rowsPerTbl - 1)):
tsql.execute(sql)
if j < rowsPerTbl - 1:
sql = "insert into %s%d values " %(stbName,i)
else:
sql = "insert into "
#end sql
if sql != pre_insert:
#print("insert sql:%s"%sql)
tsql.execute(sql)
tdLog.debug("insert data ............ [OK]")
return
def syncCreateDbStbCtbInsertData(self, tsql, paraDict):
tdCom.create_database(tsql, paraDict["dbName"],paraDict["dropFlag"], paraDict['precision'])
tdCom.create_stable(tsql, paraDict["dbName"],paraDict["stbName"], paraDict["columnDict"], paraDict["tagDict"])
tdCom.create_ctables(tsql, paraDict["dbName"],paraDict["stbName"],paraDict["ctbNum"],paraDict["tagDict"])
tdCom.create_database(tsql, paraDict["dbName"],paraDict["dropFlag"])
tdCom.create_stable(tsql, dbname=paraDict["dbName"],stbname=paraDict["stbName"], column_elm_list=paraDict['colSchema'], tag_elm_list=paraDict['tagSchema'])
tdCom.create_ctable(tsql, dbname=paraDict["dbName"],stbname=paraDict["stbName"],tag_elm_list=paraDict['tagSchema'],count=paraDict["ctbNum"], default_ctbname_prefix=paraDict['ctbPrefix'])
if "event" in paraDict and type(paraDict['event']) == type(threading.Event()):
paraDict["event"].set()
tdCom.insert_data(tsql,paraDict["dbName"],paraDict["stbName"],paraDict["ctbNum"],paraDict["rowsPerTbl"],paraDict["batchNum"],paraDict["startTs"])
ctbPrefix = paraDict['ctbPrefix']
ctbNum = paraDict["ctbNum"]
for i in range(ctbNum):
tbName = '%s%s'%(ctbPrefix,i)
tdCom.insert_rows(tsql,dbname=paraDict["dbName"],tbname=tbName,start_ts_value=paraDict['startTs'],count=paraDict['rowsPerTbl'])
return
def threadFunction(self, **paraDict):
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册