提交 e347528c 编写于 作者: P plum-lihui

test: add test case for tmq

上级 38d19e45
from ntpath import join
import taos
import sys
import time
import socket
import os
import threading
from util.log import *
from util.sql import *
from util.cases import *
from util.dnodes import *
from util.common import *
from util.cluster import *
from test import tdDnodes
sys.path.append("./7-tmq")
from tmqCommon import *
class TDTestCase:
def __init__(self):
self.dnodes = 5
self.mnodes = 3
self.idIndex = 0
self.roleIndex = 2
self.mnodeStatusIndex = 3
self.mnodeEpIndex = 1
self.dnodeStatusIndex = 4
self.mnodeCheckCnt = 10
self.host = socket.gethostname()
self.startPort = 6030
self.portStep = 100
self.dnodeOfLeader = 0
def init(self, conn, logSql):
tdLog.debug(f"start to excute {__file__}")
tdSql.init(conn.cursor())
#tdSql.init(conn.cursor(), logSql) # output sql.txt file
def checkDnodesStatusAndCreateMnode(self,dnodeNumbers):
count=0
while count < dnodeNumbers:
tdSql.query("show dnodes")
# tdLog.debug(tdSql.queryResult)
dCnt = 0
for i in range(dnodeNumbers):
if tdSql.queryResult[i][self.dnodeStatusIndex] != "ready":
break
else:
dCnt += 1
if dCnt == dnodeNumbers:
break
time.sleep(1)
tdLog.debug("............... waiting for all dnodes ready!")
tdLog.info("==============create two new mnodes ========")
tdSql.execute("create mnode on dnode 2")
tdSql.execute("create mnode on dnode 3")
self.check3mnode()
return
def check3mnode(self):
count=0
while count < self.mnodeCheckCnt:
time.sleep(1)
tdSql.query("show mnodes;")
if tdSql.checkRows(self.mnodes) :
tdLog.debug("mnode is three nodes")
else:
tdLog.exit("mnode number is correct")
roleOfMnode0 = tdSql.queryResult[0][self.roleIndex]
roleOfMnode1 = tdSql.queryResult[1][self.roleIndex]
roleOfMnode2 = tdSql.queryResult[2][self.roleIndex]
if roleOfMnode0=='leader' and roleOfMnode1=='follower' and roleOfMnode2 == 'follower' :
self.dnodeOfLeader = tdSql.queryResult[0][self.idIndex]
break
elif roleOfMnode0=='follower' and roleOfMnode1=='leader' and roleOfMnode2 == 'follower' :
self.dnodeOfLeader = tdSql.queryResult[1][self.idIndex]
break
elif roleOfMnode0=='follower' and roleOfMnode1=='follower' and roleOfMnode2 == 'leader' :
self.dnodeOfLeader = tdSql.queryResult[2][self.idIndex]
break
else:
count+=1
else:
tdLog.exit("three mnodes is not ready in 10s ")
tdSql.query("show mnodes;")
tdSql.checkRows(self.mnodes)
tdSql.checkData(0,self.mnodeEpIndex,'%s:%d'%(self.host,self.startPort))
tdSql.checkData(0,self.mnodeStatusIndex,'ready')
tdSql.checkData(1,self.mnodeEpIndex,'%s:%d'%(self.host,self.startPort+self.portStep))
tdSql.checkData(1,self.mnodeStatusIndex,'ready')
tdSql.checkData(2,self.mnodeEpIndex,'%s:%d'%(self.host,self.startPort+self.portStep*2))
tdSql.checkData(2,self.mnodeStatusIndex,'ready')
def check3mnode1off(self):
count=0
while count < self.mnodeCheckCnt:
time.sleep(1)
tdSql.query("show mnodes")
tdLog.debug(tdSql.queryResult)
# if tdSql.checkRows(self.mnodes) :
# tdLog.debug("mnode is three nodes")
# else:
# tdLog.exit("mnode number is correct")
roleOfMnode0 = tdSql.queryResult[0][self.roleIndex]
roleOfMnode1 = tdSql.queryResult[1][self.roleIndex]
roleOfMnode2 = tdSql.queryResult[2][self.roleIndex]
if roleOfMnode0=='offline' :
if roleOfMnode1=='leader' and roleOfMnode2 == 'follower' :
self.dnodeOfLeader = tdSql.queryResult[1][self.idIndex]
break
elif roleOfMnode1=='follower' and roleOfMnode2 == 'leader' :
self.dnodeOfLeader = tdSql.queryResult[2][self.idIndex]
break
elif roleOfMnode1=='offline' :
if roleOfMnode0=='leader' and roleOfMnode2 == 'follower' :
self.dnodeOfLeader = tdSql.queryResult[0][self.idIndex]
break
elif roleOfMnode0=='follower' and roleOfMnode2 == 'leader' :
self.dnodeOfLeader = tdSql.queryResult[2][self.idIndex]
break
elif roleOfMnode2=='offline' :
if roleOfMnode0=='leader' and roleOfMnode1 == 'follower' :
self.dnodeOfLeader = tdSql.queryResult[0][self.idIndex]
break
elif roleOfMnode0=='follower' and roleOfMnode1 == 'leader' :
self.dnodeOfLeader = tdSql.queryResult[1][self.idIndex]
break
count+=1
else:
tdLog.exit("three mnodes is not ready in 10s ")
def checkFileContent(self, consumerId, queryString):
buildPath = tdCom.getBuildPath()
cfgPath = tdCom.getClientCfgPath()
dstFile = '%s/../log/dstrows_%d.txt'%(cfgPath, consumerId)
cmdStr = '%s/build/bin/taos -c %s -s "%s >> %s"'%(buildPath, cfgPath, queryString, dstFile)
tdLog.info(cmdStr)
os.system(cmdStr)
consumeRowsFile = '%s/../log/consumerid_%d.txt'%(cfgPath, consumerId)
tdLog.info("rows file: %s, %s"%(consumeRowsFile, dstFile))
consumeFile = open(consumeRowsFile, mode='r')
queryFile = open(dstFile, mode='r')
# skip first line for it is schema
queryFile.readline()
while True:
dst = queryFile.readline()
src = consumeFile.readline()
if dst:
if dst != src:
tdLog.exit("consumerId %d consume rows is not match the rows by direct query"%consumerId)
else:
break
return
def tmqCase1(self):
tdLog.printNoPrefix("======== test case 1: ")
paraDict = {'dbName': 'db1',
'dropFlag': 1,
'event': '',
'vgroups': 4,
'stbName': 'stb',
'colPrefix': 'c',
'tagPrefix': 't',
'colSchema': [{'type': 'INT', 'count':2}, {'type': 'binary', 'len':20, 'count':1}, {'type': 'TIMESTAMP', 'count':1}],
'tagSchema': [{'type': 'INT', 'count':1}, {'type': 'binary', 'len':20, 'count':1}],
'ctbPrefix': 'ctb',
'ctbNum': 1,
'rowsPerTbl': 100000,
'batchNum': 10,
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
'pollDelay': 10,
'showMsg': 1,
'showRow': 1}
topicNameList = ['topic1']
expectRowsList = []
tmqCom.initConsumerTable()
tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], vgroups=4,replica=1)
tdLog.info("create stb")
tdCom.create_stable(tdSql, dbname=paraDict["dbName"],stbname=paraDict["stbName"], column_elm_list=paraDict['colSchema'], tag_elm_list=paraDict['tagSchema'])
tdLog.info("create ctb")
tdCom.create_ctable(tdSql, dbname=paraDict["dbName"],stbname=paraDict["stbName"],tag_elm_list=paraDict['tagSchema'],count=paraDict["ctbNum"], default_ctbname_prefix=paraDict['ctbPrefix'])
tdLog.info("async insert data")
pThread = tmqCom.asyncInsertData(paraDict)
tdLog.info("create topics from stb with filter")
queryString = "select ts, log(c1), ceil(pow(c1,3)) from %s.%s where c1 %% 7 == 0" %(paraDict['dbName'], paraDict['stbName'])
sqlString = "create topic %s as %s" %(topicNameList[0], queryString)
tdLog.info("create topic sql: %s"%sqlString)
tdSql.execute(sqlString)
# init consume info, and start tmq_sim, then check consume result
tdLog.info("insert consume info to consume processor")
consumerId = 0
expectrowcnt = paraDict["rowsPerTbl"] * paraDict["ctbNum"]
topicList = topicNameList[0]
ifcheckdata = 1
ifManualCommit = 1
keyList = 'group.id:cgrp1, enable.auto.commit:false, auto.commit.interval.ms:6000, auto.offset.reset:earliest'
tmqCom.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit)
tdLog.info("start consume processor")
tmqCom.startTmqSimProcess(paraDict['pollDelay'],paraDict["dbName"],paraDict['showMsg'], paraDict['showRow'])
tdLog.info("wait the notify info of start consume")
tmqCom.getStartConsumeNotifyFromTmqsim()
tdLog.info("start switch mnode ................")
tdDnodes = cluster.dnodes
tdLog.info("1. stop dnode 0")
tdDnodes[0].stoptaosd()
time.sleep(10)
self.check3mnode1off()
tdLog.info("2. start dnode 0")
tdDnodes[0].starttaosd()
self.check3mnode()
tdLog.info("3. stop dnode 1")
tdDnodes[1].stoptaosd()
time.sleep(10)
self.check3mnode1off()
tdLog.info("switch end and wait insert data end ................")
pThread.join()
tdLog.info("check the consume result")
tdSql.query(queryString)
expectRowsList.append(tdSql.getRows())
expectRows = 1
resultList = tmqCom.selectConsumeResult(expectRows)
if expectRowsList[0] != resultList[0]:
tdLog.info("expect consume rows: %d, act consume rows: %d"%(expectRowsList[0], resultList[0]))
tdLog.exit("0 tmq consume rows error!")
self.checkFileContent(consumerId, queryString)
time.sleep(10)
for i in range(len(topicNameList)):
tdSql.query("drop topic %s"%topicNameList[i])
tdLog.printNoPrefix("======== test case 1 end ...... ")
def run(self):
tdLog.printNoPrefix("======== Notes: must add '-N 5' for run the script ========")
self.checkDnodesStatusAndCreateMnode(self.dnodes)
self.tmqCase1()
def stop(self):
tdSql.close()
tdLog.success(f"{__file__} successfully executed")
event = threading.Event()
tdCases.addLinux(__file__, TDTestCase())
tdCases.addWindows(__file__, TDTestCase())
......@@ -204,6 +204,35 @@ class TMQCom:
tdLog.debug("insert data ............ [OK]")
return
def insert_data_2(self,tsql,dbName,ctbPrefix,ctbNum,rowsPerTbl,batchNum,startTs):
tdLog.debug("start to insert data ............")
tsql.execute("use %s" %dbName)
pre_insert = "insert into "
sql = pre_insert
t = time.time()
startTs = int(round(t * 1000))
#tdLog.debug("doing insert data into stable:%s rows:%d ..."%(stbName, allRows))
for i in range(ctbNum):
sql += " %s%d values "%(ctbPrefix,i)
for j in range(rowsPerTbl):
if (j % 2 == 0):
sql += "(%d, %d, %d, 'tmqrow_%d', now) "%(startTs + j, j, j, j)
else:
sql += "(%d, %d, %d, 'tmqrow_%d', now) "%(startTs + j, j, -j, j)
if (j > 0) and ((j%batchNum == 0) or (j == rowsPerTbl - 1)):
tsql.execute(sql)
if j < rowsPerTbl - 1:
sql = "insert into %s%d values " %(ctbPrefix,i)
else:
sql = "insert into "
#end sql
if sql != pre_insert:
#print("insert sql:%s"%sql)
tsql.execute(sql)
tdLog.debug("insert data ............ [OK]")
return
def insert_data_interlaceByMultiTbl(self,tsql,dbName,ctbPrefix,ctbNum,rowsPerTbl,batchNum,startTs=0):
tdLog.debug("start to insert data ............")
tsql.execute("use %s" %dbName)
......@@ -291,6 +320,17 @@ class TMQCom:
pThread.start()
return pThread
def threadFunctionForInsert(self, **paraDict):
# create new connector for new tdSql instance in my thread
newTdSql = tdCom.newTdSql()
self.insert_data_2(newTdSql,paraDict["dbName"],paraDict["ctbPrefix"],paraDict["ctbNum"],paraDict["rowsPerTbl"],paraDict["batchNum"],paraDict["startTs"])
return
def asyncInsertData(self, paraDict):
pThread = threading.Thread(target=self.threadFunctionForInsert, kwargs=paraDict)
pThread.start()
return pThread
def close(self):
self.cursor.close()
......
......@@ -113,7 +113,7 @@ class TDTestCase:
tmqCom.insert_data_1(tdSql,paraDict["dbName"],paraDict["ctbPrefix"],paraDict["ctbNum"],paraDict["rowsPerTbl"],paraDict["batchNum"],paraDict["startTs"])
tdLog.info("create topics from stb with filter")
queryString = "select ts, c1,udf1(c1),c2,udf1(c2) from %s.%s where c1 %% 7 == 0" %(paraDict['dbName'], paraDict['stbName'])
queryString = "select ts,c1,udf1(c1),c2,udf1(c2) from %s.%s where c1 %% 7 == 0" %(paraDict['dbName'], paraDict['stbName'])
sqlString = "create topic %s as %s" %(topicNameList[0], queryString)
tdLog.info("create topic sql: %s"%sqlString)
tdSql.execute(sqlString)
......
......@@ -449,6 +449,15 @@ static int32_t msg_process(TAOS_RES* msg, SThreadInfo* pInfo, int32_t msgIndex)
int32_t precision = taos_result_precision(msg);
const char* tbName = tmq_get_table_name(msg);
#if 0
// get schema
//============================== stub =================================================//
for (int32_t i = 0; i < numOfFields; i++) {
taosFprintfFile(g_fp, "%02d: name: %s, type: %d, len: %d\n", i, fields[i].name, fields[i].type, fields[i].bytes);
}
//============================== stub =================================================//
#endif
dumpToFileForCheck(pInfo->pConsumeRowsFile, row, fields, length, numOfFields, precision);
taos_print_row(buf, row, fields, numOfFields);
......@@ -651,12 +660,13 @@ void* consumeThreadFunc(void* param) {
pInfo->taos = taos_connect(NULL, "root", "taosdata", NULL, 0);
if (pInfo->taos == NULL) {
taosFprintfFile(g_fp, "taos_connect() fail, can not notify and save consume result to main scripte\n");
exit(-1);
return NULL;
}
build_consumer(pInfo);
build_topic_list(pInfo);
if ((NULL == pInfo->tmq) || (NULL == pInfo->topicList)) {
taosFprintfFile(g_fp, "create consumer fail! tmq is null or topicList is null\n");
assert(0);
return NULL;
}
......@@ -664,7 +674,9 @@ void* consumeThreadFunc(void* param) {
int32_t err = tmq_subscribe(pInfo->tmq, pInfo->topicList);
if (err != 0) {
pError("tmq_subscribe() fail, reason: %s\n", tmq_err2str(err));
exit(-1);
taosFprintfFile(g_fp, "tmq_subscribe()! reason: %s\n", tmq_err2str(err));
assert(0);
return NULL;
}
tmq_list_destroy(pInfo->topicList);
......@@ -683,14 +695,13 @@ void* consumeThreadFunc(void* param) {
err = tmq_unsubscribe(pInfo->tmq);
if (err != 0) {
pError("tmq_unsubscribe() fail, reason: %s\n", tmq_err2str(err));
/*pInfo->consumeMsgCnt = -1;*/
/*return NULL;*/
taosFprintfFile(g_fp, "tmq_unsubscribe()! reason: %s\n", tmq_err2str(err));
}
err = tmq_consumer_close(pInfo->tmq);
if (err != 0) {
pError("tmq_consumer_close() fail, reason: %s\n", tmq_err2str(err));
/*exit(-1);*/
taosFprintfFile(g_fp, "tmq_consumer_close()! reason: %s\n", tmq_err2str(err));
}
pInfo->tmq = NULL;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册