diff --git a/tests/pytest/util/cluster.py b/tests/pytest/util/cluster.py index 6eb78f07716b22dbff09b10a9d779ec8b5672b36..21cad4c5bb2f5a5fc2b3a492e7298a4ff29115d8 100644 --- a/tests/pytest/util/cluster.py +++ b/tests/pytest/util/cluster.py @@ -24,7 +24,7 @@ class ClusterDnodes(TDDnodes): class ConfigureyCluster: - """This will create defined number of dnodes and create a cluset. + """This will create defined number of dnodes and create a cluster. at the same time, it will return TDDnodes list: dnodes, """ hostname= socket.gethostname() @@ -85,8 +85,8 @@ class ConfigureyCluster: count+=1 time.sleep(1) else: - tdLog.debug("create cluster with %d dnode but check dnode not ready within 5s ! "%self.dnodeNums) - return -1 + tdLog.exit("create cluster with %d dnode but check dnode not ready within 5s ! "%self.dnodeNums) + cluster = ConfigureyCluster() \ No newline at end of file diff --git a/tests/system-test/1-insert/table_param_ttl.py b/tests/system-test/1-insert/table_param_ttl.py new file mode 100644 index 0000000000000000000000000000000000000000..49d6476d9ce94330e45e6f054178fea803c823ca --- /dev/null +++ b/tests/system-test/1-insert/table_param_ttl.py @@ -0,0 +1,79 @@ +################################################################### +# Copyright (c) 2016 by TAOS Technologies, Inc. +# All rights reserved. +# +# This file is proprietary and confidential to TAOS Technologies. +# No part of this file may be reproduced, stored, transmitted, +# disclosed or used in any form or by any means other than as +# expressly provided by the written permission from Jianhui Tao +# +################################################################### + +# -*- coding: utf-8 -*- + +from util.log import * +from util.cases import * +from util.sql import * +from util.common import * + +class TDTestCase: + updatecfgDict = {'ttlUnit':5,'ttlPushInterval':3} + def init(self, conn, logSql): + tdLog.debug("start to execute %s" % __file__) + tdSql.init(conn.cursor()) + self.ntbname = 'ntb' + self.stbname = 'stb' + self.tbnum = 10 + self.ttl_param = 1 + self.default_ttl = 100 + self.modify_ttl = 1 + def ttl_check_ntb(self): + tdSql.prepare() + + for i in range(self.tbnum): + tdSql.execute(f'create table {self.ntbname}_{i} (ts timestamp,c0 int) ttl {self.ttl_param}') + tdSql.query(f'show tables') + tdSql.checkRows(self.tbnum) + sleep(self.updatecfgDict['ttlUnit']*self.ttl_param+self.updatecfgDict['ttlPushInterval']) + tdSql.query(f'show tables') + tdSql.checkRows(0) + for i in range(self.tbnum): + tdSql.execute(f'create table {self.ntbname}_{i} (ts timestamp,c0 int) ttl {self.default_ttl}') + for i in range(int(self.tbnum/2)): + tdSql.execute(f'alter table {self.ntbname}_{i} ttl {self.modify_ttl}') + sleep(self.updatecfgDict['ttlUnit']*self.modify_ttl+self.updatecfgDict['ttlPushInterval']) + tdSql.query(f'show tables') + tdSql.checkRows(self.tbnum - int(self.tbnum/2)) + tdSql.execute('drop database db') + def ttl_check_ctb(self): + tdSql.prepare() + tdSql.execute(f'create table {self.stbname} (ts timestamp,c0 int) tags(t0 int)') + + for i in range(self.tbnum): + tdSql.execute(f'create table {self.stbname}_{i} using {self.stbname} tags({i}) ttl {self.ttl_param}') + tdSql.query(f'show tables') + tdSql.checkRows(self.tbnum) + sleep(self.updatecfgDict['ttlUnit']*self.ttl_param+self.updatecfgDict['ttlPushInterval']) + tdSql.query(f'show tables') + tdSql.checkRows(0) + for i in range(self.tbnum): + tdSql.execute(f'create table {self.stbname}_{i} using {self.stbname} tags({i}) ttl {self.default_ttl}') + tdSql.query(f'show tables') + tdSql.checkRows(self.tbnum) + for i in range(int(self.tbnum/2)): + tdSql.execute(f'alter table {self.stbname}_{i} ttl {self.modify_ttl}') + sleep(self.updatecfgDict['ttlUnit']*self.modify_ttl+self.updatecfgDict['ttlPushInterval']) + tdSql.query(f'show tables') + tdSql.checkRows(self.tbnum - int(self.tbnum/2)) + tdSql.execute('drop database db') + + def run(self): + self.ttl_check_ntb() + self.ttl_check_ctb() + + def stop(self): + tdSql.close() + tdLog.success("%s successfully executed" % __file__) + +tdCases.addWindows(__file__, TDTestCase()) +tdCases.addLinux(__file__, TDTestCase()) diff --git a/tests/system-test/2-query/bottom.py b/tests/system-test/2-query/bottom.py index 1037b0a8f307754ee5aa7c7f3aa30f2a47d74082..a95daf22f422c485cf42181b570e986f4da7bfbe 100644 --- a/tests/system-test/2-query/bottom.py +++ b/tests/system-test/2-query/bottom.py @@ -17,100 +17,140 @@ from util.log import * from util.cases import * from util.sql import * from util.common import * - +from util.sqlset import * class TDTestCase: def init(self, conn, logSql): tdLog.debug("start to execute %s" % __file__) tdSql.init(conn.cursor()) - + self.dbname = 'db_test' + self.setsql = TDSetSql() + self.ntbname = 'ntb' self.rowNum = 10 self.tbnum = 20 self.ts = 1537146000000 self.binary_str = 'taosdata' self.nchar_str = '涛思数据' - def bottom_check_base(self): - tdSql.prepare() - tdSql.execute('''create table stb(ts timestamp, col1 tinyint, col2 smallint, col3 int, col4 bigint, col5 tinyint unsigned, col6 smallint unsigned, - col7 int unsigned, col8 bigint unsigned, col9 float, col10 double, col11 bool, col12 binary(20), col13 nchar(20)) tags(loc nchar(20))''') - tdSql.execute("create table stb_1 using stb tags('beijing')") - column_list = ['col1','col2','col3','col4','col5','col6','col7','col8'] - error_column_list = ['col11','col12','col13'] - error_param_list = [0,101] - for i in range(self.rowNum): - tdSql.execute(f"insert into stb_1 values(%d, %d, %d, %d, %d, %d, %d, %d, %d, %f, %f, %d, '{self.binary_str}%d', '{self.nchar_str}%d')" - % (self.ts + i, i + 1, i + 1, i + 1, i + 1, i + 1, i + 1, i + 1, i + 1, i + 0.1, i + 0.1, i % 2, i + 1, i + 1)) + self.column_dict = { + 'ts' : 'timestamp', + 'col1': 'tinyint', + 'col2': 'smallint', + 'col3': 'int', + 'col4': 'bigint', + 'col5': 'tinyint unsigned', + 'col6': 'smallint unsigned', + 'col7': 'int unsigned', + 'col8': 'bigint unsigned', + 'col9': 'float', + 'col10': 'double', + 'col11': 'bool', + 'col12': 'binary(20)', + 'col13': 'nchar(20)' + } - for i in column_list: - tdSql.query(f'select bottom({i},2) from stb_1') - tdSql.checkRows(2) - tdSql.checkEqual(tdSql.queryResult,[(2,),(1,)]) - for j in error_param_list: - tdSql.error(f'select bottom({i},{j}) from stb_1') - for i in error_column_list: - tdSql.error(f'select bottom({i},10) from stb_1') - tdSql.query("select ts,bottom(col1, 2),ts from stb_1 group by tbname") - tdSql.checkRows(2) - tdSql.query('select bottom(col2,1) from stb_1 interval(1y) order by col2') - tdSql.checkData(0,0,1) - - tdSql.error('select * from stb_1 where bottom(col2,1)=1') - tdSql.execute('drop database db') - def bottom_check_distribute(self): - # prepare data for vgroup 4 - dbname = tdCom.getLongName(5, "letters") + self.param_list = [1,100] + def insert_data(self,column_dict,tbname,row_num): + sql = '' + for k, v in column_dict.items(): + if v.lower() == 'timestamp' or v.lower() == 'tinyint' or v.lower() == 'smallint' or v.lower() == 'int' or v.lower() == 'bigint' or \ + v.lower() == 'tinyint unsigned' or v.lower() == 'smallint unsigned' or v.lower() == 'int unsigned' or v.lower() == 'bigint unsigned' or v.lower() == 'bool': + sql += '%d,' + elif v.lower() == 'float' or v.lower() == 'double': + sql += '%f,' + elif 'binary' in v.lower(): + sql += f'"{self.binary_str}%d",' + elif 'nchar' in v.lower(): + sql += f'"{self.nchar_str}%d",' + insert_sql = f'insert into {tbname} values({sql[:-1]})' + for i in range(row_num): + insert_list = [] + for k, v in column_dict.items(): + if v.lower() in[ 'tinyint' , 'smallint' , 'int', 'bigint' , 'tinyint unsigned' , 'smallint unsigned' , 'int unsigned' , 'bigint unsigned'] or\ + 'binary' in v.lower() or 'nchar' in v.lower(): + insert_list.append(0 + i) + elif v.lower() == 'float' or v.lower() == 'double': + insert_list.append(0.1 + i) + elif v.lower() == 'bool': + insert_list.append(i % 2) + elif v.lower() == 'timestamp': + insert_list.append(self.ts + i) + tdSql.execute(insert_sql%(tuple(insert_list))) + def bottom_check_data(self,tbname,tb_type): + new_column_dict = {} + for param in self.param_list: + for k,v in self.column_dict.items(): + if v.lower() in ['tinyint','smallint','int','bigint','tinyint unsigned','smallint unsigned','int unsigned','bigint unsigned']: + tdSql.query(f'select bottom({k},{param}) from {tbname} order by {k}') + if param >= self.rowNum: + if tb_type in ['normal_table','child_table']: + tdSql.checkRows(self.rowNum) + values_list = [] + for i in range(self.rowNum): + tp = (i,) + values_list.append(tp) + tdSql.checkEqual(tdSql.queryResult,values_list) + elif tb_type == 'stable': + tdSql.checkRows(param) + elif param < self.rowNum: + if tb_type in ['normal_table','child_table']: + tdSql.checkRows(param) + values_list = [] + for i in range(param): + tp = (i,) + values_list.append(tp) + tdSql.checkEqual(tdSql.queryResult,values_list) + elif tb_type == 'stable': + tdSql.checkRows(param) + for i in [self.param_list[0]-1,self.param_list[-1]+1]: + tdSql.error(f'select top({k},{i}) from {tbname}') + new_column_dict.update({k:v}) + elif v.lower() == 'bool' or 'binary' in v.lower() or 'nchar' in v.lower(): + tdSql.error(f'select top({k},{param}) from {tbname}') + tdSql.error(f'select * from {tbname} where top({k},{param})=1') + pass + def bottom_check_ntb(self): + tdSql.execute(f'create database if not exists {self.dbname} vgroups 1') + tdSql.execute(f'use {self.dbname}') + tdSql.execute(self.setsql.set_create_normaltable_sql(self.ntbname,self.column_dict)) + self.insert_data(self.column_dict,self.ntbname,self.rowNum) + self.bottom_check_data(self.ntbname,'normal_table') + tdSql.execute(f'drop database {self.dbname}') + def bottom_check_stb(self): stbname = tdCom.getLongName(5, "letters") - vgroup_num = 2 - child_table_num = 20 - tdSql.execute(f"create database if not exists {dbname} vgroups {vgroup_num}") - tdSql.execute(f'use {dbname}') - # build 20 child tables,every table insert 10 rows - tdSql.execute(f'''create table {stbname}(ts timestamp, col1 tinyint, col2 smallint, col3 int, col4 bigint, col5 tinyint unsigned, col6 smallint unsigned, - col7 int unsigned, col8 bigint unsigned, col9 float, col10 double, col11 bool, col12 binary(20), col13 nchar(20)) tags(loc nchar(20))''') - for i in range(child_table_num): - tdSql.execute(f"create table {stbname}_{i} using {stbname} tags('beijing')") - tdSql.execute(f"insert into {stbname}_{i}(ts) values(%d)" % (self.ts - 1-i)) - column_list = ['col1','col2','col3','col4','col5','col6','col7','col8'] - error_column_list = ['col11','col12','col13'] - error_param_list = [0,101] - for i in [f'{stbname}', f'{dbname}.{stbname}']: - for j in column_list: - tdSql.query(f"select bottom({j},1) from {i}") - tdSql.checkRows(0) + tag_dict = { + 't0':'int' + } + tag_values = [ + f'1' + ] + tdSql.execute(f"create database if not exists {self.dbname} vgroups 2") + tdSql.execute(f'use {self.dbname}') + tdSql.execute(self.setsql.set_create_stable_sql(stbname,self.column_dict,tag_dict)) + for i in range(self.tbnum): + tdSql.execute(f"create table {stbname}_{i} using {stbname} tags({tag_values[0]})") + tdSql.execute(self.insert_data(self.column_dict,f'{stbname}_{i}',self.rowNum)) tdSql.query('show tables') vgroup_list = [] for i in range(len(tdSql.queryResult)): vgroup_list.append(tdSql.queryResult[i][6]) vgroup_list_set = set(vgroup_list) - for i in vgroup_list_set: vgroups_num = vgroup_list.count(i) - if vgroups_num >=2: + if vgroups_num >= 2: tdLog.info(f'This scene with {vgroups_num} vgroups is ok!') - continue else: - tdLog.exit(f'This scene does not meet the requirements with {vgroups_num} vgroup!\n') - for i in range(self.rowNum): - for j in range(child_table_num): - tdSql.execute(f"insert into {stbname}_{j} values(%d, %d, %d, %d, %d, %d, %d, %d, %d, %f, %f, %d, '{self.binary_str}%d', '{self.nchar_str}%d')" - % (self.ts + i, i + 1, i + 1, i + 1, i + 1, i + 1, i + 1, i + 1, i + 1, i + 0.1, i + 0.1, i % 2, i + 1, i + 1)) - for i in column_list: - tdSql.query(f'select bottom({i},2) from {stbname}') - tdSql.checkRows(2) - tdSql.checkEqual(tdSql.queryResult,[(1,),(1,)]) - for j in error_param_list: - tdSql.error(f'select bottom({i},{j}) from {stbname}') - for i in error_column_list: - tdSql.error(f'select bottom({i},10) from {stbname}') - - tdSql.execute(f'drop database {dbname}') + tdLog.exit( + 'This scene does not meet the requirements with {vgroups_num} vgroup!\n') + for i in range(self.tbnum): + self.bottom_check_data(f'{stbname}_{i}','child_table') + self.bottom_check_data(f'{stbname}','stable') + tdSql.execute(f'drop database {self.dbname}') + def run(self): - - self.bottom_check_base() - self.bottom_check_distribute() + self.bottom_check_ntb() + self.bottom_check_stb() - def stop(self): tdSql.close() tdLog.success("%s successfully executed" % __file__) diff --git a/tests/system-test/2-query/json_tag.py b/tests/system-test/2-query/json_tag.py index 2ef1b8dad2249aaf63c6feebc48ff05d36da3254..9b96b6ebd00864a802971220ce2ecec969b2ab19 100644 --- a/tests/system-test/2-query/json_tag.py +++ b/tests/system-test/2-query/json_tag.py @@ -11,12 +11,14 @@ # -*- coding: utf-8 -*- +import imp import sys import taos from util.log import tdLog from util.cases import tdCases from util.sql import tdSql import json +import os class TDTestCase: @@ -29,6 +31,9 @@ class TDTestCase: return def init(self, conn, logSql): + self.testcasePath = os.path.split(__file__)[0] + self.testcaseFilename = os.path.split(__file__)[-1] + os.system("rm -rf %s/%s.sql" % (self.testcasePath,self.testcaseFilename)) tdLog.debug("start to execute %s" % __file__) tdSql.init(conn.cursor(), logSql) @@ -557,6 +562,103 @@ class TDTestCase: tdSql.checkRows(3) tdSql.query("select round(dataint) from jsons1 where jtag->'tag1'>1") tdSql.checkRows(3) + + #math function + tdSql.query("select sin(dataint) from jsons1 where jtag->'tag1'>1;") + tdSql.checkRows(3) + tdSql.query("select cos(dataint) from jsons1 where jtag->'tag1'>1;") + tdSql.checkRows(3) + tdSql.query("select tan(dataint) from jsons1 where jtag->'tag1'>1;") + tdSql.checkRows(3) + tdSql.query("select asin(dataint) from jsons1 where jtag->'tag1'>1;") + tdSql.checkRows(3) + tdSql.query("select acos(dataint) from jsons1 where jtag->'tag1'>1;") + tdSql.checkRows(3) + tdSql.query("select atan(dataint) from jsons1 where jtag->'tag1'>1;") + tdSql.checkRows(3) + tdSql.query("select ceil(dataint) from jsons1 where jtag->'tag1'>1;") + tdSql.checkRows(3) + tdSql.query("select floor(dataint) from jsons1 where jtag->'tag1'>1;") + tdSql.checkRows(3) + tdSql.query("select round(dataint) from jsons1 where jtag->'tag1'>1;") + tdSql.checkRows(3) + tdSql.query("select abs(dataint) from jsons1 where jtag->'tag1'>1;") + tdSql.checkRows(3) + tdSql.query("select pow(dataint,5) from jsons1 where jtag->'tag1'>1;") + tdSql.checkRows(3) + tdSql.query("select log(dataint,10) from jsons1 where jtag->'tag1'>1;") + tdSql.checkRows(3) + tdSql.query("select sqrt(dataint) from jsons1 where jtag->'tag1'>1;") + tdSql.checkRows(3) + tdSql.query("select HISTOGRAM(dataint,'user_input','[1, 33, 555, 7777]',1) from jsons1 where jtag->'tag1'>1;") + tdSql.checkRows(3) + tdSql.query("select csum(dataint) from jsons1 where jtag->'tag1'>1;") + tdSql.checkRows(3) + tdSql.query("select mavg(dataint,1) from jsons1 where jtag->'tag1'>1;") + tdSql.checkRows(3) + tdSql.query("select statecount(dataint,'GE',10) from jsons1 where jtag->'tag1'>1;") + tdSql.checkRows(3) + tdSql.query("select stateduration(dataint,'GE',0) from jsons1 where jtag->'tag1'>1;") + tdSql.checkRows(3) + tdSql.query("select sample(dataint,3) from jsons1 where jtag->'tag1'>1;") + tdSql.checkRows(3) + tdSql.query("select HYPERLOGLOG(dataint) from jsons1 where jtag->'tag1'>1;") + tdSql.checkRows(1) + tdSql.query("select twa(dataint) from jsons1 where jtag->'tag1'>1;") + tdSql.checkRows(1) + + # function not ready + # tdSql.query("select tail(dataint,1) from jsons1 where jtag->'tag1'>1;") + # tdSql.checkRows(3) + # tdSql.query("select unique(dataint) from jsons1 where jtag->'tag1'>1;") + # tdSql.checkRows(3) + # tdSql.query("select mode(dataint) from jsons1 where jtag->'tag1'>1;") + # tdSql.checkRows(3) + # tdSql.query("select irate(dataint) from jsons1 where jtag->'tag1'>1;") + # tdSql.checkRows(1) + + #str function + tdSql.query("select upper(dataStr) from jsons1 where jtag->'tag1'>1;") + tdSql.checkRows(3) + tdSql.query("select ltrim(dataStr) from jsons1 where jtag->'tag1'>1;") + tdSql.checkRows(3) + tdSql.query("select lower(dataStr) from jsons1 where jtag->'tag1'>1;") + tdSql.checkRows(3) + tdSql.query("select rtrim(dataStr) from jsons1 where jtag->'tag1'>1;") + tdSql.checkRows(3) + tdSql.query("select LENGTH(dataStr) from jsons1 where jtag->'tag1'>1;") + tdSql.checkRows(3) + tdSql.query("select CHAR_LENGTH(dataStr) from jsons1 where jtag->'tag1'>1;") + tdSql.checkRows(3) + tdSql.query("select SUBSTR(dataStr,5) from jsons1 where jtag->'tag1'>1;") + tdSql.checkRows(3) + tdSql.query("select CONCAT(dataStr,dataStrBin) from jsons1 where jtag->'tag1'>1;") + tdSql.checkRows(3) + tdSql.query("select CONCAT_ws('adad!@!@%$^$%$^$%^a',dataStr,dataStrBin) from jsons1 where jtag->'tag1'>1;") + tdSql.checkRows(3) + tdSql.query("select CAST(dataStr as bigint) from jsons1 where jtag->'tag1'>1;") + tdSql.checkRows(3) + + #time function + tdSql.query("select now() from jsons1 where jtag->'tag1'>1;") + tdSql.checkRows(3) + tdSql.query("select today() from jsons1 where jtag->'tag1'>1;") + tdSql.checkRows(3) + tdSql.query("select TIMEZONE() from jsons1 where jtag->'tag1'>1;") + tdSql.checkRows(3) + tdSql.query("select TO_ISO8601(ts) from jsons1 where jtag->'tag1'>1;") + tdSql.checkRows(3) + tdSql.query("select TO_UNIXTIMESTAMP(datastr) from jsons1 where jtag->'tag1'>1;") + tdSql.checkRows(3) + tdSql.query("select TIMETRUNCATE(ts,1u) from jsons1 where jtag->'tag1'>1;") + tdSql.checkRows(3) + tdSql.query("select TIMEDIFF(ts,_c0) from jsons1 where jtag->'tag1'>1;") + tdSql.checkRows(3) + tdSql.query("select TIMEDIFF(ts,1u) from jsons1 where jtag->'tag1'>1;") + tdSql.checkRows(3) + tdSql.query("select ELAPSED(ts,1h) from jsons1 where jtag->'tag1'>1;") + tdSql.checkRows(1) + # # #test TD-12077 tdSql.execute("insert into jsons1_16 using jsons1 tags('{\"tag1\":\"收到货\",\"tag2\":\"\",\"tag3\":-2.111}') values(1591062628000, 2, NULL, '你就会', 'dws')") diff --git a/tests/system-test/6-cluster/5dnode3mnodeSep1VnodeStopCreateDb.py b/tests/system-test/6-cluster/5dnode3mnodeSep1VnodeStopCreateDb.py new file mode 100644 index 0000000000000000000000000000000000000000..59fe1c0b169a99858b0b893df6f2bfdfecae025d --- /dev/null +++ b/tests/system-test/6-cluster/5dnode3mnodeSep1VnodeStopCreateDb.py @@ -0,0 +1,181 @@ +from ssl import ALERT_DESCRIPTION_CERTIFICATE_UNOBTAINABLE +import taos +import sys +import time +import os + +from util.log import * +from util.sql import * +from util.cases import * +from util.dnodes import TDDnodes +from util.dnodes import TDDnode +from util.cluster import * +sys.path.append("./6-cluster") +from clusterCommonCreate import * +from clusterCommonCheck import clusterComCheck + +import time +import socket +import subprocess +from multiprocessing import Process +import threading +import time +import inspect +import ctypes + +class TDTestCase: + + def init(self,conn ,logSql): + tdLog.debug(f"start to excute {__file__}") + self.TDDnodes = None + tdSql.init(conn.cursor()) + self.host = socket.gethostname() + + + def getBuildPath(self): + selfPath = os.path.dirname(os.path.realpath(__file__)) + + if ("community" in selfPath): + projPath = selfPath[:selfPath.find("community")] + else: + projPath = selfPath[:selfPath.find("tests")] + + for root, dirs, files in os.walk(projPath): + if ("taosd" in files): + rootRealPath = os.path.dirname(os.path.realpath(root)) + if ("packaging" not in rootRealPath): + buildPath = root[:len(root) - len("/build/bin")] + break + return buildPath + + def _async_raise(self, tid, exctype): + """raises the exception, performs cleanup if needed""" + if not inspect.isclass(exctype): + exctype = type(exctype) + res = ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, ctypes.py_object(exctype)) + if res == 0: + raise ValueError("invalid thread id") + elif res != 1: + # """if it returns a number greater than one, you're in trouble, + # and you should call it again with exc=NULL to revert the effect""" + ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, None) + raise SystemError("PyThreadState_SetAsyncExc failed") + + def stopThread(self,thread): + self._async_raise(thread.ident, SystemExit) + + + def insertData(self,countstart,countstop): + # fisrt add data : db\stable\childtable\general table + + for couti in range(countstart,countstop): + tdLog.debug("drop database if exists db%d" %couti) + tdSql.execute("drop database if exists db%d" %couti) + print("create database if not exists db%d replica 1 duration 300" %couti) + tdSql.execute("create database if not exists db%d replica 1 duration 300" %couti) + tdSql.execute("use db%d" %couti) + tdSql.execute( + '''create table stb1 + (ts timestamp, c1 int, c2 bigint, c3 smallint, c4 tinyint, c5 float, c6 double, c7 bool, c8 binary(16),c9 nchar(32), c10 timestamp) + tags (t1 int) + ''' + ) + tdSql.execute( + ''' + create table t1 + (ts timestamp, c1 int, c2 bigint, c3 smallint, c4 tinyint, c5 float, c6 double, c7 bool, c8 binary(16),c9 nchar(32), c10 timestamp) + ''' + ) + for i in range(4): + tdSql.execute(f'create table ct{i+1} using stb1 tags ( {i+1} )') + + + def fiveDnodeThreeMnode(self,dnodenumbers,mnodeNums,restartNumber): + tdLog.printNoPrefix("======== test case 1: ") + paraDict = {'dbName': 'db', + 'dropFlag': 1, + 'event': '', + 'vgroups': 4, + 'replica': 1, + 'stbName': 'stb', + 'colPrefix': 'c', + 'tagPrefix': 't', + 'colSchema': [{'type': 'INT', 'count':1}, {'type': 'binary', 'len':20, 'count':1}], + 'tagSchema': [{'type': 'INT', 'count':1}, {'type': 'binary', 'len':20, 'count':1}], + 'ctbPrefix': 'ctb', + 'ctbNum': 1, + 'rowsPerTbl': 10000, + 'batchNum': 10, + 'startTs': 1640966400000, # 2022-01-01 00:00:00.000 + 'pollDelay': 10, + 'showMsg': 1, + 'showRow': 1} + dnodenumbers=int(dnodenumbers) + mnodeNums=int(mnodeNums) + dbNumbers = int(dnodenumbers * restartNumber) + + tdLog.info("first check dnode and mnode") + tdSql.query("show dnodes;") + tdSql.checkData(0,1,'%s:6030'%self.host) + tdSql.checkData(4,1,'%s:6430'%self.host) + clusterComCheck.checkDnodes(dnodenumbers) + clusterComCheck.checkMnodeStatus(1) + + # fisr add three mnodes; + tdLog.info("fisr add three mnodes and check mnode status") + tdSql.execute("create mnode on dnode 2") + clusterComCheck.checkMnodeStatus(2) + tdSql.execute("create mnode on dnode 3") + clusterComCheck.checkMnodeStatus(3) + + # add some error operations and + tdLog.info("Confirm the status of the dnode again") + tdSql.error("create mnode on dnode 2") + tdSql.query("show dnodes;") + print(tdSql.queryResult) + clusterComCheck.checkDnodes(dnodenumbers) + + tdLog.info("Take turns stopping all dnodes ") + # seperate vnode and mnode in different dnodes. + # create database and stable + tdDnodes=cluster.dnodes + stopcount =0 + while stopcount < restartNumber: + for i in range(dnodenumbers): + # threads=[] + # threads = MyThreadFunc(self.insert_data(i*2,i*2+2)) + paraDict["dbName"]= 'db%d%d'%(stopcount,i) + threads=threading.Thread(target=clusterComCreate.create_database, args=(tdSql, paraDict["dbName"],paraDict["dropFlag"], paraDict["vgroups"],paraDict['replica'])) + threads.start() + tdDnodes[i].stoptaosd() + # sleep(10) + tdDnodes[i].starttaosd() + # sleep(10) + + if clusterComCheck.checkDnodes(dnodenumbers): + # threads.join() + tdLog.info("first restart loop") + else: + print("456") + threads.join() + self.stopThread(threads) + tdLog.exit("one or more of dnodes failed to start ") + # self.check3mnode() + stopcount+=1 + threads.join() + clusterComCheck.checkDnodes(dnodenumbers) + clusterComCheck.checkDbRows(dbNumbers) + for i in range(restartNumber): + clusterComCheck.checkDb(dnodenumbers,'db%d'%i) + + + def run(self): + # print(self.master_dnode.cfgDict) + self.fiveDnodeThreeMnode(5,3,1) + + def stop(self): + tdSql.close() + tdLog.success(f"{__file__} successfully executed") + +tdCases.addLinux(__file__, TDTestCase()) +tdCases.addWindows(__file__, TDTestCase()) \ No newline at end of file diff --git a/tests/system-test/6-cluster/5dnode3mnodeSep1VnodeStopMnodeCreateDb.py b/tests/system-test/6-cluster/5dnode3mnodeSep1VnodeStopMnodeCreateDb.py new file mode 100644 index 0000000000000000000000000000000000000000..cf608f64807d1046c0f776347f2f61d321ef87ba --- /dev/null +++ b/tests/system-test/6-cluster/5dnode3mnodeSep1VnodeStopMnodeCreateDb.py @@ -0,0 +1,181 @@ +from ssl import ALERT_DESCRIPTION_CERTIFICATE_UNOBTAINABLE +import taos +import sys +import time +import os + +from util.log import * +from util.sql import * +from util.cases import * +from util.dnodes import TDDnodes +from util.dnodes import TDDnode +from util.cluster import * +sys.path.append("./6-cluster") +from clusterCommonCreate import * +from clusterCommonCheck import clusterComCheck + +import time +import socket +import subprocess +from multiprocessing import Process +import threading +import time +import inspect +import ctypes + +class TDTestCase: + + def init(self,conn ,logSql): + tdLog.debug(f"start to excute {__file__}") + self.TDDnodes = None + tdSql.init(conn.cursor()) + self.host = socket.gethostname() + + + def getBuildPath(self): + selfPath = os.path.dirname(os.path.realpath(__file__)) + + if ("community" in selfPath): + projPath = selfPath[:selfPath.find("community")] + else: + projPath = selfPath[:selfPath.find("tests")] + + for root, dirs, files in os.walk(projPath): + if ("taosd" in files): + rootRealPath = os.path.dirname(os.path.realpath(root)) + if ("packaging" not in rootRealPath): + buildPath = root[:len(root) - len("/build/bin")] + break + return buildPath + + def _async_raise(self, tid, exctype): + """raises the exception, performs cleanup if needed""" + if not inspect.isclass(exctype): + exctype = type(exctype) + res = ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, ctypes.py_object(exctype)) + if res == 0: + raise ValueError("invalid thread id") + elif res != 1: + # """if it returns a number greater than one, you're in trouble, + # and you should call it again with exc=NULL to revert the effect""" + ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, None) + raise SystemError("PyThreadState_SetAsyncExc failed") + + def stopThread(self,thread): + self._async_raise(thread.ident, SystemExit) + + + def insertData(self,countstart,countstop): + # fisrt add data : db\stable\childtable\general table + + for couti in range(countstart,countstop): + tdLog.debug("drop database if exists db%d" %couti) + tdSql.execute("drop database if exists db%d" %couti) + print("create database if not exists db%d replica 1 duration 300" %couti) + tdSql.execute("create database if not exists db%d replica 1 duration 300" %couti) + tdSql.execute("use db%d" %couti) + tdSql.execute( + '''create table stb1 + (ts timestamp, c1 int, c2 bigint, c3 smallint, c4 tinyint, c5 float, c6 double, c7 bool, c8 binary(16),c9 nchar(32), c10 timestamp) + tags (t1 int) + ''' + ) + tdSql.execute( + ''' + create table t1 + (ts timestamp, c1 int, c2 bigint, c3 smallint, c4 tinyint, c5 float, c6 double, c7 bool, c8 binary(16),c9 nchar(32), c10 timestamp) + ''' + ) + for i in range(4): + tdSql.execute(f'create table ct{i+1} using stb1 tags ( {i+1} )') + + + def fiveDnodeThreeMnode(self,dnodenumbers,mnodeNums,restartNumber): + tdLog.printNoPrefix("======== test case 1: ") + paraDict = {'dbName': 'db', + 'dropFlag': 1, + 'event': '', + 'vgroups': 4, + 'replica': 1, + 'stbName': 'stb', + 'colPrefix': 'c', + 'tagPrefix': 't', + 'colSchema': [{'type': 'INT', 'count':1}, {'type': 'binary', 'len':20, 'count':1}], + 'tagSchema': [{'type': 'INT', 'count':1}, {'type': 'binary', 'len':20, 'count':1}], + 'ctbPrefix': 'ctb', + 'ctbNum': 1, + 'rowsPerTbl': 10000, + 'batchNum': 10, + 'startTs': 1640966400000, # 2022-01-01 00:00:00.000 + 'pollDelay': 10, + 'showMsg': 1, + 'showRow': 1} + dnodenumbers=int(dnodenumbers) + mnodeNums=int(mnodeNums) + dbNumbers = int(mnodeNums * restartNumber) + tdLog.info("first check dnode and mnode") + tdSql.query("show dnodes;") + tdSql.checkData(0,1,'%s:6030'%self.host) + tdSql.checkData(4,1,'%s:6430'%self.host) + clusterComCheck.checkDnodes(dnodenumbers) + clusterComCheck.checkMnodeStatus(1) + + # fisr add three mnodes; + tdLog.info("fisr add three mnodes and check mnode status") + tdSql.execute("create mnode on dnode 2") + clusterComCheck.checkMnodeStatus(2) + tdSql.execute("create mnode on dnode 3") + clusterComCheck.checkMnodeStatus(3) + + # add some error operations and + tdLog.info("Confirm the status of the dnode again") + tdSql.error("create mnode on dnode 2") + tdSql.query("show dnodes;") + print(tdSql.queryResult) + clusterComCheck.checkDnodes(dnodenumbers) + + tdLog.info("Take turns stopping Mnodes ") + # seperate vnode and mnode in different dnodes. + # create database and stable + tdDnodes=cluster.dnodes + stopcount =0 + while stopcount < restartNumber: + tdLog.info("first restart loop") + for i in range(mnodeNums): + # threads=[] + # threads = MyThreadFunc(self.insert_data(i*2,i*2+2)) + paraDict["dbName"]= 'db%d%d'%(stopcount,i) + threads=threading.Thread(target=clusterComCreate.create_database, args=(tdSql, paraDict["dbName"],paraDict["dropFlag"], paraDict["vgroups"],paraDict['replica'])) + threads.start() + tdDnodes[i].stoptaosd() + # sleep(10) + tdDnodes[i].starttaosd() + # sleep(10) + + if clusterComCheck.checkDnodes(dnodenumbers): + # threads.join() + tdLog.info("123") + else: + print("456") + threads.join() + self.stopThread(threads) + tdLog.exit("one or more of dnodes failed to start ") + # self.check3mnode() + stopcount+=1 + threads.join() + clusterComCheck.checkDnodes(dnodenumbers) + clusterComCheck.checkDbRows(dbNumbers) + for i in range(restartNumber): + clusterComCheck.checkDb(mnodeNums,'db%d'%i) + + + def run(self): + # print(self.master_dnode.cfgDict) + self.fiveDnodeThreeMnode(5,3,1) + + def stop(self): + tdSql.close() + tdLog.success(f"{__file__} successfully executed") + +tdCases.addLinux(__file__, TDTestCase()) +tdCases.addWindows(__file__, TDTestCase()) \ No newline at end of file diff --git a/tests/system-test/6-cluster/5dnode3mnodeSep1VnodeStopVnodeCreateDb.py b/tests/system-test/6-cluster/5dnode3mnodeSep1VnodeStopVnodeCreateDb.py new file mode 100644 index 0000000000000000000000000000000000000000..2d2322fada3d87157653404727b94ab1a723188c --- /dev/null +++ b/tests/system-test/6-cluster/5dnode3mnodeSep1VnodeStopVnodeCreateDb.py @@ -0,0 +1,181 @@ +from ssl import ALERT_DESCRIPTION_CERTIFICATE_UNOBTAINABLE +import taos +import sys +import time +import os + +from util.log import * +from util.sql import * +from util.cases import * +from util.dnodes import TDDnodes +from util.dnodes import TDDnode +from util.cluster import * +sys.path.append("./6-cluster") +from clusterCommonCreate import * +from clusterCommonCheck import clusterComCheck + +import time +import socket +import subprocess +from multiprocessing import Process +import threading +import time +import inspect +import ctypes + +class TDTestCase: + + def init(self,conn ,logSql): + tdLog.debug(f"start to excute {__file__}") + self.TDDnodes = None + tdSql.init(conn.cursor()) + self.host = socket.gethostname() + + + def getBuildPath(self): + selfPath = os.path.dirname(os.path.realpath(__file__)) + + if ("community" in selfPath): + projPath = selfPath[:selfPath.find("community")] + else: + projPath = selfPath[:selfPath.find("tests")] + + for root, dirs, files in os.walk(projPath): + if ("taosd" in files): + rootRealPath = os.path.dirname(os.path.realpath(root)) + if ("packaging" not in rootRealPath): + buildPath = root[:len(root) - len("/build/bin")] + break + return buildPath + + def _async_raise(self, tid, exctype): + """raises the exception, performs cleanup if needed""" + if not inspect.isclass(exctype): + exctype = type(exctype) + res = ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, ctypes.py_object(exctype)) + if res == 0: + raise ValueError("invalid thread id") + elif res != 1: + # """if it returns a number greater than one, you're in trouble, + # and you should call it again with exc=NULL to revert the effect""" + ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, None) + raise SystemError("PyThreadState_SetAsyncExc failed") + + def stopThread(self,thread): + self._async_raise(thread.ident, SystemExit) + + + def insertData(self,countstart,countstop): + # fisrt add data : db\stable\childtable\general table + + for couti in range(countstart,countstop): + tdLog.debug("drop database if exists db%d" %couti) + tdSql.execute("drop database if exists db%d" %couti) + print("create database if not exists db%d replica 1 duration 300" %couti) + tdSql.execute("create database if not exists db%d replica 1 duration 300" %couti) + tdSql.execute("use db%d" %couti) + tdSql.execute( + '''create table stb1 + (ts timestamp, c1 int, c2 bigint, c3 smallint, c4 tinyint, c5 float, c6 double, c7 bool, c8 binary(16),c9 nchar(32), c10 timestamp) + tags (t1 int) + ''' + ) + tdSql.execute( + ''' + create table t1 + (ts timestamp, c1 int, c2 bigint, c3 smallint, c4 tinyint, c5 float, c6 double, c7 bool, c8 binary(16),c9 nchar(32), c10 timestamp) + ''' + ) + for i in range(4): + tdSql.execute(f'create table ct{i+1} using stb1 tags ( {i+1} )') + + + def fiveDnodeThreeMnode(self,dnodenumbers,mnodeNums,restartNumber): + tdLog.printNoPrefix("======== test case 1: ") + paraDict = {'dbName': 'db', + 'dropFlag': 1, + 'event': '', + 'vgroups': 4, + 'replica': 1, + 'stbName': 'stb', + 'colPrefix': 'c', + 'tagPrefix': 't', + 'colSchema': [{'type': 'INT', 'count':1}, {'type': 'binary', 'len':20, 'count':1}], + 'tagSchema': [{'type': 'INT', 'count':1}, {'type': 'binary', 'len':20, 'count':1}], + 'ctbPrefix': 'ctb', + 'ctbNum': 1, + 'rowsPerTbl': 10000, + 'batchNum': 10, + 'startTs': 1640966400000, # 2022-01-01 00:00:00.000 + 'pollDelay': 10, + 'showMsg': 1, + 'showRow': 1} + dnodenumbers=int(dnodenumbers) + mnodeNums=int(mnodeNums) + vnodeNumbers = int(dnodenumbers-mnodeNums) + dbNumbers = int(vnodeNumbers * restartNumber) + tdLog.info("first check dnode and mnode") + tdSql.query("show dnodes;") + tdSql.checkData(0,1,'%s:6030'%self.host) + tdSql.checkData(4,1,'%s:6430'%self.host) + clusterComCheck.checkDnodes(dnodenumbers) + clusterComCheck.checkMnodeStatus(1) + + # fisr add three mnodes; + tdLog.info("fisr add three mnodes and check mnode status") + tdSql.execute("create mnode on dnode 2") + clusterComCheck.checkMnodeStatus(2) + tdSql.execute("create mnode on dnode 3") + clusterComCheck.checkMnodeStatus(3) + + # add some error operations and + tdLog.info("Confirm the status of the dnode again") + tdSql.error("create mnode on dnode 2") + tdSql.query("show dnodes;") + print(tdSql.queryResult) + clusterComCheck.checkDnodes(dnodenumbers) + + tdLog.info("Take turns stopping Vnodes ") + # seperate vnode and mnode in different dnodes. + # create database and stable + tdDnodes=cluster.dnodes + stopcount =0 + while stopcount < restartNumber: + for i in range(vnodeNumbers): + # threads=[] + # threads = MyThreadFunc(self.insert_data(i*2,i*2+2)) + paraDict["dbName"]= 'db%d%d'%(stopcount,i) + threads=threading.Thread(target=clusterComCreate.create_database, args=(tdSql, paraDict["dbName"],paraDict["dropFlag"], paraDict["vgroups"],paraDict['replica'])) + threads.start() + tdDnodes[mnodeNums+i].stoptaosd() + # sleep(10) + tdDnodes[mnodeNums+i].starttaosd() + # sleep(10) + + if clusterComCheck.checkDnodes(vnodeNumbers): + # threads.join() + tdLog.info("first restart loop") + else: + print("456") + threads.join() + self.stopThread(threads) + tdLog.exit("one or more of dnodes failed to start ") + # self.check3mnode() + stopcount+=1 + threads.join() + clusterComCheck.checkDnodes(dnodenumbers) + clusterComCheck.checkDbRows(dbNumbers) + for i in range(restartNumber): + clusterComCheck.checkDb(vnodeNumbers,'db%d'%i) + + + def run(self): + # print(self.master_dnode.cfgDict) + self.fiveDnodeThreeMnode(5,3,1) + + def stop(self): + tdSql.close() + tdLog.success(f"{__file__} successfully executed") + +tdCases.addLinux(__file__, TDTestCase()) +tdCases.addWindows(__file__, TDTestCase()) \ No newline at end of file diff --git a/tests/system-test/6-cluster/5dnode3mnodeSeperate1VnodeStopInsert.py b/tests/system-test/6-cluster/5dnode3mnodeSeperate1VnodeStopInsert.py index 1739db09af6f784eb22c7bfbe20aef4b9190720d..aa1d7ecc290c5950fa50ab29d1a97aff43ee58ed 100644 --- a/tests/system-test/6-cluster/5dnode3mnodeSeperate1VnodeStopInsert.py +++ b/tests/system-test/6-cluster/5dnode3mnodeSeperate1VnodeStopInsert.py @@ -9,6 +9,11 @@ from util.sql import * from util.cases import * from util.dnodes import TDDnodes from util.dnodes import TDDnode +from util.cluster import * +from util.common import * +sys.path.append("./7-tmq") +from tmqCommon import * + import time import socket import subprocess @@ -17,26 +22,13 @@ import threading import time import inspect import ctypes -class MyDnodes(TDDnodes): - def __init__(self ,dnodes_lists): - super(MyDnodes,self).__init__() - self.dnodes = dnodes_lists # dnode must be TDDnode instance - self.simDeployed = False - class TDTestCase: def init(self,conn ,logSql): tdLog.debug(f"start to excute {__file__}") - self.TDDnodes = None - - def buildcluster(self,dnodenumber): - self.depoly_cluster(dnodenumber) - self.master_dnode = self.TDDnodes.dnodes[0] - self.host=self.master_dnode.cfgDict["fqdn"] - conn1 = taos.connect(self.master_dnode.cfgDict["fqdn"] , config=self.master_dnode.cfgDir) - tdSql.init(conn1.cursor()) - + # tdSql.init(conn.cursor()) + # self.host = socket.gethostname() def getBuildPath(self): selfPath = os.path.dirname(os.path.realpath(__file__)) @@ -106,52 +98,6 @@ class TDTestCase: tdSql.checkData(0,0,rowsPerSTable) return - def depoly_cluster(self ,dnodes_nums=5,independent=True): - - testCluster = False - valgrind = 0 - hostname = socket.gethostname() - dnodes = [] - start_port = 6030 - start_port_sec = 6130 - for num in range(1, dnodes_nums+1): - dnode = TDDnode(num) - dnode.addExtraCfg("firstEp", f"{hostname}:{start_port}") - dnode.addExtraCfg("fqdn", f"{hostname}") - dnode.addExtraCfg("serverPort", f"{start_port + (num-1)*100}") - dnode.addExtraCfg("monitorFqdn", hostname) - dnode.addExtraCfg("monitorPort", 7043) - dnode.addExtraCfg("secondEp", f"{hostname}:{start_port_sec}") - # configure three dnoe don't support vnodes - if independent and (num < 4): - dnode.addExtraCfg("supportVnodes", 0) - - dnodes.append(dnode) - - self.TDDnodes = MyDnodes(dnodes) - self.TDDnodes.init("") - self.TDDnodes.setTestCluster(testCluster) - self.TDDnodes.setValgrind(valgrind) - self.TDDnodes.stopAll() - for dnode in self.TDDnodes.dnodes: - self.TDDnodes.deploy(dnode.index,{}) - - for dnode in self.TDDnodes.dnodes: - self.TDDnodes.starttaosd(dnode.index) - - # create cluster - for dnode in self.TDDnodes.dnodes[1:]: - # print(dnode.cfgDict) - dnode_id = dnode.cfgDict["fqdn"] + ":" +dnode.cfgDict["serverPort"] - dnode_first_host = dnode.cfgDict["firstEp"].split(":")[0] - dnode_first_port = dnode.cfgDict["firstEp"].split(":")[-1] - cmd = f" taos -h {dnode_first_host} -P {dnode_first_port} -s ' create dnode \"{dnode_id} \" ' ;" - print(cmd) - os.system(cmd) - - time.sleep(2) - tdLog.info(" create cluster with %d dnode done! " %dnodes_nums) - def checkdnodes(self,dnodenumber): count=0 while count < 100: @@ -305,6 +251,14 @@ class TDTestCase: tdSql.checkData(2,2,'offline') tdSql.checkData(2,3,'ready') + + def check5dnode(self): + tdSql.query("show dnodes;") + tdSql.checkData(0,1,'%s:6030'%self.host) + tdSql.checkData(4,1,'%s:6430'%self.host) + tdSql.checkData(0,4,'ready') + tdSql.checkData(4,4,'ready') + def five_dnode_three_mnode(self,dnodenumber): tdSql.query("show dnodes;") tdSql.checkData(0,1,'%s:6030'%self.host) @@ -346,6 +300,7 @@ class TDTestCase: threads.join() else: print("456") + threads.join() self.stop_thread(threads) assert 1 == 2 ,"some dnode started failed" return False @@ -357,16 +312,8 @@ class TDTestCase: self.check3mnode() - def getConnection(self, dnode): - host = dnode.cfgDict["fqdn"] - port = dnode.cfgDict["serverPort"] - config_dir = dnode.cfgDir - return taos.connect(host=host, port=int(port), config=config_dir) - - def run(self): # print(self.master_dnode.cfgDict) - self.buildcluster(5) self.five_dnode_three_mnode(5) def stop(self): diff --git a/tests/system-test/6-cluster/5dnode3mnodeStop.py b/tests/system-test/6-cluster/5dnode3mnodeStop.py index 3a85207af6c2e01fdb4795f66a2ffec0eb5c639d..5311d29846ecf1ec93c912abaf9634d40a50d680 100644 --- a/tests/system-test/6-cluster/5dnode3mnodeStop.py +++ b/tests/system-test/6-cluster/5dnode3mnodeStop.py @@ -12,7 +12,10 @@ from util.dnodes import TDDnodes from util.dnodes import TDDnode from util.cluster import * from test import tdDnodes +sys.path.append("./6-cluster") +from clusterCommonCreate import * +from clusterCommonCheck import * import time import socket import subprocess @@ -216,64 +219,88 @@ class TDTestCase: else: tdLog.exit("create cluster with %d dnode but check dnode not ready within 5s ! "%dnodeNumbers) - def five_dnode_three_mnode(self,dnodenumber): - self.check_dnodes_status(5) - tdSql.query("show mnodes;") - tdLog.debug(self.host) - tdSql.checkRows(1) + def fiveDnodeThreeMnode(self,dnodenumbers,mnodeNums,restartNumber): + tdLog.printNoPrefix("======== test case 1: ") + paraDict = {'dbName': 'db', + 'dropFlag': 1, + 'event': '', + 'vgroups': 4, + 'replica': 1, + 'stbName': 'stb', + 'colPrefix': 'c', + 'tagPrefix': 't', + 'colSchema': [{'type': 'INT', 'count':1}, {'type': 'binary', 'len':20, 'count':1}], + 'tagSchema': [{'type': 'INT', 'count':1}, {'type': 'binary', 'len':20, 'count':1}], + 'ctbPrefix': 'ctb', + 'ctbNum': 1, + 'rowsPerTbl': 10000, + 'batchNum': 10, + 'startTs': 1640966400000, # 2022-01-01 00:00:00.000 + 'pollDelay': 10, + 'showMsg': 1, + 'showRow': 1} + dnodenumbers=int(dnodenumbers) + mnodeNums=int(mnodeNums) + dbNumbers = int(dnodenumbers * restartNumber) + + tdLog.info("first check dnode and mnode") + tdSql.query("show dnodes;") tdSql.checkData(0,1,'%s:6030'%self.host) - tdSql.checkData(0,2,'leader') - tdSql.checkData(0,3,'ready') + tdSql.checkData(4,1,'%s:6430'%self.host) + clusterComCheck.checkDnodes(dnodenumbers) + clusterComCheck.checkMnodeStatus(1) # fisr add three mnodes; + tdLog.info("fisr add three mnodes and check mnode status") tdSql.execute("create mnode on dnode 2") - time.sleep(10) + clusterComCheck.checkMnodeStatus(2) tdSql.execute("create mnode on dnode 3") + clusterComCheck.checkMnodeStatus(3) - # fisrt check statut ready - self.check3mnode() - - + # add some error operations and + tdLog.info("Confirm the status of the dnode again") tdSql.error("create mnode on dnode 2") - tdSql.query("show dnodes;") - # tdLog.debug(tdSql.queryResult) - - tdLog.debug("stop and follower of mnode") + print(tdSql.queryResult) + clusterComCheck.checkDnodes(dnodenumbers) + # restart all taosd tdDnodes=cluster.dnodes - # tdLog.debug(tdDnodes[0]) tdDnodes[1].stoptaosd() - self.check3mnode2off() + clusterComCheck.check3mnodeoff(2,3) tdDnodes[1].starttaosd() - self.check3mnode() + clusterComCheck.checkMnodeStatus(3) tdDnodes[2].stoptaosd() - self.check3mnode3off() + clusterComCheck.check3mnodeoff(3,3) tdDnodes[2].starttaosd() - self.check3mnode() + clusterComCheck.checkMnodeStatus(3) tdDnodes[0].stoptaosd() - self.check3mnode1off() + clusterComCheck.check3mnodeoff(1,3) tdDnodes[0].starttaosd() - self.check3mnode() + clusterComCheck.checkMnodeStatus(3) - self.check3mnode() + tdLog.info("Take turns stopping all dnodes ") + # seperate vnode and mnode in different dnodes. + # create database and stable stopcount =0 while stopcount <= 2: - for i in range(dnodenumber): + tdLog.info("first restart loop") + for i in range(dnodenumbers): tdDnodes[i].stoptaosd() tdDnodes[i].starttaosd() - # self.check3mnode() stopcount+=1 - self.check3mnode() + clusterComCheck.checkDnodes(dnodenumbers) + clusterComCheck.checkMnodeStatus(3) def run(self): - self.five_dnode_three_mnode(5) + # print(self.master_dnode.cfgDict) + self.fiveDnodeThreeMnode(5,3,1) def stop(self): tdSql.close() tdLog.success(f"{__file__} successfully executed") tdCases.addLinux(__file__, TDTestCase()) -tdCases.addWindows(__file__, TDTestCase()) \ No newline at end of file +tdCases.addWindows(__file__, TDTestCase()) diff --git a/tests/system-test/6-cluster/clusterCommonCheck.py b/tests/system-test/6-cluster/clusterCommonCheck.py new file mode 100644 index 0000000000000000000000000000000000000000..d03000896340afad5ec84593e3d8a29eb410c486 --- /dev/null +++ b/tests/system-test/6-cluster/clusterCommonCheck.py @@ -0,0 +1,211 @@ +################################################################### +# Copyright (c) 2016 by TAOS Technologies, Inc. +# All rights reserved. +# +# This file is proprietary and confidential to TAOS Technologies. +# No part of this file may be reproduced, stored, transmitted, +# disclosed or used in any form or by any means other than as +# expressly provided by the written permission from Jianhui Tao +# +################################################################### + +# -*- coding: utf-8 -*- + +from collections import defaultdict +import random +import string +import threading +import requests +import time +# import socketfrom + +import taos +from util.log import * +from util.sql import * +from util.cases import * +from util.dnodes import * +from util.common import * + +# class actionType(Enum): +# CREATE_DATABASE = 0 +# CREATE_STABLE = 1 +# CREATE_CTABLE = 2 +# INSERT_DATA = 3 + +class ClusterComCheck: + def init(self, conn, logSql): + tdSql.init(conn.cursor()) + # tdSql.init(conn.cursor(), logSql) # output sql.txt file + + def checkDnodes(self,dnodeNumbers): + count=0 + while count < 5: + tdSql.query("show dnodes") + # tdLog.debug(tdSql.queryResult) + status=0 + for i in range(dnodeNumbers): + if tdSql.queryResult[i][4] == "ready": + status+=1 + tdLog.info(status) + + if status == dnodeNumbers: + tdLog.success("it find cluster with %d dnodes and check that all cluster dnodes are ready within 5s! " %dnodeNumbers) + return True + count+=1 + time.sleep(1) + else: + tdLog.debug(tdSql.queryResult) + tdLog.exit("it find cluster with %d dnodes but check that there dnodes are not ready within 5s ! "%dnodeNumbers) + + def checkDbRows(self,dbNumbers): + dbNumbers=int(dbNumbers) + count=0 + while count < 5: + tdSql.query("show databases;") + if tdSql.checkRows(dbNumbers+2): + tdLog.success("we find %d databases and expect %d in clusters! " %(tdSql.queryRows,dbNumbers+2)) + return True + else: + continue + else : + tdLog.debug(tdSql.queryResult) + tdLog.exit("we find %d databases but expect %d in clusters! " %(tdSql.queryRows,dbNumbers)) + + def checkDb(self,dbNumbers,dbindex): + count=0 + while count < 5: + query_status=0 + for i in range(dbNumbers): + for j in range(dbNumbers): + tdSql.query("show databases;") + if "%s%d"%(dbindex,j) == tdSql.queryResult[i+2][0] : + if tdSql.queryResult[i+2][19] == "ready": + query_status+=1 + else: + continue + # print(query_status) + count+=1 + if query_status == dbNumbers: + tdLog.success("we find cluster with %d dnode and check all databases are ready within 5s! " %dbNumbers) + return True + else: + tdLog.debug(tdSql.queryResult) + tdLog.exit("database is not ready within 5s") + + def checkData(self,dbname,stbname,stableCount,CtableCount,rowsPerSTable,): + tdSql.execute("use %s"%dbname) + tdSql.query("show stables") + tdSql.checkRows(stableCount) + tdSql.query("show tables") + tdSql.checkRows(CtableCount) + for i in range(stableCount): + tdSql.query("select count(*) from %s%d"%(stbname,i)) + tdSql.checkData(0,0,rowsPerSTable) + return + + def checkMnodeStatus(self,mnodeNums): + self.mnodeNums=int(mnodeNums) + # self.leaderDnode=int(leaderDnode) + + count=0 + + while count < 10: + time.sleep(1) + tdSql.query("show mnodes;") + if tdSql.checkRows(self.mnodeNums) : + tdLog.success("cluster has %d mnodes" %self.mnodeNums ) + + if self.mnodeNums == 1: + if tdSql.queryResult[0][2]== 'leader' and tdSql.queryResult[0][3]== 'ready' : + tdLog.success("%d mnodes is ready in 10s"%self.mnodeNums) + return True + count+=1 + elif self.mnodeNums == 3 : + if tdSql.queryResult[0][2]=='leader' and tdSql.queryResult[0][3]== 'ready' : + if tdSql.queryResult[1][2]=='follower' and tdSql.queryResult[1][3]== 'ready' : + if tdSql.queryResult[2][2]=='follower' and tdSql.queryResult[2][3]== 'ready' : + tdLog.success("%d mnodes is ready in 10s"%self.mnodeNums) + return True + elif tdSql.queryResult[1][2]=='leader' and tdSql.queryResult[1][3]== 'ready' : + if tdSql.queryResult[0][2]=='follower' and tdSql.queryResult[0][3]== 'ready' : + if tdSql.queryResult[2][2]=='follower' and tdSql.queryResult[2][3]== 'ready' : + tdLog.success("%d mnodes is ready in 10s"%self.mnodeNums) + return True + elif tdSql.queryResult[2][2]=='leader' and tdSql.queryResult[2][3]== 'ready' : + if tdSql.queryResult[0][2]=='follower' and tdSql.queryResult[0][3]== 'ready' : + if tdSql.queryResult[1][2]=='follower' and tdSql.queryResult[1][3]== 'ready' : + tdLog.success("%d mnodes is ready in 10s"%self.mnodeNums) + return True + count+=1 + elif self.mnodeNums == 2 : + if tdSql.queryResult[0][2]=='leader' and tdSql.queryResult[0][3]== 'ready' : + if tdSql.queryResult[1][2]=='follower' and tdSql.queryResult[1][3]== 'ready' : + tdLog.success("%d mnodes is ready in 10s"%self.mnodeNums) + return True + elif tdSql.queryResult[1][2]=='leader' and tdSql.queryResult[1][3]== 'ready' : + if tdSql.queryResult[0][2]=='follower' and tdSql.queryResult[0][3]== 'ready' : + tdLog.success("%d mnodes is ready in 10s"%self.mnodeNums) + return True + count+=1 + else: + tdLog.debug(tdSql.queryResult) + tdLog.exit("cluster of %d mnodes is not ready in 10s " %self.mnodeNums) + + + + + def check3mnodeoff(self,offlineDnodeNo,mnodeNums=3): + count=0 + while count < 10: + time.sleep(1) + tdSql.query("show mnodes;") + if tdSql.checkRows(mnodeNums) : + tdLog.success("cluster has %d mnodes" %self.mnodeNums ) + else: + tdLog.exit("mnode number is correct") + if offlineDnodeNo == 1: + if tdSql.queryResult[0][2]=='offline' : + if tdSql.queryResult[1][2]=='leader' and tdSql.queryResult[1][3]== 'ready' : + if tdSql.queryResult[2][2]=='follower' and tdSql.queryResult[2][3]== 'ready' : + tdLog.success("stop mnodes on dnode %d successfully in 10s" %offlineDnodeNo) + return True + elif tdSql.queryResult[1][2]=='follower' and tdSql.queryResult[1][3]== 'ready' : + if tdSql.queryResult[2][2]=='leader' and tdSql.queryResult[2][3]== 'ready' : + tdLog.debug("stop mnodes on dnode %d successfully in 10s" %offlineDnodeNo) + return True + count+=1 + elif offlineDnodeNo == 2: + if tdSql.queryResult[1][2]=='offline' : + if tdSql.queryResult[0][2]=='leader' and tdSql.queryResult[0][3]== 'ready' : + if tdSql.queryResult[2][2]=='follower' and tdSql.queryResult[2][3]== 'ready' : + tdLog.debug("stop mnodes on dnode %d successfully in 10s" %offlineDnodeNo) + return True + elif tdSql.queryResult[0][2]=='follower' and tdSql.queryResult[0][3]== 'ready' : + if tdSql.queryResult[2][2]=='leader' and tdSql.queryResult[2][3]== 'ready' : + tdLog.debug("stop mnodes on dnode %d successfully in 10s" %offlineDnodeNo) + return True + count+=1 + elif offlineDnodeNo == 3: + if tdSql.queryResult[2][2]=='offline' : + if tdSql.queryResult[0][2]=='leader' and tdSql.queryResult[0][3]== 'ready' : + if tdSql.queryResult[1][2]=='follower' and tdSql.queryResult[1][3]== 'ready' : + tdLog.debug("stop mnodes on dnode %d successfully in 10s" %offlineDnodeNo) + return True + elif tdSql.queryResult[0][2]=='follower' and tdSql.queryResult[0][3]== 'ready' : + if tdSql.queryResult[1][2]=='leader' and tdSql.queryResult[1][3]== 'ready' : + tdLog.debug("stop mnodes on dnode %d successfully in 10s" %offlineDnodeNo) + return True + count+=1 + else: + tdLog.debug(tdSql.queryResult) + tdLog.exit("stop mnodes on dnode %d failed in 10s ") + + + + + + + def close(self): + self.cursor.close() + +clusterComCheck = ClusterComCheck() diff --git a/tests/system-test/6-cluster/clusterCommonCreate.py b/tests/system-test/6-cluster/clusterCommonCreate.py new file mode 100644 index 0000000000000000000000000000000000000000..b3107d8537297ec7497407af8f74b96f14bab724 --- /dev/null +++ b/tests/system-test/6-cluster/clusterCommonCreate.py @@ -0,0 +1,298 @@ +################################################################### +# Copyright (c) 2016 by TAOS Technologies, Inc. +# All rights reserved. +# +# This file is proprietary and confidential to TAOS Technologies. +# No part of this file may be reproduced, stored, transmitted, +# disclosed or used in any form or by any means other than as +# expressly provided by the written permission from Jianhui Tao +# +################################################################### + +# -*- coding: utf-8 -*- + +from collections import defaultdict +import random +import string +import threading +import requests +import time +# import socketfrom + +import taos +from util.log import * +from util.sql import * +from util.cases import * +from util.dnodes import * +from util.common import * + +# class actionType(Enum): +# CREATE_DATABASE = 0 +# CREATE_STABLE = 1 +# CREATE_CTABLE = 2 +# INSERT_DATA = 3 + +class ClusterComCreate: + def init(self, conn, logSql): + tdSql.init(conn.cursor()) + # tdSql.init(conn.cursor(), logSql) # output sql.txt file + + def initConsumerTable(self,cdbName='cdb'): + tdLog.info("create consume database, and consume info table, and consume result table") + tdSql.query("create database if not exists %s vgroups 1"%(cdbName)) + tdSql.query("drop table if exists %s.consumeinfo "%(cdbName)) + tdSql.query("drop table if exists %s.consumeresult "%(cdbName)) + tdSql.query("drop table if exists %s.notifyinfo "%(cdbName)) + + tdSql.query("create table %s.consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int, ifmanualcommit int)"%cdbName) + tdSql.query("create table %s.consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)"%cdbName) + tdSql.query("create table %s.notifyinfo (ts timestamp, cmdid int, consumerid int)"%cdbName) + + def initConsumerInfoTable(self,cdbName='cdb'): + tdLog.info("drop consumeinfo table") + tdSql.query("drop table if exists %s.consumeinfo "%(cdbName)) + tdSql.query("create table %s.consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int, ifmanualcommit int)"%cdbName) + + def insertConsumerInfo(self,consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifmanualcommit,cdbName='cdb'): + sql = "insert into %s.consumeinfo values "%cdbName + sql += "(now, %d, '%s', '%s', %d, %d, %d)"%(consumerId, topicList, keyList, expectrowcnt, ifcheckdata, ifmanualcommit) + tdLog.info("consume info sql: %s"%sql) + tdSql.query(sql) + + def selectConsumeResult(self,expectRows,cdbName='cdb'): + resultList=[] + while 1: + tdSql.query("select * from %s.consumeresult"%cdbName) + #tdLog.info("row: %d, %l64d, %l64d"%(tdSql.getData(0, 1),tdSql.getData(0, 2),tdSql.getData(0, 3)) + if tdSql.getRows() == expectRows: + break + else: + time.sleep(5) + + for i in range(expectRows): + tdLog.info ("consume id: %d, consume msgs: %d, consume rows: %d"%(tdSql.getData(i , 1), tdSql.getData(i , 2), tdSql.getData(i , 3))) + resultList.append(tdSql.getData(i , 3)) + + return resultList + + def startTmqSimProcess(self,pollDelay,dbName,showMsg=1,showRow=1,cdbName='cdb',valgrind=0): + buildPath = tdCom.getBuildPath() + cfgPath = tdCom.getClientCfgPath() + if valgrind == 1: + logFile = cfgPath + '/../log/valgrind-tmq.log' + shellCmd = 'nohup valgrind --log-file=' + logFile + shellCmd += '--tool=memcheck --leak-check=full --show-reachable=no --track-origins=yes --show-leak-kinds=all --num-callers=20 -v --workaround-gcc296-bugs=yes ' + + if (platform.system().lower() == 'windows'): + shellCmd = 'mintty -h never -w hide ' + buildPath + '\\build\\bin\\tmq_sim.exe -c ' + cfgPath + shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, dbName, showMsg, showRow, cdbName) + shellCmd += "> nul 2>&1 &" + else: + shellCmd = 'nohup ' + buildPath + '/build/bin/tmq_sim -c ' + cfgPath + shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, dbName, showMsg, showRow, cdbName) + shellCmd += "> /dev/null 2>&1 &" + tdLog.info(shellCmd) + os.system(shellCmd) + + def getStartConsumeNotifyFromTmqsim(self,cdbName='cdb'): + while 1: + tdSql.query("select * from %s.notifyinfo"%cdbName) + #tdLog.info("row: %d, %l64d, %l64d"%(tdSql.getData(0, 1),tdSql.getData(0, 2),tdSql.getData(0, 3)) + if (tdSql.getRows() == 1) and (tdSql.getData(0, 1) == 0): + break + else: + time.sleep(0.1) + return + + def getStartCommitNotifyFromTmqsim(self,cdbName='cdb'): + while 1: + tdSql.query("select * from %s.notifyinfo"%cdbName) + #tdLog.info("row: %d, %l64d, %l64d"%(tdSql.getData(0, 1),tdSql.getData(0, 2),tdSql.getData(0, 3)) + if tdSql.getRows() == 2 : + print(tdSql.getData(0, 1), tdSql.getData(1, 1)) + if tdSql.getData(1, 1) == 1: + break + time.sleep(0.1) + return + + def create_database(self,tsql, dbName,dropFlag=1,vgroups=4,replica=1): + if dropFlag == 1: + tsql.execute("drop database if exists %s"%(dbName)) + + tsql.execute("create database if not exists %s vgroups %d replica %d"%(dbName, vgroups, replica)) + tdLog.debug("complete to create database %s"%(dbName)) + return + + def create_stable(self,tsql, dbName,stbName): + tsql.execute("create table if not exists %s.%s (ts timestamp, c1 int, c2 int, c3 binary(16)) tags(t1 int, t2 binary(32))"%(dbName, stbName)) + tdLog.debug("complete to create %s.%s" %(dbName, stbName)) + return + + def create_ctable(self,tsql=None, dbName='dbx',stbName='stb',ctbPrefix='ctb',ctbNum=1): + tsql.execute("use %s" %dbName) + pre_create = "create table" + sql = pre_create + #tdLog.debug("doing create one stable %s and %d child table in %s ..." %(stbname, count ,dbname)) + for i in range(ctbNum): + tagValue = 'beijing' + if (i % 2 == 0): + tagValue = 'shanghai' + + sql += " %s%d using %s tags(%d, '%s')"%(ctbPrefix,i,stbName,i+1, tagValue) + if (i > 0) and (i%100 == 0): + tsql.execute(sql) + sql = pre_create + if sql != pre_create: + tsql.execute(sql) + + tdLog.debug("complete to create %d child tables in %s.%s" %(ctbNum, dbName, stbName)) + return + + def insert_data(self,tsql,dbName,stbName,ctbNum,rowsPerTbl,batchNum,startTs=None): + tdLog.debug("start to insert data ............") + tsql.execute("use %s" %dbName) + pre_insert = "insert into " + sql = pre_insert + + if startTs is None: + t = time.time() + startTs = int(round(t * 1000)) + #tdLog.debug("doing insert data into stable:%s rows:%d ..."%(stbName, allRows)) + for i in range(ctbNum): + sql += " %s%d values "%(stbName,i) + for j in range(rowsPerTbl): + sql += "(%d, %d, 'tmqrow_%d') "%(startTs + j, j, j) + if (j > 0) and ((j%batchNum == 0) or (j == rowsPerTbl - 1)): + tsql.execute(sql) + if j < rowsPerTbl - 1: + sql = "insert into %s%d values " %(stbName,i) + else: + sql = "insert into " + #end sql + if sql != pre_insert: + #print("insert sql:%s"%sql) + tsql.execute(sql) + tdLog.debug("insert data ............ [OK]") + return + + def insert_data_1(self,tsql,dbName,ctbPrefix,ctbNum,rowsPerTbl,batchNum,startTs): + tdLog.debug("start to insert data ............") + tsql.execute("use %s" %dbName) + pre_insert = "insert into " + sql = pre_insert + + t = time.time() + startTs = int(round(t * 1000)) + #tdLog.debug("doing insert data into stable:%s rows:%d ..."%(stbName, allRows)) + for i in range(ctbNum): + sql += " %s%d values "%(ctbPrefix,i) + for j in range(rowsPerTbl): + if (j % 2 == 0): + sql += "(%d, %d, %d, 'tmqrow_%d') "%(startTs + j, j, j, j) + else: + sql += "(%d, %d, %d, 'tmqrow_%d') "%(startTs + j, j, -j, j) + if (j > 0) and ((j%batchNum == 0) or (j == rowsPerTbl - 1)): + tsql.execute(sql) + if j < rowsPerTbl - 1: + sql = "insert into %s%d values " %(ctbPrefix,i) + else: + sql = "insert into " + #end sql + if sql != pre_insert: + #print("insert sql:%s"%sql) + tsql.execute(sql) + tdLog.debug("insert data ............ [OK]") + return + + def insert_data_interlaceByMultiTbl(self,tsql,dbName,ctbPrefix,ctbNum,rowsPerTbl,batchNum,startTs=0): + tdLog.debug("start to insert data ............") + tsql.execute("use %s" %dbName) + pre_insert = "insert into " + sql = pre_insert + + if startTs == 0: + t = time.time() + startTs = int(round(t * 1000)) + + ctbDict = {} + for i in range(ctbNum): + ctbDict[i] = 0 + + #tdLog.debug("doing insert data into stable:%s rows:%d ..."%(stbName, allRows)) + rowsOfCtb = 0 + while rowsOfCtb < rowsPerTbl: + for i in range(ctbNum): + sql += " %s.%s_%d values "%(dbName,ctbPrefix,i) + for k in range(batchNum): + sql += "(%d, %d, 'tmqrow_%d') "%(startTs + ctbDict[i], ctbDict[i], ctbDict[i]) + ctbDict[i] += 1 + if (0 == ctbDict[i]%batchNum) or (ctbDict[i] == rowsPerTbl): + tsql.execute(sql) + sql = "insert into " + break + rowsOfCtb = ctbDict[0] + + tdLog.debug("insert data ............ [OK]") + return + + def insert_data_with_autoCreateTbl(self,tsql,dbName,stbName,ctbPrefix,ctbNum,rowsPerTbl,batchNum,startTs=0): + tdLog.debug("start to insert data wiht auto create child table ............") + tsql.execute("use %s" %dbName) + pre_insert = "insert into " + sql = pre_insert + + if startTs == 0: + t = time.time() + startTs = int(round(t * 1000)) + + #tdLog.debug("doing insert data into stable:%s rows:%d ..."%(stbName, allRows)) + rowsOfSql = 0 + for i in range(ctbNum): + sql += " %s.%s_%d using %s.%s tags (%d) values "%(dbName,ctbPrefix,i,dbName,stbName,i) + for j in range(rowsPerTbl): + sql += "(%d, %d, 'tmqrow_%d') "%(startTs + j, j, j) + rowsOfSql += 1 + if (j > 0) and ((rowsOfSql == batchNum) or (j == rowsPerTbl - 1)): + tsql.execute(sql) + rowsOfSql = 0 + if j < rowsPerTbl - 1: + sql = "insert into %s.%s_%d using %s.%s tags (%d) values " %(dbName,ctbPrefix,i,dbName,stbName,i) + else: + sql = "insert into " + #end sql + if sql != pre_insert: + #print("insert sql:%s"%sql) + tsql.execute(sql) + tdLog.debug("insert data ............ [OK]") + return + + def syncCreateDbStbCtbInsertData(self, tsql, paraDict): + tdCom.create_database(tsql, paraDict["dbName"],paraDict["dropFlag"]) + tdCom.create_stable(tsql, dbname=paraDict["dbName"],stbname=paraDict["stbName"], column_elm_list=paraDict['colSchema'], tag_elm_list=paraDict['tagSchema']) + tdCom.create_ctable(tsql, dbname=paraDict["dbName"],stbname=paraDict["stbName"],tag_elm_list=paraDict['tagSchema'],count=paraDict["ctbNum"], default_ctbname_prefix=paraDict['ctbPrefix']) + if "event" in paraDict and type(paraDict['event']) == type(threading.Event()): + paraDict["event"].set() + + ctbPrefix = paraDict['ctbPrefix'] + ctbNum = paraDict["ctbNum"] + for i in range(ctbNum): + tbName = '%s%s'%(ctbPrefix,i) + tdCom.insert_rows(tsql,dbname=paraDict["dbName"],tbname=tbName,start_ts_value=paraDict['startTs'],count=paraDict['rowsPerTbl']) + return + + def threadFunction(self, **paraDict): + # create new connector for new tdSql instance in my thread + newTdSql = tdCom.newTdSql() + self.syncCreateDbStbCtbInsertData(self, newTdSql, paraDict) + return + + def asyncCreateDbStbCtbInsertData(self, paraDict): + pThread = threading.Thread(target=self.threadFunction, kwargs=paraDict) + pThread.start() + return pThread + + + def close(self): + self.cursor.close() + +clusterComCreate = ClusterComCreate() diff --git a/tests/system-test/7-tmq/tmqShow.py b/tests/system-test/7-tmq/tmqShow.py new file mode 100644 index 0000000000000000000000000000000000000000..6b7e7375ffeca382bb77e4e8b8ab1704a83854f3 --- /dev/null +++ b/tests/system-test/7-tmq/tmqShow.py @@ -0,0 +1,158 @@ + +import taos +import sys +import time +import socket +import os +import threading + +from util.log import * +from util.sql import * +from util.cases import * +from util.dnodes import * +from util.common import * +sys.path.append("./7-tmq") +from tmqCommon import * + +class TDTestCase: + def init(self, conn, logSql): + tdLog.debug(f"start to excute {__file__}") + tdSql.init(conn.cursor()) + #tdSql.init(conn.cursor(), logSql) # output sql.txt file + + def tmqCase1(self): + tdLog.printNoPrefix("======== test case 1: ") + paraDict = {'dbName': 'db1', + 'dropFlag': 1, + 'event': '', + 'vgroups': 4, + 'stbName': 'stb', + 'colPrefix': 'c', + 'tagPrefix': 't', + 'colSchema': [{'type': 'INT', 'count':2}, {'type': 'binary', 'len':20, 'count':1},{'type': 'TIMESTAMP', 'count':1}], + 'tagSchema': [{'type': 'INT', 'count':1}, {'type': 'binary', 'len':20, 'count':1}], + 'ctbPrefix': 'ctb', + 'ctbNum': 10, + 'rowsPerTbl': 4000, + 'batchNum': 15, + 'startTs': 1640966400000, # 2022-01-01 00:00:00.000 + 'pollDelay': 20, + 'showMsg': 1, + 'showRow': 1} + + topicNameList = ['topic1', 'topic2', 'topic3', 'topic4'] + consumeGroupIdList = ['cgrp1', 'cgrp1', 'cgrp3', 'cgrp4'] + consumerIdList = [0, 1, 2, 3] + tmqCom.initConsumerTable() + tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], vgroups=paraDict['vgroups'],replica=1) + tdLog.info("create stb") + tdCom.create_stable(tdSql, dbname=paraDict["dbName"],stbname=paraDict["stbName"], column_elm_list=paraDict['colSchema'], tag_elm_list=paraDict['tagSchema']) + tdLog.info("create ctb") + tdCom.create_ctable(tdSql, dbname=paraDict["dbName"],stbname=paraDict["stbName"],tag_elm_list=paraDict['tagSchema'],count=paraDict["ctbNum"], default_ctbname_prefix=paraDict['ctbPrefix']) + # tdLog.info("insert data") + # tmqCom.insert_data(tdSql,paraDict["dbName"],paraDict["ctbPrefix"],paraDict["ctbNum"],paraDict["rowsPerTbl"],paraDict["batchNum"],paraDict["startTs"]) + + tdLog.info("create 4 topics") + sqlString = "create topic %s as database %s" %(topicNameList[0], paraDict['dbName']) + tdLog.info("create topic sql: %s"%sqlString) + tdSql.execute(sqlString) + + sqlString = "create topic %s as stable %s.%s" %(topicNameList[1], paraDict['dbName'], paraDict['stbName']) + tdLog.info("create topic sql: %s"%sqlString) + tdSql.execute(sqlString) + + queryString = "select * from %s.%s where c1 %% 7 == 0" %(paraDict['dbName'], paraDict['stbName']) + sqlString = "create topic %s as %s" %(topicNameList[2], queryString) + tdLog.info("create topic sql: %s"%sqlString) + tdSql.execute(sqlString) + + queryString = "select ts, log(c1), ceil(pow(c1,3)) from %s.%s where c1 %% 7 == 0" %(paraDict['dbName'], paraDict['stbName']) + sqlString = "create topic %s as %s " %(topicNameList[3], queryString) + tdLog.info("create topic sql: %s"%sqlString) + tdSql.execute(sqlString) + + tdSql.query("show topics") + tdLog.debug(tdSql.queryResult) + rows = tdSql.getRows() + if rows != len(consumerIdList): + tdLog.exit("topic rows error") + + for i in range (rows): + topicName = tdSql.getData(i,0) + matchFlag = 0 + while matchFlag == 0: + for j in range(len(topicNameList)): + if topicName == topicNameList[j]: + matchFlag = 1 + break + if matchFlag == 0: + tdLog.exit("topic name: %s is error", topicName) + + # init consume info, and start tmq_sim, then check consume result + tdLog.info("insert consume info to consume processor") + expectrowcnt = paraDict["rowsPerTbl"] * paraDict["ctbNum"] + topicList = topicNameList[0] + ifcheckdata = 0 + ifManualCommit = 0 + keyList = 'group.id:%s, enable.auto.commit:false, auto.commit.interval.ms:6000, auto.offset.reset:earliest'%consumeGroupIdList[0] + tmqCom.insertConsumerInfo(consumerIdList[0], expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit) + + topicList = topicNameList[1] + keyList = 'group.id:%s, enable.auto.commit:false, auto.commit.interval.ms:6000, auto.offset.reset:earliest'%consumeGroupIdList[1] + tmqCom.insertConsumerInfo(consumerIdList[1], expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit) + + topicList = topicNameList[2] + keyList = 'group.id:%s, enable.auto.commit:false, auto.commit.interval.ms:6000, auto.offset.reset:earliest'%consumeGroupIdList[2] + tmqCom.insertConsumerInfo(consumerIdList[2], expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit) + + topicList = topicNameList[3] + keyList = 'group.id:%s, enable.auto.commit:false, auto.commit.interval.ms:6000, auto.offset.reset:earliest'%consumeGroupIdList[3] + tmqCom.insertConsumerInfo(consumerIdList[3], expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit) + + tdLog.info("start consume processor") + tmqCom.startTmqSimProcess(paraDict['pollDelay'],paraDict["dbName"],paraDict['showMsg'], paraDict['showRow']) + + tdLog.info("async insert data") + pThread = tmqCom.asyncInsertData(paraDict) + + time.sleep(5) + tdLog.info("check show consumers") + tdSql.query("show consumers") + # tdLog.info(tdSql.queryResult) + rows = tdSql.getRows() + tdLog.info("show consumers rows: %d"%rows) + if rows != len(topicNameList): + tdLog.exit("show consumers rows error") + + tdLog.info("check show subscriptions") + tdSql.query("show subscriptions") + # tdLog.debug(tdSql.queryResult) + rows = tdSql.getRows() + tdLog.info("show subscriptions rows: %d"%rows) + if rows != paraDict['vgroups'] * len(topicNameList): + tdLog.exit("show subscriptions rows error") + + pThread.join() + + tdLog.info("insert process end, and start to check consume result") + expectRows = len(consumerIdList) + _ = tmqCom.selectConsumeResult(expectRows) + + time.sleep(10) + for i in range(len(topicNameList)): + tdSql.query("drop topic %s"%topicNameList[i]) + + tdLog.printNoPrefix("======== test case 1 end ...... ") + + def run(self): + tdSql.prepare() + self.tmqCase1() + + def stop(self): + tdSql.close() + tdLog.success(f"{__file__} successfully executed") + +event = threading.Event() + +tdCases.addLinux(__file__, TDTestCase()) +tdCases.addWindows(__file__, TDTestCase()) diff --git a/tests/system-test/fulltest.sh b/tests/system-test/fulltest.sh index ef217b828fcc0c9271a56b169801926307d59cac..571bb166e1d6ed7a77f956d4a83f1206b75801c1 100755 --- a/tests/system-test/fulltest.sh +++ b/tests/system-test/fulltest.sh @@ -23,6 +23,7 @@ python3 ./test.py -f 1-insert/alter_stable.py python3 ./test.py -f 1-insert/alter_table.py python3 ./test.py -f 1-insert/insertWithMoreVgroup.py python3 ./test.py -f 1-insert/table_comment.py +python3 ./test.py -f 1-insert/table_param_ttl.py python3 ./test.py -f 2-query/between.py python3 ./test.py -f 2-query/distinct.py python3 ./test.py -f 2-query/varchar.py @@ -116,6 +117,10 @@ python3 ./test.py -f 2-query/function_null.py python3 ./test.py -f 6-cluster/5dnode1mnode.py python3 ./test.py -f 6-cluster/5dnode2mnode.py python3 ./test.py -f 6-cluster/5dnode3mnodeStop.py -N 5 -M 3 +python3 ./test.py -f 6-cluster/5dnode3mnodeSep1VnodeStopVnodeCreateDb.py -N 5 -M 3 +# BUG python3 ./test.py -f 6-cluster/5dnode3mnodeSep1VnodeStopCreateDb.py -N 5 -M 3 +# BUG python3 ./test.py -f 6-cluster/5dnode3mnodeSep1VnodeStopMnodeCreateDb.py -N 5 -M 3 + # python3 ./test.py -f 6-cluster/5dnode3mnodeDrop.py -N 5 # BUG python3 ./test.py -f 6-cluster/5dnode3mnodeStopInsert.py @@ -139,3 +144,4 @@ python3 ./test.py -f 7-tmq/tmqCheckData.py python3 ./test.py -f 7-tmq/tmqUdf.py #python3 ./test.py -f 7-tmq/tmq3mnodeSwitch.py -N 5 python3 ./test.py -f 7-tmq/tmqConsumerGroup.py +python3 ./test.py -f 7-tmq/tmqShow.py diff --git a/tests/system-test/test.py b/tests/system-test/test.py index 35f8ea953cf2f7e14a9bb948202b43243664f35c..76b83da34813fed3969aa07e50ed3c05aec163b6 100644 --- a/tests/system-test/test.py +++ b/tests/system-test/test.py @@ -22,9 +22,6 @@ import json import platform import socket import threading -from distutils.log import warn as printf -from tkinter import N -from fabric2 import Connection sys.path.append("../pytest") from util.log import * from util.dnodes import * diff --git a/tests/test/c/tmqSim.c b/tests/test/c/tmqSim.c index c2fba52e6eb301fa5b528e37950084f164501efe..81fa72d15adfb8bfb6c7fc96acc3c1c5a6dcbc40 100644 --- a/tests/test/c/tmqSim.c +++ b/tests/test/c/tmqSim.c @@ -635,8 +635,9 @@ void loop_consume(SThreadInfo* pInfo) { } } + int32_t consumeDelay = g_stConfInfo.consumeDelay == -1 ? -1 : (g_stConfInfo.consumeDelay * 1000); while (running) { - TAOS_RES* tmqMsg = tmq_consumer_poll(pInfo->tmq, g_stConfInfo.consumeDelay * 1000); + TAOS_RES* tmqMsg = tmq_consumer_poll(pInfo->tmq, consumeDelay); if (tmqMsg) { if (0 != g_stConfInfo.showMsgFlag) { totalRows += msg_process(tmqMsg, pInfo, totalMsgs);