提交 1b94544b 编写于 作者: S Shengliang Guan

Merge remote-tracking branch 'origin/3.0' into fix/tsim

......@@ -24,7 +24,7 @@ class ClusterDnodes(TDDnodes):
class ConfigureyCluster:
"""This will create defined number of dnodes and create a cluset.
"""This will create defined number of dnodes and create a cluster.
at the same time, it will return TDDnodes list: dnodes, """
hostname= socket.gethostname()
......@@ -85,8 +85,8 @@ class ConfigureyCluster:
count+=1
time.sleep(1)
else:
tdLog.debug("create cluster with %d dnode but check dnode not ready within 5s ! "%self.dnodeNums)
return -1
tdLog.exit("create cluster with %d dnode but check dnode not ready within 5s ! "%self.dnodeNums)
cluster = ConfigureyCluster()
\ No newline at end of file
###################################################################
# Copyright (c) 2016 by TAOS Technologies, Inc.
# All rights reserved.
#
# This file is proprietary and confidential to TAOS Technologies.
# No part of this file may be reproduced, stored, transmitted,
# disclosed or used in any form or by any means other than as
# expressly provided by the written permission from Jianhui Tao
#
###################################################################
# -*- coding: utf-8 -*-
from util.log import *
from util.cases import *
from util.sql import *
from util.common import *
class TDTestCase:
updatecfgDict = {'ttlUnit':5,'ttlPushInterval':3}
def init(self, conn, logSql):
tdLog.debug("start to execute %s" % __file__)
tdSql.init(conn.cursor())
self.ntbname = 'ntb'
self.stbname = 'stb'
self.tbnum = 10
self.ttl_param = 1
self.default_ttl = 100
self.modify_ttl = 1
def ttl_check_ntb(self):
tdSql.prepare()
for i in range(self.tbnum):
tdSql.execute(f'create table {self.ntbname}_{i} (ts timestamp,c0 int) ttl {self.ttl_param}')
tdSql.query(f'show tables')
tdSql.checkRows(self.tbnum)
sleep(self.updatecfgDict['ttlUnit']*self.ttl_param+self.updatecfgDict['ttlPushInterval'])
tdSql.query(f'show tables')
tdSql.checkRows(0)
for i in range(self.tbnum):
tdSql.execute(f'create table {self.ntbname}_{i} (ts timestamp,c0 int) ttl {self.default_ttl}')
for i in range(int(self.tbnum/2)):
tdSql.execute(f'alter table {self.ntbname}_{i} ttl {self.modify_ttl}')
sleep(self.updatecfgDict['ttlUnit']*self.modify_ttl+self.updatecfgDict['ttlPushInterval'])
tdSql.query(f'show tables')
tdSql.checkRows(self.tbnum - int(self.tbnum/2))
tdSql.execute('drop database db')
def ttl_check_ctb(self):
tdSql.prepare()
tdSql.execute(f'create table {self.stbname} (ts timestamp,c0 int) tags(t0 int)')
for i in range(self.tbnum):
tdSql.execute(f'create table {self.stbname}_{i} using {self.stbname} tags({i}) ttl {self.ttl_param}')
tdSql.query(f'show tables')
tdSql.checkRows(self.tbnum)
sleep(self.updatecfgDict['ttlUnit']*self.ttl_param+self.updatecfgDict['ttlPushInterval'])
tdSql.query(f'show tables')
tdSql.checkRows(0)
for i in range(self.tbnum):
tdSql.execute(f'create table {self.stbname}_{i} using {self.stbname} tags({i}) ttl {self.default_ttl}')
tdSql.query(f'show tables')
tdSql.checkRows(self.tbnum)
for i in range(int(self.tbnum/2)):
tdSql.execute(f'alter table {self.stbname}_{i} ttl {self.modify_ttl}')
sleep(self.updatecfgDict['ttlUnit']*self.modify_ttl+self.updatecfgDict['ttlPushInterval'])
tdSql.query(f'show tables')
tdSql.checkRows(self.tbnum - int(self.tbnum/2))
tdSql.execute('drop database db')
def run(self):
self.ttl_check_ntb()
self.ttl_check_ctb()
def stop(self):
tdSql.close()
tdLog.success("%s successfully executed" % __file__)
tdCases.addWindows(__file__, TDTestCase())
tdCases.addLinux(__file__, TDTestCase())
......@@ -17,100 +17,140 @@ from util.log import *
from util.cases import *
from util.sql import *
from util.common import *
from util.sqlset import *
class TDTestCase:
def init(self, conn, logSql):
tdLog.debug("start to execute %s" % __file__)
tdSql.init(conn.cursor())
self.dbname = 'db_test'
self.setsql = TDSetSql()
self.ntbname = 'ntb'
self.rowNum = 10
self.tbnum = 20
self.ts = 1537146000000
self.binary_str = 'taosdata'
self.nchar_str = '涛思数据'
def bottom_check_base(self):
tdSql.prepare()
tdSql.execute('''create table stb(ts timestamp, col1 tinyint, col2 smallint, col3 int, col4 bigint, col5 tinyint unsigned, col6 smallint unsigned,
col7 int unsigned, col8 bigint unsigned, col9 float, col10 double, col11 bool, col12 binary(20), col13 nchar(20)) tags(loc nchar(20))''')
tdSql.execute("create table stb_1 using stb tags('beijing')")
column_list = ['col1','col2','col3','col4','col5','col6','col7','col8']
error_column_list = ['col11','col12','col13']
error_param_list = [0,101]
for i in range(self.rowNum):
tdSql.execute(f"insert into stb_1 values(%d, %d, %d, %d, %d, %d, %d, %d, %d, %f, %f, %d, '{self.binary_str}%d', '{self.nchar_str}%d')"
% (self.ts + i, i + 1, i + 1, i + 1, i + 1, i + 1, i + 1, i + 1, i + 1, i + 0.1, i + 0.1, i % 2, i + 1, i + 1))
self.column_dict = {
'ts' : 'timestamp',
'col1': 'tinyint',
'col2': 'smallint',
'col3': 'int',
'col4': 'bigint',
'col5': 'tinyint unsigned',
'col6': 'smallint unsigned',
'col7': 'int unsigned',
'col8': 'bigint unsigned',
'col9': 'float',
'col10': 'double',
'col11': 'bool',
'col12': 'binary(20)',
'col13': 'nchar(20)'
}
for i in column_list:
tdSql.query(f'select bottom({i},2) from stb_1')
tdSql.checkRows(2)
tdSql.checkEqual(tdSql.queryResult,[(2,),(1,)])
for j in error_param_list:
tdSql.error(f'select bottom({i},{j}) from stb_1')
for i in error_column_list:
tdSql.error(f'select bottom({i},10) from stb_1')
tdSql.query("select ts,bottom(col1, 2),ts from stb_1 group by tbname")
tdSql.checkRows(2)
tdSql.query('select bottom(col2,1) from stb_1 interval(1y) order by col2')
tdSql.checkData(0,0,1)
tdSql.error('select * from stb_1 where bottom(col2,1)=1')
tdSql.execute('drop database db')
def bottom_check_distribute(self):
# prepare data for vgroup 4
dbname = tdCom.getLongName(5, "letters")
self.param_list = [1,100]
def insert_data(self,column_dict,tbname,row_num):
sql = ''
for k, v in column_dict.items():
if v.lower() == 'timestamp' or v.lower() == 'tinyint' or v.lower() == 'smallint' or v.lower() == 'int' or v.lower() == 'bigint' or \
v.lower() == 'tinyint unsigned' or v.lower() == 'smallint unsigned' or v.lower() == 'int unsigned' or v.lower() == 'bigint unsigned' or v.lower() == 'bool':
sql += '%d,'
elif v.lower() == 'float' or v.lower() == 'double':
sql += '%f,'
elif 'binary' in v.lower():
sql += f'"{self.binary_str}%d",'
elif 'nchar' in v.lower():
sql += f'"{self.nchar_str}%d",'
insert_sql = f'insert into {tbname} values({sql[:-1]})'
for i in range(row_num):
insert_list = []
for k, v in column_dict.items():
if v.lower() in[ 'tinyint' , 'smallint' , 'int', 'bigint' , 'tinyint unsigned' , 'smallint unsigned' , 'int unsigned' , 'bigint unsigned'] or\
'binary' in v.lower() or 'nchar' in v.lower():
insert_list.append(0 + i)
elif v.lower() == 'float' or v.lower() == 'double':
insert_list.append(0.1 + i)
elif v.lower() == 'bool':
insert_list.append(i % 2)
elif v.lower() == 'timestamp':
insert_list.append(self.ts + i)
tdSql.execute(insert_sql%(tuple(insert_list)))
def bottom_check_data(self,tbname,tb_type):
new_column_dict = {}
for param in self.param_list:
for k,v in self.column_dict.items():
if v.lower() in ['tinyint','smallint','int','bigint','tinyint unsigned','smallint unsigned','int unsigned','bigint unsigned']:
tdSql.query(f'select bottom({k},{param}) from {tbname} order by {k}')
if param >= self.rowNum:
if tb_type in ['normal_table','child_table']:
tdSql.checkRows(self.rowNum)
values_list = []
for i in range(self.rowNum):
tp = (i,)
values_list.append(tp)
tdSql.checkEqual(tdSql.queryResult,values_list)
elif tb_type == 'stable':
tdSql.checkRows(param)
elif param < self.rowNum:
if tb_type in ['normal_table','child_table']:
tdSql.checkRows(param)
values_list = []
for i in range(param):
tp = (i,)
values_list.append(tp)
tdSql.checkEqual(tdSql.queryResult,values_list)
elif tb_type == 'stable':
tdSql.checkRows(param)
for i in [self.param_list[0]-1,self.param_list[-1]+1]:
tdSql.error(f'select top({k},{i}) from {tbname}')
new_column_dict.update({k:v})
elif v.lower() == 'bool' or 'binary' in v.lower() or 'nchar' in v.lower():
tdSql.error(f'select top({k},{param}) from {tbname}')
tdSql.error(f'select * from {tbname} where top({k},{param})=1')
pass
def bottom_check_ntb(self):
tdSql.execute(f'create database if not exists {self.dbname} vgroups 1')
tdSql.execute(f'use {self.dbname}')
tdSql.execute(self.setsql.set_create_normaltable_sql(self.ntbname,self.column_dict))
self.insert_data(self.column_dict,self.ntbname,self.rowNum)
self.bottom_check_data(self.ntbname,'normal_table')
tdSql.execute(f'drop database {self.dbname}')
def bottom_check_stb(self):
stbname = tdCom.getLongName(5, "letters")
vgroup_num = 2
child_table_num = 20
tdSql.execute(f"create database if not exists {dbname} vgroups {vgroup_num}")
tdSql.execute(f'use {dbname}')
# build 20 child tables,every table insert 10 rows
tdSql.execute(f'''create table {stbname}(ts timestamp, col1 tinyint, col2 smallint, col3 int, col4 bigint, col5 tinyint unsigned, col6 smallint unsigned,
col7 int unsigned, col8 bigint unsigned, col9 float, col10 double, col11 bool, col12 binary(20), col13 nchar(20)) tags(loc nchar(20))''')
for i in range(child_table_num):
tdSql.execute(f"create table {stbname}_{i} using {stbname} tags('beijing')")
tdSql.execute(f"insert into {stbname}_{i}(ts) values(%d)" % (self.ts - 1-i))
column_list = ['col1','col2','col3','col4','col5','col6','col7','col8']
error_column_list = ['col11','col12','col13']
error_param_list = [0,101]
for i in [f'{stbname}', f'{dbname}.{stbname}']:
for j in column_list:
tdSql.query(f"select bottom({j},1) from {i}")
tdSql.checkRows(0)
tag_dict = {
't0':'int'
}
tag_values = [
f'1'
]
tdSql.execute(f"create database if not exists {self.dbname} vgroups 2")
tdSql.execute(f'use {self.dbname}')
tdSql.execute(self.setsql.set_create_stable_sql(stbname,self.column_dict,tag_dict))
for i in range(self.tbnum):
tdSql.execute(f"create table {stbname}_{i} using {stbname} tags({tag_values[0]})")
tdSql.execute(self.insert_data(self.column_dict,f'{stbname}_{i}',self.rowNum))
tdSql.query('show tables')
vgroup_list = []
for i in range(len(tdSql.queryResult)):
vgroup_list.append(tdSql.queryResult[i][6])
vgroup_list_set = set(vgroup_list)
for i in vgroup_list_set:
vgroups_num = vgroup_list.count(i)
if vgroups_num >=2:
if vgroups_num >= 2:
tdLog.info(f'This scene with {vgroups_num} vgroups is ok!')
continue
else:
tdLog.exit(f'This scene does not meet the requirements with {vgroups_num} vgroup!\n')
for i in range(self.rowNum):
for j in range(child_table_num):
tdSql.execute(f"insert into {stbname}_{j} values(%d, %d, %d, %d, %d, %d, %d, %d, %d, %f, %f, %d, '{self.binary_str}%d', '{self.nchar_str}%d')"
% (self.ts + i, i + 1, i + 1, i + 1, i + 1, i + 1, i + 1, i + 1, i + 1, i + 0.1, i + 0.1, i % 2, i + 1, i + 1))
for i in column_list:
tdSql.query(f'select bottom({i},2) from {stbname}')
tdSql.checkRows(2)
tdSql.checkEqual(tdSql.queryResult,[(1,),(1,)])
for j in error_param_list:
tdSql.error(f'select bottom({i},{j}) from {stbname}')
for i in error_column_list:
tdSql.error(f'select bottom({i},10) from {stbname}')
tdSql.execute(f'drop database {dbname}')
tdLog.exit(
'This scene does not meet the requirements with {vgroups_num} vgroup!\n')
for i in range(self.tbnum):
self.bottom_check_data(f'{stbname}_{i}','child_table')
self.bottom_check_data(f'{stbname}','stable')
tdSql.execute(f'drop database {self.dbname}')
def run(self):
self.bottom_check_base()
self.bottom_check_distribute()
self.bottom_check_ntb()
self.bottom_check_stb()
def stop(self):
tdSql.close()
tdLog.success("%s successfully executed" % __file__)
......
......@@ -11,12 +11,14 @@
# -*- coding: utf-8 -*-
import imp
import sys
import taos
from util.log import tdLog
from util.cases import tdCases
from util.sql import tdSql
import json
import os
class TDTestCase:
......@@ -29,6 +31,9 @@ class TDTestCase:
return
def init(self, conn, logSql):
self.testcasePath = os.path.split(__file__)[0]
self.testcaseFilename = os.path.split(__file__)[-1]
os.system("rm -rf %s/%s.sql" % (self.testcasePath,self.testcaseFilename))
tdLog.debug("start to execute %s" % __file__)
tdSql.init(conn.cursor(), logSql)
......@@ -557,6 +562,103 @@ class TDTestCase:
tdSql.checkRows(3)
tdSql.query("select round(dataint) from jsons1 where jtag->'tag1'>1")
tdSql.checkRows(3)
#math function
tdSql.query("select sin(dataint) from jsons1 where jtag->'tag1'>1;")
tdSql.checkRows(3)
tdSql.query("select cos(dataint) from jsons1 where jtag->'tag1'>1;")
tdSql.checkRows(3)
tdSql.query("select tan(dataint) from jsons1 where jtag->'tag1'>1;")
tdSql.checkRows(3)
tdSql.query("select asin(dataint) from jsons1 where jtag->'tag1'>1;")
tdSql.checkRows(3)
tdSql.query("select acos(dataint) from jsons1 where jtag->'tag1'>1;")
tdSql.checkRows(3)
tdSql.query("select atan(dataint) from jsons1 where jtag->'tag1'>1;")
tdSql.checkRows(3)
tdSql.query("select ceil(dataint) from jsons1 where jtag->'tag1'>1;")
tdSql.checkRows(3)
tdSql.query("select floor(dataint) from jsons1 where jtag->'tag1'>1;")
tdSql.checkRows(3)
tdSql.query("select round(dataint) from jsons1 where jtag->'tag1'>1;")
tdSql.checkRows(3)
tdSql.query("select abs(dataint) from jsons1 where jtag->'tag1'>1;")
tdSql.checkRows(3)
tdSql.query("select pow(dataint,5) from jsons1 where jtag->'tag1'>1;")
tdSql.checkRows(3)
tdSql.query("select log(dataint,10) from jsons1 where jtag->'tag1'>1;")
tdSql.checkRows(3)
tdSql.query("select sqrt(dataint) from jsons1 where jtag->'tag1'>1;")
tdSql.checkRows(3)
tdSql.query("select HISTOGRAM(dataint,'user_input','[1, 33, 555, 7777]',1) from jsons1 where jtag->'tag1'>1;")
tdSql.checkRows(3)
tdSql.query("select csum(dataint) from jsons1 where jtag->'tag1'>1;")
tdSql.checkRows(3)
tdSql.query("select mavg(dataint,1) from jsons1 where jtag->'tag1'>1;")
tdSql.checkRows(3)
tdSql.query("select statecount(dataint,'GE',10) from jsons1 where jtag->'tag1'>1;")
tdSql.checkRows(3)
tdSql.query("select stateduration(dataint,'GE',0) from jsons1 where jtag->'tag1'>1;")
tdSql.checkRows(3)
tdSql.query("select sample(dataint,3) from jsons1 where jtag->'tag1'>1;")
tdSql.checkRows(3)
tdSql.query("select HYPERLOGLOG(dataint) from jsons1 where jtag->'tag1'>1;")
tdSql.checkRows(1)
tdSql.query("select twa(dataint) from jsons1 where jtag->'tag1'>1;")
tdSql.checkRows(1)
# function not ready
# tdSql.query("select tail(dataint,1) from jsons1 where jtag->'tag1'>1;")
# tdSql.checkRows(3)
# tdSql.query("select unique(dataint) from jsons1 where jtag->'tag1'>1;")
# tdSql.checkRows(3)
# tdSql.query("select mode(dataint) from jsons1 where jtag->'tag1'>1;")
# tdSql.checkRows(3)
# tdSql.query("select irate(dataint) from jsons1 where jtag->'tag1'>1;")
# tdSql.checkRows(1)
#str function
tdSql.query("select upper(dataStr) from jsons1 where jtag->'tag1'>1;")
tdSql.checkRows(3)
tdSql.query("select ltrim(dataStr) from jsons1 where jtag->'tag1'>1;")
tdSql.checkRows(3)
tdSql.query("select lower(dataStr) from jsons1 where jtag->'tag1'>1;")
tdSql.checkRows(3)
tdSql.query("select rtrim(dataStr) from jsons1 where jtag->'tag1'>1;")
tdSql.checkRows(3)
tdSql.query("select LENGTH(dataStr) from jsons1 where jtag->'tag1'>1;")
tdSql.checkRows(3)
tdSql.query("select CHAR_LENGTH(dataStr) from jsons1 where jtag->'tag1'>1;")
tdSql.checkRows(3)
tdSql.query("select SUBSTR(dataStr,5) from jsons1 where jtag->'tag1'>1;")
tdSql.checkRows(3)
tdSql.query("select CONCAT(dataStr,dataStrBin) from jsons1 where jtag->'tag1'>1;")
tdSql.checkRows(3)
tdSql.query("select CONCAT_ws('adad!@!@%$^$%$^$%^a',dataStr,dataStrBin) from jsons1 where jtag->'tag1'>1;")
tdSql.checkRows(3)
tdSql.query("select CAST(dataStr as bigint) from jsons1 where jtag->'tag1'>1;")
tdSql.checkRows(3)
#time function
tdSql.query("select now() from jsons1 where jtag->'tag1'>1;")
tdSql.checkRows(3)
tdSql.query("select today() from jsons1 where jtag->'tag1'>1;")
tdSql.checkRows(3)
tdSql.query("select TIMEZONE() from jsons1 where jtag->'tag1'>1;")
tdSql.checkRows(3)
tdSql.query("select TO_ISO8601(ts) from jsons1 where jtag->'tag1'>1;")
tdSql.checkRows(3)
tdSql.query("select TO_UNIXTIMESTAMP(datastr) from jsons1 where jtag->'tag1'>1;")
tdSql.checkRows(3)
tdSql.query("select TIMETRUNCATE(ts,1u) from jsons1 where jtag->'tag1'>1;")
tdSql.checkRows(3)
tdSql.query("select TIMEDIFF(ts,_c0) from jsons1 where jtag->'tag1'>1;")
tdSql.checkRows(3)
tdSql.query("select TIMEDIFF(ts,1u) from jsons1 where jtag->'tag1'>1;")
tdSql.checkRows(3)
tdSql.query("select ELAPSED(ts,1h) from jsons1 where jtag->'tag1'>1;")
tdSql.checkRows(1)
#
# #test TD-12077
tdSql.execute("insert into jsons1_16 using jsons1 tags('{\"tag1\":\"收到货\",\"tag2\":\"\",\"tag3\":-2.111}') values(1591062628000, 2, NULL, '你就会', 'dws')")
......
from ssl import ALERT_DESCRIPTION_CERTIFICATE_UNOBTAINABLE
import taos
import sys
import time
import os
from util.log import *
from util.sql import *
from util.cases import *
from util.dnodes import TDDnodes
from util.dnodes import TDDnode
from util.cluster import *
sys.path.append("./6-cluster")
from clusterCommonCreate import *
from clusterCommonCheck import clusterComCheck
import time
import socket
import subprocess
from multiprocessing import Process
import threading
import time
import inspect
import ctypes
class TDTestCase:
def init(self,conn ,logSql):
tdLog.debug(f"start to excute {__file__}")
self.TDDnodes = None
tdSql.init(conn.cursor())
self.host = socket.gethostname()
def getBuildPath(self):
selfPath = os.path.dirname(os.path.realpath(__file__))
if ("community" in selfPath):
projPath = selfPath[:selfPath.find("community")]
else:
projPath = selfPath[:selfPath.find("tests")]
for root, dirs, files in os.walk(projPath):
if ("taosd" in files):
rootRealPath = os.path.dirname(os.path.realpath(root))
if ("packaging" not in rootRealPath):
buildPath = root[:len(root) - len("/build/bin")]
break
return buildPath
def _async_raise(self, tid, exctype):
"""raises the exception, performs cleanup if needed"""
if not inspect.isclass(exctype):
exctype = type(exctype)
res = ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, ctypes.py_object(exctype))
if res == 0:
raise ValueError("invalid thread id")
elif res != 1:
# """if it returns a number greater than one, you're in trouble,
# and you should call it again with exc=NULL to revert the effect"""
ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, None)
raise SystemError("PyThreadState_SetAsyncExc failed")
def stopThread(self,thread):
self._async_raise(thread.ident, SystemExit)
def insertData(self,countstart,countstop):
# fisrt add data : db\stable\childtable\general table
for couti in range(countstart,countstop):
tdLog.debug("drop database if exists db%d" %couti)
tdSql.execute("drop database if exists db%d" %couti)
print("create database if not exists db%d replica 1 duration 300" %couti)
tdSql.execute("create database if not exists db%d replica 1 duration 300" %couti)
tdSql.execute("use db%d" %couti)
tdSql.execute(
'''create table stb1
(ts timestamp, c1 int, c2 bigint, c3 smallint, c4 tinyint, c5 float, c6 double, c7 bool, c8 binary(16),c9 nchar(32), c10 timestamp)
tags (t1 int)
'''
)
tdSql.execute(
'''
create table t1
(ts timestamp, c1 int, c2 bigint, c3 smallint, c4 tinyint, c5 float, c6 double, c7 bool, c8 binary(16),c9 nchar(32), c10 timestamp)
'''
)
for i in range(4):
tdSql.execute(f'create table ct{i+1} using stb1 tags ( {i+1} )')
def fiveDnodeThreeMnode(self,dnodenumbers,mnodeNums,restartNumber):
tdLog.printNoPrefix("======== test case 1: ")
paraDict = {'dbName': 'db',
'dropFlag': 1,
'event': '',
'vgroups': 4,
'replica': 1,
'stbName': 'stb',
'colPrefix': 'c',
'tagPrefix': 't',
'colSchema': [{'type': 'INT', 'count':1}, {'type': 'binary', 'len':20, 'count':1}],
'tagSchema': [{'type': 'INT', 'count':1}, {'type': 'binary', 'len':20, 'count':1}],
'ctbPrefix': 'ctb',
'ctbNum': 1,
'rowsPerTbl': 10000,
'batchNum': 10,
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
'pollDelay': 10,
'showMsg': 1,
'showRow': 1}
dnodenumbers=int(dnodenumbers)
mnodeNums=int(mnodeNums)
dbNumbers = int(dnodenumbers * restartNumber)
tdLog.info("first check dnode and mnode")
tdSql.query("show dnodes;")
tdSql.checkData(0,1,'%s:6030'%self.host)
tdSql.checkData(4,1,'%s:6430'%self.host)
clusterComCheck.checkDnodes(dnodenumbers)
clusterComCheck.checkMnodeStatus(1)
# fisr add three mnodes;
tdLog.info("fisr add three mnodes and check mnode status")
tdSql.execute("create mnode on dnode 2")
clusterComCheck.checkMnodeStatus(2)
tdSql.execute("create mnode on dnode 3")
clusterComCheck.checkMnodeStatus(3)
# add some error operations and
tdLog.info("Confirm the status of the dnode again")
tdSql.error("create mnode on dnode 2")
tdSql.query("show dnodes;")
print(tdSql.queryResult)
clusterComCheck.checkDnodes(dnodenumbers)
tdLog.info("Take turns stopping all dnodes ")
# seperate vnode and mnode in different dnodes.
# create database and stable
tdDnodes=cluster.dnodes
stopcount =0
while stopcount < restartNumber:
for i in range(dnodenumbers):
# threads=[]
# threads = MyThreadFunc(self.insert_data(i*2,i*2+2))
paraDict["dbName"]= 'db%d%d'%(stopcount,i)
threads=threading.Thread(target=clusterComCreate.create_database, args=(tdSql, paraDict["dbName"],paraDict["dropFlag"], paraDict["vgroups"],paraDict['replica']))
threads.start()
tdDnodes[i].stoptaosd()
# sleep(10)
tdDnodes[i].starttaosd()
# sleep(10)
if clusterComCheck.checkDnodes(dnodenumbers):
# threads.join()
tdLog.info("first restart loop")
else:
print("456")
threads.join()
self.stopThread(threads)
tdLog.exit("one or more of dnodes failed to start ")
# self.check3mnode()
stopcount+=1
threads.join()
clusterComCheck.checkDnodes(dnodenumbers)
clusterComCheck.checkDbRows(dbNumbers)
for i in range(restartNumber):
clusterComCheck.checkDb(dnodenumbers,'db%d'%i)
def run(self):
# print(self.master_dnode.cfgDict)
self.fiveDnodeThreeMnode(5,3,1)
def stop(self):
tdSql.close()
tdLog.success(f"{__file__} successfully executed")
tdCases.addLinux(__file__, TDTestCase())
tdCases.addWindows(__file__, TDTestCase())
\ No newline at end of file
from ssl import ALERT_DESCRIPTION_CERTIFICATE_UNOBTAINABLE
import taos
import sys
import time
import os
from util.log import *
from util.sql import *
from util.cases import *
from util.dnodes import TDDnodes
from util.dnodes import TDDnode
from util.cluster import *
sys.path.append("./6-cluster")
from clusterCommonCreate import *
from clusterCommonCheck import clusterComCheck
import time
import socket
import subprocess
from multiprocessing import Process
import threading
import time
import inspect
import ctypes
class TDTestCase:
def init(self,conn ,logSql):
tdLog.debug(f"start to excute {__file__}")
self.TDDnodes = None
tdSql.init(conn.cursor())
self.host = socket.gethostname()
def getBuildPath(self):
selfPath = os.path.dirname(os.path.realpath(__file__))
if ("community" in selfPath):
projPath = selfPath[:selfPath.find("community")]
else:
projPath = selfPath[:selfPath.find("tests")]
for root, dirs, files in os.walk(projPath):
if ("taosd" in files):
rootRealPath = os.path.dirname(os.path.realpath(root))
if ("packaging" not in rootRealPath):
buildPath = root[:len(root) - len("/build/bin")]
break
return buildPath
def _async_raise(self, tid, exctype):
"""raises the exception, performs cleanup if needed"""
if not inspect.isclass(exctype):
exctype = type(exctype)
res = ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, ctypes.py_object(exctype))
if res == 0:
raise ValueError("invalid thread id")
elif res != 1:
# """if it returns a number greater than one, you're in trouble,
# and you should call it again with exc=NULL to revert the effect"""
ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, None)
raise SystemError("PyThreadState_SetAsyncExc failed")
def stopThread(self,thread):
self._async_raise(thread.ident, SystemExit)
def insertData(self,countstart,countstop):
# fisrt add data : db\stable\childtable\general table
for couti in range(countstart,countstop):
tdLog.debug("drop database if exists db%d" %couti)
tdSql.execute("drop database if exists db%d" %couti)
print("create database if not exists db%d replica 1 duration 300" %couti)
tdSql.execute("create database if not exists db%d replica 1 duration 300" %couti)
tdSql.execute("use db%d" %couti)
tdSql.execute(
'''create table stb1
(ts timestamp, c1 int, c2 bigint, c3 smallint, c4 tinyint, c5 float, c6 double, c7 bool, c8 binary(16),c9 nchar(32), c10 timestamp)
tags (t1 int)
'''
)
tdSql.execute(
'''
create table t1
(ts timestamp, c1 int, c2 bigint, c3 smallint, c4 tinyint, c5 float, c6 double, c7 bool, c8 binary(16),c9 nchar(32), c10 timestamp)
'''
)
for i in range(4):
tdSql.execute(f'create table ct{i+1} using stb1 tags ( {i+1} )')
def fiveDnodeThreeMnode(self,dnodenumbers,mnodeNums,restartNumber):
tdLog.printNoPrefix("======== test case 1: ")
paraDict = {'dbName': 'db',
'dropFlag': 1,
'event': '',
'vgroups': 4,
'replica': 1,
'stbName': 'stb',
'colPrefix': 'c',
'tagPrefix': 't',
'colSchema': [{'type': 'INT', 'count':1}, {'type': 'binary', 'len':20, 'count':1}],
'tagSchema': [{'type': 'INT', 'count':1}, {'type': 'binary', 'len':20, 'count':1}],
'ctbPrefix': 'ctb',
'ctbNum': 1,
'rowsPerTbl': 10000,
'batchNum': 10,
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
'pollDelay': 10,
'showMsg': 1,
'showRow': 1}
dnodenumbers=int(dnodenumbers)
mnodeNums=int(mnodeNums)
dbNumbers = int(mnodeNums * restartNumber)
tdLog.info("first check dnode and mnode")
tdSql.query("show dnodes;")
tdSql.checkData(0,1,'%s:6030'%self.host)
tdSql.checkData(4,1,'%s:6430'%self.host)
clusterComCheck.checkDnodes(dnodenumbers)
clusterComCheck.checkMnodeStatus(1)
# fisr add three mnodes;
tdLog.info("fisr add three mnodes and check mnode status")
tdSql.execute("create mnode on dnode 2")
clusterComCheck.checkMnodeStatus(2)
tdSql.execute("create mnode on dnode 3")
clusterComCheck.checkMnodeStatus(3)
# add some error operations and
tdLog.info("Confirm the status of the dnode again")
tdSql.error("create mnode on dnode 2")
tdSql.query("show dnodes;")
print(tdSql.queryResult)
clusterComCheck.checkDnodes(dnodenumbers)
tdLog.info("Take turns stopping Mnodes ")
# seperate vnode and mnode in different dnodes.
# create database and stable
tdDnodes=cluster.dnodes
stopcount =0
while stopcount < restartNumber:
tdLog.info("first restart loop")
for i in range(mnodeNums):
# threads=[]
# threads = MyThreadFunc(self.insert_data(i*2,i*2+2))
paraDict["dbName"]= 'db%d%d'%(stopcount,i)
threads=threading.Thread(target=clusterComCreate.create_database, args=(tdSql, paraDict["dbName"],paraDict["dropFlag"], paraDict["vgroups"],paraDict['replica']))
threads.start()
tdDnodes[i].stoptaosd()
# sleep(10)
tdDnodes[i].starttaosd()
# sleep(10)
if clusterComCheck.checkDnodes(dnodenumbers):
# threads.join()
tdLog.info("123")
else:
print("456")
threads.join()
self.stopThread(threads)
tdLog.exit("one or more of dnodes failed to start ")
# self.check3mnode()
stopcount+=1
threads.join()
clusterComCheck.checkDnodes(dnodenumbers)
clusterComCheck.checkDbRows(dbNumbers)
for i in range(restartNumber):
clusterComCheck.checkDb(mnodeNums,'db%d'%i)
def run(self):
# print(self.master_dnode.cfgDict)
self.fiveDnodeThreeMnode(5,3,1)
def stop(self):
tdSql.close()
tdLog.success(f"{__file__} successfully executed")
tdCases.addLinux(__file__, TDTestCase())
tdCases.addWindows(__file__, TDTestCase())
\ No newline at end of file
from ssl import ALERT_DESCRIPTION_CERTIFICATE_UNOBTAINABLE
import taos
import sys
import time
import os
from util.log import *
from util.sql import *
from util.cases import *
from util.dnodes import TDDnodes
from util.dnodes import TDDnode
from util.cluster import *
sys.path.append("./6-cluster")
from clusterCommonCreate import *
from clusterCommonCheck import clusterComCheck
import time
import socket
import subprocess
from multiprocessing import Process
import threading
import time
import inspect
import ctypes
class TDTestCase:
def init(self,conn ,logSql):
tdLog.debug(f"start to excute {__file__}")
self.TDDnodes = None
tdSql.init(conn.cursor())
self.host = socket.gethostname()
def getBuildPath(self):
selfPath = os.path.dirname(os.path.realpath(__file__))
if ("community" in selfPath):
projPath = selfPath[:selfPath.find("community")]
else:
projPath = selfPath[:selfPath.find("tests")]
for root, dirs, files in os.walk(projPath):
if ("taosd" in files):
rootRealPath = os.path.dirname(os.path.realpath(root))
if ("packaging" not in rootRealPath):
buildPath = root[:len(root) - len("/build/bin")]
break
return buildPath
def _async_raise(self, tid, exctype):
"""raises the exception, performs cleanup if needed"""
if not inspect.isclass(exctype):
exctype = type(exctype)
res = ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, ctypes.py_object(exctype))
if res == 0:
raise ValueError("invalid thread id")
elif res != 1:
# """if it returns a number greater than one, you're in trouble,
# and you should call it again with exc=NULL to revert the effect"""
ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, None)
raise SystemError("PyThreadState_SetAsyncExc failed")
def stopThread(self,thread):
self._async_raise(thread.ident, SystemExit)
def insertData(self,countstart,countstop):
# fisrt add data : db\stable\childtable\general table
for couti in range(countstart,countstop):
tdLog.debug("drop database if exists db%d" %couti)
tdSql.execute("drop database if exists db%d" %couti)
print("create database if not exists db%d replica 1 duration 300" %couti)
tdSql.execute("create database if not exists db%d replica 1 duration 300" %couti)
tdSql.execute("use db%d" %couti)
tdSql.execute(
'''create table stb1
(ts timestamp, c1 int, c2 bigint, c3 smallint, c4 tinyint, c5 float, c6 double, c7 bool, c8 binary(16),c9 nchar(32), c10 timestamp)
tags (t1 int)
'''
)
tdSql.execute(
'''
create table t1
(ts timestamp, c1 int, c2 bigint, c3 smallint, c4 tinyint, c5 float, c6 double, c7 bool, c8 binary(16),c9 nchar(32), c10 timestamp)
'''
)
for i in range(4):
tdSql.execute(f'create table ct{i+1} using stb1 tags ( {i+1} )')
def fiveDnodeThreeMnode(self,dnodenumbers,mnodeNums,restartNumber):
tdLog.printNoPrefix("======== test case 1: ")
paraDict = {'dbName': 'db',
'dropFlag': 1,
'event': '',
'vgroups': 4,
'replica': 1,
'stbName': 'stb',
'colPrefix': 'c',
'tagPrefix': 't',
'colSchema': [{'type': 'INT', 'count':1}, {'type': 'binary', 'len':20, 'count':1}],
'tagSchema': [{'type': 'INT', 'count':1}, {'type': 'binary', 'len':20, 'count':1}],
'ctbPrefix': 'ctb',
'ctbNum': 1,
'rowsPerTbl': 10000,
'batchNum': 10,
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
'pollDelay': 10,
'showMsg': 1,
'showRow': 1}
dnodenumbers=int(dnodenumbers)
mnodeNums=int(mnodeNums)
vnodeNumbers = int(dnodenumbers-mnodeNums)
dbNumbers = int(vnodeNumbers * restartNumber)
tdLog.info("first check dnode and mnode")
tdSql.query("show dnodes;")
tdSql.checkData(0,1,'%s:6030'%self.host)
tdSql.checkData(4,1,'%s:6430'%self.host)
clusterComCheck.checkDnodes(dnodenumbers)
clusterComCheck.checkMnodeStatus(1)
# fisr add three mnodes;
tdLog.info("fisr add three mnodes and check mnode status")
tdSql.execute("create mnode on dnode 2")
clusterComCheck.checkMnodeStatus(2)
tdSql.execute("create mnode on dnode 3")
clusterComCheck.checkMnodeStatus(3)
# add some error operations and
tdLog.info("Confirm the status of the dnode again")
tdSql.error("create mnode on dnode 2")
tdSql.query("show dnodes;")
print(tdSql.queryResult)
clusterComCheck.checkDnodes(dnodenumbers)
tdLog.info("Take turns stopping Vnodes ")
# seperate vnode and mnode in different dnodes.
# create database and stable
tdDnodes=cluster.dnodes
stopcount =0
while stopcount < restartNumber:
for i in range(vnodeNumbers):
# threads=[]
# threads = MyThreadFunc(self.insert_data(i*2,i*2+2))
paraDict["dbName"]= 'db%d%d'%(stopcount,i)
threads=threading.Thread(target=clusterComCreate.create_database, args=(tdSql, paraDict["dbName"],paraDict["dropFlag"], paraDict["vgroups"],paraDict['replica']))
threads.start()
tdDnodes[mnodeNums+i].stoptaosd()
# sleep(10)
tdDnodes[mnodeNums+i].starttaosd()
# sleep(10)
if clusterComCheck.checkDnodes(vnodeNumbers):
# threads.join()
tdLog.info("first restart loop")
else:
print("456")
threads.join()
self.stopThread(threads)
tdLog.exit("one or more of dnodes failed to start ")
# self.check3mnode()
stopcount+=1
threads.join()
clusterComCheck.checkDnodes(dnodenumbers)
clusterComCheck.checkDbRows(dbNumbers)
for i in range(restartNumber):
clusterComCheck.checkDb(vnodeNumbers,'db%d'%i)
def run(self):
# print(self.master_dnode.cfgDict)
self.fiveDnodeThreeMnode(5,3,1)
def stop(self):
tdSql.close()
tdLog.success(f"{__file__} successfully executed")
tdCases.addLinux(__file__, TDTestCase())
tdCases.addWindows(__file__, TDTestCase())
\ No newline at end of file
......@@ -9,6 +9,11 @@ from util.sql import *
from util.cases import *
from util.dnodes import TDDnodes
from util.dnodes import TDDnode
from util.cluster import *
from util.common import *
sys.path.append("./7-tmq")
from tmqCommon import *
import time
import socket
import subprocess
......@@ -17,26 +22,13 @@ import threading
import time
import inspect
import ctypes
class MyDnodes(TDDnodes):
def __init__(self ,dnodes_lists):
super(MyDnodes,self).__init__()
self.dnodes = dnodes_lists # dnode must be TDDnode instance
self.simDeployed = False
class TDTestCase:
def init(self,conn ,logSql):
tdLog.debug(f"start to excute {__file__}")
self.TDDnodes = None
def buildcluster(self,dnodenumber):
self.depoly_cluster(dnodenumber)
self.master_dnode = self.TDDnodes.dnodes[0]
self.host=self.master_dnode.cfgDict["fqdn"]
conn1 = taos.connect(self.master_dnode.cfgDict["fqdn"] , config=self.master_dnode.cfgDir)
tdSql.init(conn1.cursor())
# tdSql.init(conn.cursor())
# self.host = socket.gethostname()
def getBuildPath(self):
selfPath = os.path.dirname(os.path.realpath(__file__))
......@@ -106,52 +98,6 @@ class TDTestCase:
tdSql.checkData(0,0,rowsPerSTable)
return
def depoly_cluster(self ,dnodes_nums=5,independent=True):
testCluster = False
valgrind = 0
hostname = socket.gethostname()
dnodes = []
start_port = 6030
start_port_sec = 6130
for num in range(1, dnodes_nums+1):
dnode = TDDnode(num)
dnode.addExtraCfg("firstEp", f"{hostname}:{start_port}")
dnode.addExtraCfg("fqdn", f"{hostname}")
dnode.addExtraCfg("serverPort", f"{start_port + (num-1)*100}")
dnode.addExtraCfg("monitorFqdn", hostname)
dnode.addExtraCfg("monitorPort", 7043)
dnode.addExtraCfg("secondEp", f"{hostname}:{start_port_sec}")
# configure three dnoe don't support vnodes
if independent and (num < 4):
dnode.addExtraCfg("supportVnodes", 0)
dnodes.append(dnode)
self.TDDnodes = MyDnodes(dnodes)
self.TDDnodes.init("")
self.TDDnodes.setTestCluster(testCluster)
self.TDDnodes.setValgrind(valgrind)
self.TDDnodes.stopAll()
for dnode in self.TDDnodes.dnodes:
self.TDDnodes.deploy(dnode.index,{})
for dnode in self.TDDnodes.dnodes:
self.TDDnodes.starttaosd(dnode.index)
# create cluster
for dnode in self.TDDnodes.dnodes[1:]:
# print(dnode.cfgDict)
dnode_id = dnode.cfgDict["fqdn"] + ":" +dnode.cfgDict["serverPort"]
dnode_first_host = dnode.cfgDict["firstEp"].split(":")[0]
dnode_first_port = dnode.cfgDict["firstEp"].split(":")[-1]
cmd = f" taos -h {dnode_first_host} -P {dnode_first_port} -s ' create dnode \"{dnode_id} \" ' ;"
print(cmd)
os.system(cmd)
time.sleep(2)
tdLog.info(" create cluster with %d dnode done! " %dnodes_nums)
def checkdnodes(self,dnodenumber):
count=0
while count < 100:
......@@ -305,6 +251,14 @@ class TDTestCase:
tdSql.checkData(2,2,'offline')
tdSql.checkData(2,3,'ready')
def check5dnode(self):
tdSql.query("show dnodes;")
tdSql.checkData(0,1,'%s:6030'%self.host)
tdSql.checkData(4,1,'%s:6430'%self.host)
tdSql.checkData(0,4,'ready')
tdSql.checkData(4,4,'ready')
def five_dnode_three_mnode(self,dnodenumber):
tdSql.query("show dnodes;")
tdSql.checkData(0,1,'%s:6030'%self.host)
......@@ -346,6 +300,7 @@ class TDTestCase:
threads.join()
else:
print("456")
threads.join()
self.stop_thread(threads)
assert 1 == 2 ,"some dnode started failed"
return False
......@@ -357,16 +312,8 @@ class TDTestCase:
self.check3mnode()
def getConnection(self, dnode):
host = dnode.cfgDict["fqdn"]
port = dnode.cfgDict["serverPort"]
config_dir = dnode.cfgDir
return taos.connect(host=host, port=int(port), config=config_dir)
def run(self):
# print(self.master_dnode.cfgDict)
self.buildcluster(5)
self.five_dnode_three_mnode(5)
def stop(self):
......
......@@ -12,7 +12,10 @@ from util.dnodes import TDDnodes
from util.dnodes import TDDnode
from util.cluster import *
from test import tdDnodes
sys.path.append("./6-cluster")
from clusterCommonCreate import *
from clusterCommonCheck import *
import time
import socket
import subprocess
......@@ -216,64 +219,88 @@ class TDTestCase:
else:
tdLog.exit("create cluster with %d dnode but check dnode not ready within 5s ! "%dnodeNumbers)
def five_dnode_three_mnode(self,dnodenumber):
self.check_dnodes_status(5)
tdSql.query("show mnodes;")
tdLog.debug(self.host)
tdSql.checkRows(1)
def fiveDnodeThreeMnode(self,dnodenumbers,mnodeNums,restartNumber):
tdLog.printNoPrefix("======== test case 1: ")
paraDict = {'dbName': 'db',
'dropFlag': 1,
'event': '',
'vgroups': 4,
'replica': 1,
'stbName': 'stb',
'colPrefix': 'c',
'tagPrefix': 't',
'colSchema': [{'type': 'INT', 'count':1}, {'type': 'binary', 'len':20, 'count':1}],
'tagSchema': [{'type': 'INT', 'count':1}, {'type': 'binary', 'len':20, 'count':1}],
'ctbPrefix': 'ctb',
'ctbNum': 1,
'rowsPerTbl': 10000,
'batchNum': 10,
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
'pollDelay': 10,
'showMsg': 1,
'showRow': 1}
dnodenumbers=int(dnodenumbers)
mnodeNums=int(mnodeNums)
dbNumbers = int(dnodenumbers * restartNumber)
tdLog.info("first check dnode and mnode")
tdSql.query("show dnodes;")
tdSql.checkData(0,1,'%s:6030'%self.host)
tdSql.checkData(0,2,'leader')
tdSql.checkData(0,3,'ready')
tdSql.checkData(4,1,'%s:6430'%self.host)
clusterComCheck.checkDnodes(dnodenumbers)
clusterComCheck.checkMnodeStatus(1)
# fisr add three mnodes;
tdLog.info("fisr add three mnodes and check mnode status")
tdSql.execute("create mnode on dnode 2")
time.sleep(10)
clusterComCheck.checkMnodeStatus(2)
tdSql.execute("create mnode on dnode 3")
clusterComCheck.checkMnodeStatus(3)
# fisrt check statut ready
self.check3mnode()
# add some error operations and
tdLog.info("Confirm the status of the dnode again")
tdSql.error("create mnode on dnode 2")
tdSql.query("show dnodes;")
# tdLog.debug(tdSql.queryResult)
tdLog.debug("stop and follower of mnode")
print(tdSql.queryResult)
clusterComCheck.checkDnodes(dnodenumbers)
# restart all taosd
tdDnodes=cluster.dnodes
# tdLog.debug(tdDnodes[0])
tdDnodes[1].stoptaosd()
self.check3mnode2off()
clusterComCheck.check3mnodeoff(2,3)
tdDnodes[1].starttaosd()
self.check3mnode()
clusterComCheck.checkMnodeStatus(3)
tdDnodes[2].stoptaosd()
self.check3mnode3off()
clusterComCheck.check3mnodeoff(3,3)
tdDnodes[2].starttaosd()
self.check3mnode()
clusterComCheck.checkMnodeStatus(3)
tdDnodes[0].stoptaosd()
self.check3mnode1off()
clusterComCheck.check3mnodeoff(1,3)
tdDnodes[0].starttaosd()
self.check3mnode()
clusterComCheck.checkMnodeStatus(3)
self.check3mnode()
tdLog.info("Take turns stopping all dnodes ")
# seperate vnode and mnode in different dnodes.
# create database and stable
stopcount =0
while stopcount <= 2:
for i in range(dnodenumber):
tdLog.info("first restart loop")
for i in range(dnodenumbers):
tdDnodes[i].stoptaosd()
tdDnodes[i].starttaosd()
# self.check3mnode()
stopcount+=1
self.check3mnode()
clusterComCheck.checkDnodes(dnodenumbers)
clusterComCheck.checkMnodeStatus(3)
def run(self):
self.five_dnode_three_mnode(5)
# print(self.master_dnode.cfgDict)
self.fiveDnodeThreeMnode(5,3,1)
def stop(self):
tdSql.close()
tdLog.success(f"{__file__} successfully executed")
tdCases.addLinux(__file__, TDTestCase())
tdCases.addWindows(__file__, TDTestCase())
\ No newline at end of file
tdCases.addWindows(__file__, TDTestCase())
###################################################################
# Copyright (c) 2016 by TAOS Technologies, Inc.
# All rights reserved.
#
# This file is proprietary and confidential to TAOS Technologies.
# No part of this file may be reproduced, stored, transmitted,
# disclosed or used in any form or by any means other than as
# expressly provided by the written permission from Jianhui Tao
#
###################################################################
# -*- coding: utf-8 -*-
from collections import defaultdict
import random
import string
import threading
import requests
import time
# import socketfrom
import taos
from util.log import *
from util.sql import *
from util.cases import *
from util.dnodes import *
from util.common import *
# class actionType(Enum):
# CREATE_DATABASE = 0
# CREATE_STABLE = 1
# CREATE_CTABLE = 2
# INSERT_DATA = 3
class ClusterComCheck:
def init(self, conn, logSql):
tdSql.init(conn.cursor())
# tdSql.init(conn.cursor(), logSql) # output sql.txt file
def checkDnodes(self,dnodeNumbers):
count=0
while count < 5:
tdSql.query("show dnodes")
# tdLog.debug(tdSql.queryResult)
status=0
for i in range(dnodeNumbers):
if tdSql.queryResult[i][4] == "ready":
status+=1
tdLog.info(status)
if status == dnodeNumbers:
tdLog.success("it find cluster with %d dnodes and check that all cluster dnodes are ready within 5s! " %dnodeNumbers)
return True
count+=1
time.sleep(1)
else:
tdLog.debug(tdSql.queryResult)
tdLog.exit("it find cluster with %d dnodes but check that there dnodes are not ready within 5s ! "%dnodeNumbers)
def checkDbRows(self,dbNumbers):
dbNumbers=int(dbNumbers)
count=0
while count < 5:
tdSql.query("show databases;")
if tdSql.checkRows(dbNumbers+2):
tdLog.success("we find %d databases and expect %d in clusters! " %(tdSql.queryRows,dbNumbers+2))
return True
else:
continue
else :
tdLog.debug(tdSql.queryResult)
tdLog.exit("we find %d databases but expect %d in clusters! " %(tdSql.queryRows,dbNumbers))
def checkDb(self,dbNumbers,dbindex):
count=0
while count < 5:
query_status=0
for i in range(dbNumbers):
for j in range(dbNumbers):
tdSql.query("show databases;")
if "%s%d"%(dbindex,j) == tdSql.queryResult[i+2][0] :
if tdSql.queryResult[i+2][19] == "ready":
query_status+=1
else:
continue
# print(query_status)
count+=1
if query_status == dbNumbers:
tdLog.success("we find cluster with %d dnode and check all databases are ready within 5s! " %dbNumbers)
return True
else:
tdLog.debug(tdSql.queryResult)
tdLog.exit("database is not ready within 5s")
def checkData(self,dbname,stbname,stableCount,CtableCount,rowsPerSTable,):
tdSql.execute("use %s"%dbname)
tdSql.query("show stables")
tdSql.checkRows(stableCount)
tdSql.query("show tables")
tdSql.checkRows(CtableCount)
for i in range(stableCount):
tdSql.query("select count(*) from %s%d"%(stbname,i))
tdSql.checkData(0,0,rowsPerSTable)
return
def checkMnodeStatus(self,mnodeNums):
self.mnodeNums=int(mnodeNums)
# self.leaderDnode=int(leaderDnode)
count=0
while count < 10:
time.sleep(1)
tdSql.query("show mnodes;")
if tdSql.checkRows(self.mnodeNums) :
tdLog.success("cluster has %d mnodes" %self.mnodeNums )
if self.mnodeNums == 1:
if tdSql.queryResult[0][2]== 'leader' and tdSql.queryResult[0][3]== 'ready' :
tdLog.success("%d mnodes is ready in 10s"%self.mnodeNums)
return True
count+=1
elif self.mnodeNums == 3 :
if tdSql.queryResult[0][2]=='leader' and tdSql.queryResult[0][3]== 'ready' :
if tdSql.queryResult[1][2]=='follower' and tdSql.queryResult[1][3]== 'ready' :
if tdSql.queryResult[2][2]=='follower' and tdSql.queryResult[2][3]== 'ready' :
tdLog.success("%d mnodes is ready in 10s"%self.mnodeNums)
return True
elif tdSql.queryResult[1][2]=='leader' and tdSql.queryResult[1][3]== 'ready' :
if tdSql.queryResult[0][2]=='follower' and tdSql.queryResult[0][3]== 'ready' :
if tdSql.queryResult[2][2]=='follower' and tdSql.queryResult[2][3]== 'ready' :
tdLog.success("%d mnodes is ready in 10s"%self.mnodeNums)
return True
elif tdSql.queryResult[2][2]=='leader' and tdSql.queryResult[2][3]== 'ready' :
if tdSql.queryResult[0][2]=='follower' and tdSql.queryResult[0][3]== 'ready' :
if tdSql.queryResult[1][2]=='follower' and tdSql.queryResult[1][3]== 'ready' :
tdLog.success("%d mnodes is ready in 10s"%self.mnodeNums)
return True
count+=1
elif self.mnodeNums == 2 :
if tdSql.queryResult[0][2]=='leader' and tdSql.queryResult[0][3]== 'ready' :
if tdSql.queryResult[1][2]=='follower' and tdSql.queryResult[1][3]== 'ready' :
tdLog.success("%d mnodes is ready in 10s"%self.mnodeNums)
return True
elif tdSql.queryResult[1][2]=='leader' and tdSql.queryResult[1][3]== 'ready' :
if tdSql.queryResult[0][2]=='follower' and tdSql.queryResult[0][3]== 'ready' :
tdLog.success("%d mnodes is ready in 10s"%self.mnodeNums)
return True
count+=1
else:
tdLog.debug(tdSql.queryResult)
tdLog.exit("cluster of %d mnodes is not ready in 10s " %self.mnodeNums)
def check3mnodeoff(self,offlineDnodeNo,mnodeNums=3):
count=0
while count < 10:
time.sleep(1)
tdSql.query("show mnodes;")
if tdSql.checkRows(mnodeNums) :
tdLog.success("cluster has %d mnodes" %self.mnodeNums )
else:
tdLog.exit("mnode number is correct")
if offlineDnodeNo == 1:
if tdSql.queryResult[0][2]=='offline' :
if tdSql.queryResult[1][2]=='leader' and tdSql.queryResult[1][3]== 'ready' :
if tdSql.queryResult[2][2]=='follower' and tdSql.queryResult[2][3]== 'ready' :
tdLog.success("stop mnodes on dnode %d successfully in 10s" %offlineDnodeNo)
return True
elif tdSql.queryResult[1][2]=='follower' and tdSql.queryResult[1][3]== 'ready' :
if tdSql.queryResult[2][2]=='leader' and tdSql.queryResult[2][3]== 'ready' :
tdLog.debug("stop mnodes on dnode %d successfully in 10s" %offlineDnodeNo)
return True
count+=1
elif offlineDnodeNo == 2:
if tdSql.queryResult[1][2]=='offline' :
if tdSql.queryResult[0][2]=='leader' and tdSql.queryResult[0][3]== 'ready' :
if tdSql.queryResult[2][2]=='follower' and tdSql.queryResult[2][3]== 'ready' :
tdLog.debug("stop mnodes on dnode %d successfully in 10s" %offlineDnodeNo)
return True
elif tdSql.queryResult[0][2]=='follower' and tdSql.queryResult[0][3]== 'ready' :
if tdSql.queryResult[2][2]=='leader' and tdSql.queryResult[2][3]== 'ready' :
tdLog.debug("stop mnodes on dnode %d successfully in 10s" %offlineDnodeNo)
return True
count+=1
elif offlineDnodeNo == 3:
if tdSql.queryResult[2][2]=='offline' :
if tdSql.queryResult[0][2]=='leader' and tdSql.queryResult[0][3]== 'ready' :
if tdSql.queryResult[1][2]=='follower' and tdSql.queryResult[1][3]== 'ready' :
tdLog.debug("stop mnodes on dnode %d successfully in 10s" %offlineDnodeNo)
return True
elif tdSql.queryResult[0][2]=='follower' and tdSql.queryResult[0][3]== 'ready' :
if tdSql.queryResult[1][2]=='leader' and tdSql.queryResult[1][3]== 'ready' :
tdLog.debug("stop mnodes on dnode %d successfully in 10s" %offlineDnodeNo)
return True
count+=1
else:
tdLog.debug(tdSql.queryResult)
tdLog.exit("stop mnodes on dnode %d failed in 10s ")
def close(self):
self.cursor.close()
clusterComCheck = ClusterComCheck()
###################################################################
# Copyright (c) 2016 by TAOS Technologies, Inc.
# All rights reserved.
#
# This file is proprietary and confidential to TAOS Technologies.
# No part of this file may be reproduced, stored, transmitted,
# disclosed or used in any form or by any means other than as
# expressly provided by the written permission from Jianhui Tao
#
###################################################################
# -*- coding: utf-8 -*-
from collections import defaultdict
import random
import string
import threading
import requests
import time
# import socketfrom
import taos
from util.log import *
from util.sql import *
from util.cases import *
from util.dnodes import *
from util.common import *
# class actionType(Enum):
# CREATE_DATABASE = 0
# CREATE_STABLE = 1
# CREATE_CTABLE = 2
# INSERT_DATA = 3
class ClusterComCreate:
def init(self, conn, logSql):
tdSql.init(conn.cursor())
# tdSql.init(conn.cursor(), logSql) # output sql.txt file
def initConsumerTable(self,cdbName='cdb'):
tdLog.info("create consume database, and consume info table, and consume result table")
tdSql.query("create database if not exists %s vgroups 1"%(cdbName))
tdSql.query("drop table if exists %s.consumeinfo "%(cdbName))
tdSql.query("drop table if exists %s.consumeresult "%(cdbName))
tdSql.query("drop table if exists %s.notifyinfo "%(cdbName))
tdSql.query("create table %s.consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int, ifmanualcommit int)"%cdbName)
tdSql.query("create table %s.consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)"%cdbName)
tdSql.query("create table %s.notifyinfo (ts timestamp, cmdid int, consumerid int)"%cdbName)
def initConsumerInfoTable(self,cdbName='cdb'):
tdLog.info("drop consumeinfo table")
tdSql.query("drop table if exists %s.consumeinfo "%(cdbName))
tdSql.query("create table %s.consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int, ifmanualcommit int)"%cdbName)
def insertConsumerInfo(self,consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifmanualcommit,cdbName='cdb'):
sql = "insert into %s.consumeinfo values "%cdbName
sql += "(now, %d, '%s', '%s', %d, %d, %d)"%(consumerId, topicList, keyList, expectrowcnt, ifcheckdata, ifmanualcommit)
tdLog.info("consume info sql: %s"%sql)
tdSql.query(sql)
def selectConsumeResult(self,expectRows,cdbName='cdb'):
resultList=[]
while 1:
tdSql.query("select * from %s.consumeresult"%cdbName)
#tdLog.info("row: %d, %l64d, %l64d"%(tdSql.getData(0, 1),tdSql.getData(0, 2),tdSql.getData(0, 3))
if tdSql.getRows() == expectRows:
break
else:
time.sleep(5)
for i in range(expectRows):
tdLog.info ("consume id: %d, consume msgs: %d, consume rows: %d"%(tdSql.getData(i , 1), tdSql.getData(i , 2), tdSql.getData(i , 3)))
resultList.append(tdSql.getData(i , 3))
return resultList
def startTmqSimProcess(self,pollDelay,dbName,showMsg=1,showRow=1,cdbName='cdb',valgrind=0):
buildPath = tdCom.getBuildPath()
cfgPath = tdCom.getClientCfgPath()
if valgrind == 1:
logFile = cfgPath + '/../log/valgrind-tmq.log'
shellCmd = 'nohup valgrind --log-file=' + logFile
shellCmd += '--tool=memcheck --leak-check=full --show-reachable=no --track-origins=yes --show-leak-kinds=all --num-callers=20 -v --workaround-gcc296-bugs=yes '
if (platform.system().lower() == 'windows'):
shellCmd = 'mintty -h never -w hide ' + buildPath + '\\build\\bin\\tmq_sim.exe -c ' + cfgPath
shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, dbName, showMsg, showRow, cdbName)
shellCmd += "> nul 2>&1 &"
else:
shellCmd = 'nohup ' + buildPath + '/build/bin/tmq_sim -c ' + cfgPath
shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, dbName, showMsg, showRow, cdbName)
shellCmd += "> /dev/null 2>&1 &"
tdLog.info(shellCmd)
os.system(shellCmd)
def getStartConsumeNotifyFromTmqsim(self,cdbName='cdb'):
while 1:
tdSql.query("select * from %s.notifyinfo"%cdbName)
#tdLog.info("row: %d, %l64d, %l64d"%(tdSql.getData(0, 1),tdSql.getData(0, 2),tdSql.getData(0, 3))
if (tdSql.getRows() == 1) and (tdSql.getData(0, 1) == 0):
break
else:
time.sleep(0.1)
return
def getStartCommitNotifyFromTmqsim(self,cdbName='cdb'):
while 1:
tdSql.query("select * from %s.notifyinfo"%cdbName)
#tdLog.info("row: %d, %l64d, %l64d"%(tdSql.getData(0, 1),tdSql.getData(0, 2),tdSql.getData(0, 3))
if tdSql.getRows() == 2 :
print(tdSql.getData(0, 1), tdSql.getData(1, 1))
if tdSql.getData(1, 1) == 1:
break
time.sleep(0.1)
return
def create_database(self,tsql, dbName,dropFlag=1,vgroups=4,replica=1):
if dropFlag == 1:
tsql.execute("drop database if exists %s"%(dbName))
tsql.execute("create database if not exists %s vgroups %d replica %d"%(dbName, vgroups, replica))
tdLog.debug("complete to create database %s"%(dbName))
return
def create_stable(self,tsql, dbName,stbName):
tsql.execute("create table if not exists %s.%s (ts timestamp, c1 int, c2 int, c3 binary(16)) tags(t1 int, t2 binary(32))"%(dbName, stbName))
tdLog.debug("complete to create %s.%s" %(dbName, stbName))
return
def create_ctable(self,tsql=None, dbName='dbx',stbName='stb',ctbPrefix='ctb',ctbNum=1):
tsql.execute("use %s" %dbName)
pre_create = "create table"
sql = pre_create
#tdLog.debug("doing create one stable %s and %d child table in %s ..." %(stbname, count ,dbname))
for i in range(ctbNum):
tagValue = 'beijing'
if (i % 2 == 0):
tagValue = 'shanghai'
sql += " %s%d using %s tags(%d, '%s')"%(ctbPrefix,i,stbName,i+1, tagValue)
if (i > 0) and (i%100 == 0):
tsql.execute(sql)
sql = pre_create
if sql != pre_create:
tsql.execute(sql)
tdLog.debug("complete to create %d child tables in %s.%s" %(ctbNum, dbName, stbName))
return
def insert_data(self,tsql,dbName,stbName,ctbNum,rowsPerTbl,batchNum,startTs=None):
tdLog.debug("start to insert data ............")
tsql.execute("use %s" %dbName)
pre_insert = "insert into "
sql = pre_insert
if startTs is None:
t = time.time()
startTs = int(round(t * 1000))
#tdLog.debug("doing insert data into stable:%s rows:%d ..."%(stbName, allRows))
for i in range(ctbNum):
sql += " %s%d values "%(stbName,i)
for j in range(rowsPerTbl):
sql += "(%d, %d, 'tmqrow_%d') "%(startTs + j, j, j)
if (j > 0) and ((j%batchNum == 0) or (j == rowsPerTbl - 1)):
tsql.execute(sql)
if j < rowsPerTbl - 1:
sql = "insert into %s%d values " %(stbName,i)
else:
sql = "insert into "
#end sql
if sql != pre_insert:
#print("insert sql:%s"%sql)
tsql.execute(sql)
tdLog.debug("insert data ............ [OK]")
return
def insert_data_1(self,tsql,dbName,ctbPrefix,ctbNum,rowsPerTbl,batchNum,startTs):
tdLog.debug("start to insert data ............")
tsql.execute("use %s" %dbName)
pre_insert = "insert into "
sql = pre_insert
t = time.time()
startTs = int(round(t * 1000))
#tdLog.debug("doing insert data into stable:%s rows:%d ..."%(stbName, allRows))
for i in range(ctbNum):
sql += " %s%d values "%(ctbPrefix,i)
for j in range(rowsPerTbl):
if (j % 2 == 0):
sql += "(%d, %d, %d, 'tmqrow_%d') "%(startTs + j, j, j, j)
else:
sql += "(%d, %d, %d, 'tmqrow_%d') "%(startTs + j, j, -j, j)
if (j > 0) and ((j%batchNum == 0) or (j == rowsPerTbl - 1)):
tsql.execute(sql)
if j < rowsPerTbl - 1:
sql = "insert into %s%d values " %(ctbPrefix,i)
else:
sql = "insert into "
#end sql
if sql != pre_insert:
#print("insert sql:%s"%sql)
tsql.execute(sql)
tdLog.debug("insert data ............ [OK]")
return
def insert_data_interlaceByMultiTbl(self,tsql,dbName,ctbPrefix,ctbNum,rowsPerTbl,batchNum,startTs=0):
tdLog.debug("start to insert data ............")
tsql.execute("use %s" %dbName)
pre_insert = "insert into "
sql = pre_insert
if startTs == 0:
t = time.time()
startTs = int(round(t * 1000))
ctbDict = {}
for i in range(ctbNum):
ctbDict[i] = 0
#tdLog.debug("doing insert data into stable:%s rows:%d ..."%(stbName, allRows))
rowsOfCtb = 0
while rowsOfCtb < rowsPerTbl:
for i in range(ctbNum):
sql += " %s.%s_%d values "%(dbName,ctbPrefix,i)
for k in range(batchNum):
sql += "(%d, %d, 'tmqrow_%d') "%(startTs + ctbDict[i], ctbDict[i], ctbDict[i])
ctbDict[i] += 1
if (0 == ctbDict[i]%batchNum) or (ctbDict[i] == rowsPerTbl):
tsql.execute(sql)
sql = "insert into "
break
rowsOfCtb = ctbDict[0]
tdLog.debug("insert data ............ [OK]")
return
def insert_data_with_autoCreateTbl(self,tsql,dbName,stbName,ctbPrefix,ctbNum,rowsPerTbl,batchNum,startTs=0):
tdLog.debug("start to insert data wiht auto create child table ............")
tsql.execute("use %s" %dbName)
pre_insert = "insert into "
sql = pre_insert
if startTs == 0:
t = time.time()
startTs = int(round(t * 1000))
#tdLog.debug("doing insert data into stable:%s rows:%d ..."%(stbName, allRows))
rowsOfSql = 0
for i in range(ctbNum):
sql += " %s.%s_%d using %s.%s tags (%d) values "%(dbName,ctbPrefix,i,dbName,stbName,i)
for j in range(rowsPerTbl):
sql += "(%d, %d, 'tmqrow_%d') "%(startTs + j, j, j)
rowsOfSql += 1
if (j > 0) and ((rowsOfSql == batchNum) or (j == rowsPerTbl - 1)):
tsql.execute(sql)
rowsOfSql = 0
if j < rowsPerTbl - 1:
sql = "insert into %s.%s_%d using %s.%s tags (%d) values " %(dbName,ctbPrefix,i,dbName,stbName,i)
else:
sql = "insert into "
#end sql
if sql != pre_insert:
#print("insert sql:%s"%sql)
tsql.execute(sql)
tdLog.debug("insert data ............ [OK]")
return
def syncCreateDbStbCtbInsertData(self, tsql, paraDict):
tdCom.create_database(tsql, paraDict["dbName"],paraDict["dropFlag"])
tdCom.create_stable(tsql, dbname=paraDict["dbName"],stbname=paraDict["stbName"], column_elm_list=paraDict['colSchema'], tag_elm_list=paraDict['tagSchema'])
tdCom.create_ctable(tsql, dbname=paraDict["dbName"],stbname=paraDict["stbName"],tag_elm_list=paraDict['tagSchema'],count=paraDict["ctbNum"], default_ctbname_prefix=paraDict['ctbPrefix'])
if "event" in paraDict and type(paraDict['event']) == type(threading.Event()):
paraDict["event"].set()
ctbPrefix = paraDict['ctbPrefix']
ctbNum = paraDict["ctbNum"]
for i in range(ctbNum):
tbName = '%s%s'%(ctbPrefix,i)
tdCom.insert_rows(tsql,dbname=paraDict["dbName"],tbname=tbName,start_ts_value=paraDict['startTs'],count=paraDict['rowsPerTbl'])
return
def threadFunction(self, **paraDict):
# create new connector for new tdSql instance in my thread
newTdSql = tdCom.newTdSql()
self.syncCreateDbStbCtbInsertData(self, newTdSql, paraDict)
return
def asyncCreateDbStbCtbInsertData(self, paraDict):
pThread = threading.Thread(target=self.threadFunction, kwargs=paraDict)
pThread.start()
return pThread
def close(self):
self.cursor.close()
clusterComCreate = ClusterComCreate()
import taos
import sys
import time
import socket
import os
import threading
from util.log import *
from util.sql import *
from util.cases import *
from util.dnodes import *
from util.common import *
sys.path.append("./7-tmq")
from tmqCommon import *
class TDTestCase:
def init(self, conn, logSql):
tdLog.debug(f"start to excute {__file__}")
tdSql.init(conn.cursor())
#tdSql.init(conn.cursor(), logSql) # output sql.txt file
def tmqCase1(self):
tdLog.printNoPrefix("======== test case 1: ")
paraDict = {'dbName': 'db1',
'dropFlag': 1,
'event': '',
'vgroups': 4,
'stbName': 'stb',
'colPrefix': 'c',
'tagPrefix': 't',
'colSchema': [{'type': 'INT', 'count':2}, {'type': 'binary', 'len':20, 'count':1},{'type': 'TIMESTAMP', 'count':1}],
'tagSchema': [{'type': 'INT', 'count':1}, {'type': 'binary', 'len':20, 'count':1}],
'ctbPrefix': 'ctb',
'ctbNum': 10,
'rowsPerTbl': 4000,
'batchNum': 15,
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
'pollDelay': 20,
'showMsg': 1,
'showRow': 1}
topicNameList = ['topic1', 'topic2', 'topic3', 'topic4']
consumeGroupIdList = ['cgrp1', 'cgrp1', 'cgrp3', 'cgrp4']
consumerIdList = [0, 1, 2, 3]
tmqCom.initConsumerTable()
tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], vgroups=paraDict['vgroups'],replica=1)
tdLog.info("create stb")
tdCom.create_stable(tdSql, dbname=paraDict["dbName"],stbname=paraDict["stbName"], column_elm_list=paraDict['colSchema'], tag_elm_list=paraDict['tagSchema'])
tdLog.info("create ctb")
tdCom.create_ctable(tdSql, dbname=paraDict["dbName"],stbname=paraDict["stbName"],tag_elm_list=paraDict['tagSchema'],count=paraDict["ctbNum"], default_ctbname_prefix=paraDict['ctbPrefix'])
# tdLog.info("insert data")
# tmqCom.insert_data(tdSql,paraDict["dbName"],paraDict["ctbPrefix"],paraDict["ctbNum"],paraDict["rowsPerTbl"],paraDict["batchNum"],paraDict["startTs"])
tdLog.info("create 4 topics")
sqlString = "create topic %s as database %s" %(topicNameList[0], paraDict['dbName'])
tdLog.info("create topic sql: %s"%sqlString)
tdSql.execute(sqlString)
sqlString = "create topic %s as stable %s.%s" %(topicNameList[1], paraDict['dbName'], paraDict['stbName'])
tdLog.info("create topic sql: %s"%sqlString)
tdSql.execute(sqlString)
queryString = "select * from %s.%s where c1 %% 7 == 0" %(paraDict['dbName'], paraDict['stbName'])
sqlString = "create topic %s as %s" %(topicNameList[2], queryString)
tdLog.info("create topic sql: %s"%sqlString)
tdSql.execute(sqlString)
queryString = "select ts, log(c1), ceil(pow(c1,3)) from %s.%s where c1 %% 7 == 0" %(paraDict['dbName'], paraDict['stbName'])
sqlString = "create topic %s as %s " %(topicNameList[3], queryString)
tdLog.info("create topic sql: %s"%sqlString)
tdSql.execute(sqlString)
tdSql.query("show topics")
tdLog.debug(tdSql.queryResult)
rows = tdSql.getRows()
if rows != len(consumerIdList):
tdLog.exit("topic rows error")
for i in range (rows):
topicName = tdSql.getData(i,0)
matchFlag = 0
while matchFlag == 0:
for j in range(len(topicNameList)):
if topicName == topicNameList[j]:
matchFlag = 1
break
if matchFlag == 0:
tdLog.exit("topic name: %s is error", topicName)
# init consume info, and start tmq_sim, then check consume result
tdLog.info("insert consume info to consume processor")
expectrowcnt = paraDict["rowsPerTbl"] * paraDict["ctbNum"]
topicList = topicNameList[0]
ifcheckdata = 0
ifManualCommit = 0
keyList = 'group.id:%s, enable.auto.commit:false, auto.commit.interval.ms:6000, auto.offset.reset:earliest'%consumeGroupIdList[0]
tmqCom.insertConsumerInfo(consumerIdList[0], expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit)
topicList = topicNameList[1]
keyList = 'group.id:%s, enable.auto.commit:false, auto.commit.interval.ms:6000, auto.offset.reset:earliest'%consumeGroupIdList[1]
tmqCom.insertConsumerInfo(consumerIdList[1], expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit)
topicList = topicNameList[2]
keyList = 'group.id:%s, enable.auto.commit:false, auto.commit.interval.ms:6000, auto.offset.reset:earliest'%consumeGroupIdList[2]
tmqCom.insertConsumerInfo(consumerIdList[2], expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit)
topicList = topicNameList[3]
keyList = 'group.id:%s, enable.auto.commit:false, auto.commit.interval.ms:6000, auto.offset.reset:earliest'%consumeGroupIdList[3]
tmqCom.insertConsumerInfo(consumerIdList[3], expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit)
tdLog.info("start consume processor")
tmqCom.startTmqSimProcess(paraDict['pollDelay'],paraDict["dbName"],paraDict['showMsg'], paraDict['showRow'])
tdLog.info("async insert data")
pThread = tmqCom.asyncInsertData(paraDict)
time.sleep(5)
tdLog.info("check show consumers")
tdSql.query("show consumers")
# tdLog.info(tdSql.queryResult)
rows = tdSql.getRows()
tdLog.info("show consumers rows: %d"%rows)
if rows != len(topicNameList):
tdLog.exit("show consumers rows error")
tdLog.info("check show subscriptions")
tdSql.query("show subscriptions")
# tdLog.debug(tdSql.queryResult)
rows = tdSql.getRows()
tdLog.info("show subscriptions rows: %d"%rows)
if rows != paraDict['vgroups'] * len(topicNameList):
tdLog.exit("show subscriptions rows error")
pThread.join()
tdLog.info("insert process end, and start to check consume result")
expectRows = len(consumerIdList)
_ = tmqCom.selectConsumeResult(expectRows)
time.sleep(10)
for i in range(len(topicNameList)):
tdSql.query("drop topic %s"%topicNameList[i])
tdLog.printNoPrefix("======== test case 1 end ...... ")
def run(self):
tdSql.prepare()
self.tmqCase1()
def stop(self):
tdSql.close()
tdLog.success(f"{__file__} successfully executed")
event = threading.Event()
tdCases.addLinux(__file__, TDTestCase())
tdCases.addWindows(__file__, TDTestCase())
......@@ -23,6 +23,7 @@ python3 ./test.py -f 1-insert/alter_stable.py
python3 ./test.py -f 1-insert/alter_table.py
python3 ./test.py -f 1-insert/insertWithMoreVgroup.py
python3 ./test.py -f 1-insert/table_comment.py
python3 ./test.py -f 1-insert/table_param_ttl.py
python3 ./test.py -f 2-query/between.py
python3 ./test.py -f 2-query/distinct.py
python3 ./test.py -f 2-query/varchar.py
......@@ -116,6 +117,10 @@ python3 ./test.py -f 2-query/function_null.py
python3 ./test.py -f 6-cluster/5dnode1mnode.py
python3 ./test.py -f 6-cluster/5dnode2mnode.py
python3 ./test.py -f 6-cluster/5dnode3mnodeStop.py -N 5 -M 3
python3 ./test.py -f 6-cluster/5dnode3mnodeSep1VnodeStopVnodeCreateDb.py -N 5 -M 3
# BUG python3 ./test.py -f 6-cluster/5dnode3mnodeSep1VnodeStopCreateDb.py -N 5 -M 3
# BUG python3 ./test.py -f 6-cluster/5dnode3mnodeSep1VnodeStopMnodeCreateDb.py -N 5 -M 3
# python3 ./test.py -f 6-cluster/5dnode3mnodeDrop.py -N 5
# BUG python3 ./test.py -f 6-cluster/5dnode3mnodeStopInsert.py
......@@ -139,3 +144,4 @@ python3 ./test.py -f 7-tmq/tmqCheckData.py
python3 ./test.py -f 7-tmq/tmqUdf.py
#python3 ./test.py -f 7-tmq/tmq3mnodeSwitch.py -N 5
python3 ./test.py -f 7-tmq/tmqConsumerGroup.py
python3 ./test.py -f 7-tmq/tmqShow.py
......@@ -22,9 +22,6 @@ import json
import platform
import socket
import threading
from distutils.log import warn as printf
from tkinter import N
from fabric2 import Connection
sys.path.append("../pytest")
from util.log import *
from util.dnodes import *
......
......@@ -635,8 +635,9 @@ void loop_consume(SThreadInfo* pInfo) {
}
}
int32_t consumeDelay = g_stConfInfo.consumeDelay == -1 ? -1 : (g_stConfInfo.consumeDelay * 1000);
while (running) {
TAOS_RES* tmqMsg = tmq_consumer_poll(pInfo->tmq, g_stConfInfo.consumeDelay * 1000);
TAOS_RES* tmqMsg = tmq_consumer_poll(pInfo->tmq, consumeDelay);
if (tmqMsg) {
if (0 != g_stConfInfo.showMsgFlag) {
totalRows += msg_process(tmqMsg, pInfo, totalMsgs);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册