未验证 提交 02d5d5bf 编写于 作者: H Hui Li 提交者: GitHub

Merge pull request #14253 from taosdata/test/chr/TD-14699

test: modify testcase of muti-mnodes
...@@ -24,7 +24,7 @@ class ClusterDnodes(TDDnodes): ...@@ -24,7 +24,7 @@ class ClusterDnodes(TDDnodes):
class ConfigureyCluster: class ConfigureyCluster:
"""This will create defined number of dnodes and create a cluset. """This will create defined number of dnodes and create a cluster.
at the same time, it will return TDDnodes list: dnodes, """ at the same time, it will return TDDnodes list: dnodes, """
hostname= socket.gethostname() hostname= socket.gethostname()
...@@ -85,8 +85,8 @@ class ConfigureyCluster: ...@@ -85,8 +85,8 @@ class ConfigureyCluster:
count+=1 count+=1
time.sleep(1) time.sleep(1)
else: else:
tdLog.debug("create cluster with %d dnode but check dnode not ready within 5s ! "%self.dnodeNums) tdLog.exit("create cluster with %d dnode but check dnode not ready within 5s ! "%self.dnodeNums)
return -1
cluster = ConfigureyCluster() cluster = ConfigureyCluster()
\ No newline at end of file
from ssl import ALERT_DESCRIPTION_CERTIFICATE_UNOBTAINABLE
import taos
import sys
import time
import os
from util.log import *
from util.sql import *
from util.cases import *
from util.dnodes import TDDnodes
from util.dnodes import TDDnode
from util.cluster import *
sys.path.append("./6-cluster")
from clusterCommonCreate import *
from clusterCommonCheck import clusterComCheck
import time
import socket
import subprocess
from multiprocessing import Process
import threading
import time
import inspect
import ctypes
class TDTestCase:
def init(self,conn ,logSql):
tdLog.debug(f"start to excute {__file__}")
self.TDDnodes = None
tdSql.init(conn.cursor())
self.host = socket.gethostname()
def getBuildPath(self):
selfPath = os.path.dirname(os.path.realpath(__file__))
if ("community" in selfPath):
projPath = selfPath[:selfPath.find("community")]
else:
projPath = selfPath[:selfPath.find("tests")]
for root, dirs, files in os.walk(projPath):
if ("taosd" in files):
rootRealPath = os.path.dirname(os.path.realpath(root))
if ("packaging" not in rootRealPath):
buildPath = root[:len(root) - len("/build/bin")]
break
return buildPath
def _async_raise(self, tid, exctype):
"""raises the exception, performs cleanup if needed"""
if not inspect.isclass(exctype):
exctype = type(exctype)
res = ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, ctypes.py_object(exctype))
if res == 0:
raise ValueError("invalid thread id")
elif res != 1:
# """if it returns a number greater than one, you're in trouble,
# and you should call it again with exc=NULL to revert the effect"""
ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, None)
raise SystemError("PyThreadState_SetAsyncExc failed")
def stopThread(self,thread):
self._async_raise(thread.ident, SystemExit)
def insertData(self,countstart,countstop):
# fisrt add data : db\stable\childtable\general table
for couti in range(countstart,countstop):
tdLog.debug("drop database if exists db%d" %couti)
tdSql.execute("drop database if exists db%d" %couti)
print("create database if not exists db%d replica 1 duration 300" %couti)
tdSql.execute("create database if not exists db%d replica 1 duration 300" %couti)
tdSql.execute("use db%d" %couti)
tdSql.execute(
'''create table stb1
(ts timestamp, c1 int, c2 bigint, c3 smallint, c4 tinyint, c5 float, c6 double, c7 bool, c8 binary(16),c9 nchar(32), c10 timestamp)
tags (t1 int)
'''
)
tdSql.execute(
'''
create table t1
(ts timestamp, c1 int, c2 bigint, c3 smallint, c4 tinyint, c5 float, c6 double, c7 bool, c8 binary(16),c9 nchar(32), c10 timestamp)
'''
)
for i in range(4):
tdSql.execute(f'create table ct{i+1} using stb1 tags ( {i+1} )')
def fiveDnodeThreeMnode(self,dnodenumbers,mnodeNums,restartNumber):
tdLog.printNoPrefix("======== test case 1: ")
paraDict = {'dbName': 'db',
'dropFlag': 1,
'event': '',
'vgroups': 4,
'replica': 1,
'stbName': 'stb',
'colPrefix': 'c',
'tagPrefix': 't',
'colSchema': [{'type': 'INT', 'count':1}, {'type': 'binary', 'len':20, 'count':1}],
'tagSchema': [{'type': 'INT', 'count':1}, {'type': 'binary', 'len':20, 'count':1}],
'ctbPrefix': 'ctb',
'ctbNum': 1,
'rowsPerTbl': 10000,
'batchNum': 10,
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
'pollDelay': 10,
'showMsg': 1,
'showRow': 1}
dnodenumbers=int(dnodenumbers)
mnodeNums=int(mnodeNums)
dbNumbers = int(dnodenumbers * restartNumber)
tdLog.info("first check dnode and mnode")
tdSql.query("show dnodes;")
tdSql.checkData(0,1,'%s:6030'%self.host)
tdSql.checkData(4,1,'%s:6430'%self.host)
clusterComCheck.checkDnodes(dnodenumbers)
clusterComCheck.checkMnodeStatus(1)
# fisr add three mnodes;
tdLog.info("fisr add three mnodes and check mnode status")
tdSql.execute("create mnode on dnode 2")
clusterComCheck.checkMnodeStatus(2)
tdSql.execute("create mnode on dnode 3")
clusterComCheck.checkMnodeStatus(3)
# add some error operations and
tdLog.info("Confirm the status of the dnode again")
tdSql.error("create mnode on dnode 2")
tdSql.query("show dnodes;")
print(tdSql.queryResult)
clusterComCheck.checkDnodes(dnodenumbers)
tdLog.info("Take turns stopping all dnodes ")
# seperate vnode and mnode in different dnodes.
# create database and stable
tdDnodes=cluster.dnodes
stopcount =0
while stopcount < restartNumber:
for i in range(dnodenumbers):
# threads=[]
# threads = MyThreadFunc(self.insert_data(i*2,i*2+2))
paraDict["dbName"]= 'db%d%d'%(stopcount,i)
threads=threading.Thread(target=clusterComCreate.create_database, args=(tdSql, paraDict["dbName"],paraDict["dropFlag"], paraDict["vgroups"],paraDict['replica']))
threads.start()
tdDnodes[i].stoptaosd()
# sleep(10)
tdDnodes[i].starttaosd()
# sleep(10)
if clusterComCheck.checkDnodes(dnodenumbers):
# threads.join()
tdLog.info("first restart loop")
else:
print("456")
threads.join()
self.stopThread(threads)
tdLog.exit("one or more of dnodes failed to start ")
# self.check3mnode()
stopcount+=1
threads.join()
clusterComCheck.checkDnodes(dnodenumbers)
clusterComCheck.checkDbRows(dbNumbers)
for i in range(restartNumber):
clusterComCheck.checkDb(dnodenumbers,'db%d'%i)
def run(self):
# print(self.master_dnode.cfgDict)
self.fiveDnodeThreeMnode(5,3,1)
def stop(self):
tdSql.close()
tdLog.success(f"{__file__} successfully executed")
tdCases.addLinux(__file__, TDTestCase())
tdCases.addWindows(__file__, TDTestCase())
\ No newline at end of file
from ssl import ALERT_DESCRIPTION_CERTIFICATE_UNOBTAINABLE
import taos
import sys
import time
import os
from util.log import *
from util.sql import *
from util.cases import *
from util.dnodes import TDDnodes
from util.dnodes import TDDnode
from util.cluster import *
sys.path.append("./6-cluster")
from clusterCommonCreate import *
from clusterCommonCheck import clusterComCheck
import time
import socket
import subprocess
from multiprocessing import Process
import threading
import time
import inspect
import ctypes
class TDTestCase:
def init(self,conn ,logSql):
tdLog.debug(f"start to excute {__file__}")
self.TDDnodes = None
tdSql.init(conn.cursor())
self.host = socket.gethostname()
def getBuildPath(self):
selfPath = os.path.dirname(os.path.realpath(__file__))
if ("community" in selfPath):
projPath = selfPath[:selfPath.find("community")]
else:
projPath = selfPath[:selfPath.find("tests")]
for root, dirs, files in os.walk(projPath):
if ("taosd" in files):
rootRealPath = os.path.dirname(os.path.realpath(root))
if ("packaging" not in rootRealPath):
buildPath = root[:len(root) - len("/build/bin")]
break
return buildPath
def _async_raise(self, tid, exctype):
"""raises the exception, performs cleanup if needed"""
if not inspect.isclass(exctype):
exctype = type(exctype)
res = ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, ctypes.py_object(exctype))
if res == 0:
raise ValueError("invalid thread id")
elif res != 1:
# """if it returns a number greater than one, you're in trouble,
# and you should call it again with exc=NULL to revert the effect"""
ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, None)
raise SystemError("PyThreadState_SetAsyncExc failed")
def stopThread(self,thread):
self._async_raise(thread.ident, SystemExit)
def insertData(self,countstart,countstop):
# fisrt add data : db\stable\childtable\general table
for couti in range(countstart,countstop):
tdLog.debug("drop database if exists db%d" %couti)
tdSql.execute("drop database if exists db%d" %couti)
print("create database if not exists db%d replica 1 duration 300" %couti)
tdSql.execute("create database if not exists db%d replica 1 duration 300" %couti)
tdSql.execute("use db%d" %couti)
tdSql.execute(
'''create table stb1
(ts timestamp, c1 int, c2 bigint, c3 smallint, c4 tinyint, c5 float, c6 double, c7 bool, c8 binary(16),c9 nchar(32), c10 timestamp)
tags (t1 int)
'''
)
tdSql.execute(
'''
create table t1
(ts timestamp, c1 int, c2 bigint, c3 smallint, c4 tinyint, c5 float, c6 double, c7 bool, c8 binary(16),c9 nchar(32), c10 timestamp)
'''
)
for i in range(4):
tdSql.execute(f'create table ct{i+1} using stb1 tags ( {i+1} )')
def fiveDnodeThreeMnode(self,dnodenumbers,mnodeNums,restartNumber):
tdLog.printNoPrefix("======== test case 1: ")
paraDict = {'dbName': 'db',
'dropFlag': 1,
'event': '',
'vgroups': 4,
'replica': 1,
'stbName': 'stb',
'colPrefix': 'c',
'tagPrefix': 't',
'colSchema': [{'type': 'INT', 'count':1}, {'type': 'binary', 'len':20, 'count':1}],
'tagSchema': [{'type': 'INT', 'count':1}, {'type': 'binary', 'len':20, 'count':1}],
'ctbPrefix': 'ctb',
'ctbNum': 1,
'rowsPerTbl': 10000,
'batchNum': 10,
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
'pollDelay': 10,
'showMsg': 1,
'showRow': 1}
dnodenumbers=int(dnodenumbers)
mnodeNums=int(mnodeNums)
dbNumbers = int(mnodeNums * restartNumber)
tdLog.info("first check dnode and mnode")
tdSql.query("show dnodes;")
tdSql.checkData(0,1,'%s:6030'%self.host)
tdSql.checkData(4,1,'%s:6430'%self.host)
clusterComCheck.checkDnodes(dnodenumbers)
clusterComCheck.checkMnodeStatus(1)
# fisr add three mnodes;
tdLog.info("fisr add three mnodes and check mnode status")
tdSql.execute("create mnode on dnode 2")
clusterComCheck.checkMnodeStatus(2)
tdSql.execute("create mnode on dnode 3")
clusterComCheck.checkMnodeStatus(3)
# add some error operations and
tdLog.info("Confirm the status of the dnode again")
tdSql.error("create mnode on dnode 2")
tdSql.query("show dnodes;")
print(tdSql.queryResult)
clusterComCheck.checkDnodes(dnodenumbers)
tdLog.info("Take turns stopping Mnodes ")
# seperate vnode and mnode in different dnodes.
# create database and stable
tdDnodes=cluster.dnodes
stopcount =0
while stopcount < restartNumber:
tdLog.info("first restart loop")
for i in range(mnodeNums):
# threads=[]
# threads = MyThreadFunc(self.insert_data(i*2,i*2+2))
paraDict["dbName"]= 'db%d%d'%(stopcount,i)
threads=threading.Thread(target=clusterComCreate.create_database, args=(tdSql, paraDict["dbName"],paraDict["dropFlag"], paraDict["vgroups"],paraDict['replica']))
threads.start()
tdDnodes[i].stoptaosd()
# sleep(10)
tdDnodes[i].starttaosd()
# sleep(10)
if clusterComCheck.checkDnodes(dnodenumbers):
# threads.join()
tdLog.info("123")
else:
print("456")
threads.join()
self.stopThread(threads)
tdLog.exit("one or more of dnodes failed to start ")
# self.check3mnode()
stopcount+=1
threads.join()
clusterComCheck.checkDnodes(dnodenumbers)
clusterComCheck.checkDbRows(dbNumbers)
for i in range(restartNumber):
clusterComCheck.checkDb(mnodeNums,'db%d'%i)
def run(self):
# print(self.master_dnode.cfgDict)
self.fiveDnodeThreeMnode(5,3,1)
def stop(self):
tdSql.close()
tdLog.success(f"{__file__} successfully executed")
tdCases.addLinux(__file__, TDTestCase())
tdCases.addWindows(__file__, TDTestCase())
\ No newline at end of file
from ssl import ALERT_DESCRIPTION_CERTIFICATE_UNOBTAINABLE
import taos
import sys
import time
import os
from util.log import *
from util.sql import *
from util.cases import *
from util.dnodes import TDDnodes
from util.dnodes import TDDnode
from util.cluster import *
sys.path.append("./6-cluster")
from clusterCommonCreate import *
from clusterCommonCheck import clusterComCheck
import time
import socket
import subprocess
from multiprocessing import Process
import threading
import time
import inspect
import ctypes
class TDTestCase:
def init(self,conn ,logSql):
tdLog.debug(f"start to excute {__file__}")
self.TDDnodes = None
tdSql.init(conn.cursor())
self.host = socket.gethostname()
def getBuildPath(self):
selfPath = os.path.dirname(os.path.realpath(__file__))
if ("community" in selfPath):
projPath = selfPath[:selfPath.find("community")]
else:
projPath = selfPath[:selfPath.find("tests")]
for root, dirs, files in os.walk(projPath):
if ("taosd" in files):
rootRealPath = os.path.dirname(os.path.realpath(root))
if ("packaging" not in rootRealPath):
buildPath = root[:len(root) - len("/build/bin")]
break
return buildPath
def _async_raise(self, tid, exctype):
"""raises the exception, performs cleanup if needed"""
if not inspect.isclass(exctype):
exctype = type(exctype)
res = ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, ctypes.py_object(exctype))
if res == 0:
raise ValueError("invalid thread id")
elif res != 1:
# """if it returns a number greater than one, you're in trouble,
# and you should call it again with exc=NULL to revert the effect"""
ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, None)
raise SystemError("PyThreadState_SetAsyncExc failed")
def stopThread(self,thread):
self._async_raise(thread.ident, SystemExit)
def insertData(self,countstart,countstop):
# fisrt add data : db\stable\childtable\general table
for couti in range(countstart,countstop):
tdLog.debug("drop database if exists db%d" %couti)
tdSql.execute("drop database if exists db%d" %couti)
print("create database if not exists db%d replica 1 duration 300" %couti)
tdSql.execute("create database if not exists db%d replica 1 duration 300" %couti)
tdSql.execute("use db%d" %couti)
tdSql.execute(
'''create table stb1
(ts timestamp, c1 int, c2 bigint, c3 smallint, c4 tinyint, c5 float, c6 double, c7 bool, c8 binary(16),c9 nchar(32), c10 timestamp)
tags (t1 int)
'''
)
tdSql.execute(
'''
create table t1
(ts timestamp, c1 int, c2 bigint, c3 smallint, c4 tinyint, c5 float, c6 double, c7 bool, c8 binary(16),c9 nchar(32), c10 timestamp)
'''
)
for i in range(4):
tdSql.execute(f'create table ct{i+1} using stb1 tags ( {i+1} )')
def fiveDnodeThreeMnode(self,dnodenumbers,mnodeNums,restartNumber):
tdLog.printNoPrefix("======== test case 1: ")
paraDict = {'dbName': 'db',
'dropFlag': 1,
'event': '',
'vgroups': 4,
'replica': 1,
'stbName': 'stb',
'colPrefix': 'c',
'tagPrefix': 't',
'colSchema': [{'type': 'INT', 'count':1}, {'type': 'binary', 'len':20, 'count':1}],
'tagSchema': [{'type': 'INT', 'count':1}, {'type': 'binary', 'len':20, 'count':1}],
'ctbPrefix': 'ctb',
'ctbNum': 1,
'rowsPerTbl': 10000,
'batchNum': 10,
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
'pollDelay': 10,
'showMsg': 1,
'showRow': 1}
dnodenumbers=int(dnodenumbers)
mnodeNums=int(mnodeNums)
vnodeNumbers = int(dnodenumbers-mnodeNums)
dbNumbers = int(vnodeNumbers * restartNumber)
tdLog.info("first check dnode and mnode")
tdSql.query("show dnodes;")
tdSql.checkData(0,1,'%s:6030'%self.host)
tdSql.checkData(4,1,'%s:6430'%self.host)
clusterComCheck.checkDnodes(dnodenumbers)
clusterComCheck.checkMnodeStatus(1)
# fisr add three mnodes;
tdLog.info("fisr add three mnodes and check mnode status")
tdSql.execute("create mnode on dnode 2")
clusterComCheck.checkMnodeStatus(2)
tdSql.execute("create mnode on dnode 3")
clusterComCheck.checkMnodeStatus(3)
# add some error operations and
tdLog.info("Confirm the status of the dnode again")
tdSql.error("create mnode on dnode 2")
tdSql.query("show dnodes;")
print(tdSql.queryResult)
clusterComCheck.checkDnodes(dnodenumbers)
tdLog.info("Take turns stopping Vnodes ")
# seperate vnode and mnode in different dnodes.
# create database and stable
tdDnodes=cluster.dnodes
stopcount =0
while stopcount < restartNumber:
for i in range(vnodeNumbers):
# threads=[]
# threads = MyThreadFunc(self.insert_data(i*2,i*2+2))
paraDict["dbName"]= 'db%d%d'%(stopcount,i)
threads=threading.Thread(target=clusterComCreate.create_database, args=(tdSql, paraDict["dbName"],paraDict["dropFlag"], paraDict["vgroups"],paraDict['replica']))
threads.start()
tdDnodes[mnodeNums+i].stoptaosd()
# sleep(10)
tdDnodes[mnodeNums+i].starttaosd()
# sleep(10)
if clusterComCheck.checkDnodes(vnodeNumbers):
# threads.join()
tdLog.info("first restart loop")
else:
print("456")
threads.join()
self.stopThread(threads)
tdLog.exit("one or more of dnodes failed to start ")
# self.check3mnode()
stopcount+=1
threads.join()
clusterComCheck.checkDnodes(dnodenumbers)
clusterComCheck.checkDbRows(dbNumbers)
for i in range(restartNumber):
clusterComCheck.checkDb(vnodeNumbers,'db%d'%i)
def run(self):
# print(self.master_dnode.cfgDict)
self.fiveDnodeThreeMnode(5,3,1)
def stop(self):
tdSql.close()
tdLog.success(f"{__file__} successfully executed")
tdCases.addLinux(__file__, TDTestCase())
tdCases.addWindows(__file__, TDTestCase())
\ No newline at end of file
...@@ -9,6 +9,11 @@ from util.sql import * ...@@ -9,6 +9,11 @@ from util.sql import *
from util.cases import * from util.cases import *
from util.dnodes import TDDnodes from util.dnodes import TDDnodes
from util.dnodes import TDDnode from util.dnodes import TDDnode
from util.cluster import *
from util.common import *
sys.path.append("./7-tmq")
from tmqCommon import *
import time import time
import socket import socket
import subprocess import subprocess
...@@ -17,26 +22,13 @@ import threading ...@@ -17,26 +22,13 @@ import threading
import time import time
import inspect import inspect
import ctypes import ctypes
class MyDnodes(TDDnodes):
def __init__(self ,dnodes_lists):
super(MyDnodes,self).__init__()
self.dnodes = dnodes_lists # dnode must be TDDnode instance
self.simDeployed = False
class TDTestCase: class TDTestCase:
def init(self,conn ,logSql): def init(self,conn ,logSql):
tdLog.debug(f"start to excute {__file__}") tdLog.debug(f"start to excute {__file__}")
self.TDDnodes = None # tdSql.init(conn.cursor())
# self.host = socket.gethostname()
def buildcluster(self,dnodenumber):
self.depoly_cluster(dnodenumber)
self.master_dnode = self.TDDnodes.dnodes[0]
self.host=self.master_dnode.cfgDict["fqdn"]
conn1 = taos.connect(self.master_dnode.cfgDict["fqdn"] , config=self.master_dnode.cfgDir)
tdSql.init(conn1.cursor())
def getBuildPath(self): def getBuildPath(self):
selfPath = os.path.dirname(os.path.realpath(__file__)) selfPath = os.path.dirname(os.path.realpath(__file__))
...@@ -106,52 +98,6 @@ class TDTestCase: ...@@ -106,52 +98,6 @@ class TDTestCase:
tdSql.checkData(0,0,rowsPerSTable) tdSql.checkData(0,0,rowsPerSTable)
return return
def depoly_cluster(self ,dnodes_nums=5,independent=True):
testCluster = False
valgrind = 0
hostname = socket.gethostname()
dnodes = []
start_port = 6030
start_port_sec = 6130
for num in range(1, dnodes_nums+1):
dnode = TDDnode(num)
dnode.addExtraCfg("firstEp", f"{hostname}:{start_port}")
dnode.addExtraCfg("fqdn", f"{hostname}")
dnode.addExtraCfg("serverPort", f"{start_port + (num-1)*100}")
dnode.addExtraCfg("monitorFqdn", hostname)
dnode.addExtraCfg("monitorPort", 7043)
dnode.addExtraCfg("secondEp", f"{hostname}:{start_port_sec}")
# configure three dnoe don't support vnodes
if independent and (num < 4):
dnode.addExtraCfg("supportVnodes", 0)
dnodes.append(dnode)
self.TDDnodes = MyDnodes(dnodes)
self.TDDnodes.init("")
self.TDDnodes.setTestCluster(testCluster)
self.TDDnodes.setValgrind(valgrind)
self.TDDnodes.stopAll()
for dnode in self.TDDnodes.dnodes:
self.TDDnodes.deploy(dnode.index,{})
for dnode in self.TDDnodes.dnodes:
self.TDDnodes.starttaosd(dnode.index)
# create cluster
for dnode in self.TDDnodes.dnodes[1:]:
# print(dnode.cfgDict)
dnode_id = dnode.cfgDict["fqdn"] + ":" +dnode.cfgDict["serverPort"]
dnode_first_host = dnode.cfgDict["firstEp"].split(":")[0]
dnode_first_port = dnode.cfgDict["firstEp"].split(":")[-1]
cmd = f" taos -h {dnode_first_host} -P {dnode_first_port} -s ' create dnode \"{dnode_id} \" ' ;"
print(cmd)
os.system(cmd)
time.sleep(2)
tdLog.info(" create cluster with %d dnode done! " %dnodes_nums)
def checkdnodes(self,dnodenumber): def checkdnodes(self,dnodenumber):
count=0 count=0
while count < 100: while count < 100:
...@@ -305,6 +251,14 @@ class TDTestCase: ...@@ -305,6 +251,14 @@ class TDTestCase:
tdSql.checkData(2,2,'offline') tdSql.checkData(2,2,'offline')
tdSql.checkData(2,3,'ready') tdSql.checkData(2,3,'ready')
def check5dnode(self):
tdSql.query("show dnodes;")
tdSql.checkData(0,1,'%s:6030'%self.host)
tdSql.checkData(4,1,'%s:6430'%self.host)
tdSql.checkData(0,4,'ready')
tdSql.checkData(4,4,'ready')
def five_dnode_three_mnode(self,dnodenumber): def five_dnode_three_mnode(self,dnodenumber):
tdSql.query("show dnodes;") tdSql.query("show dnodes;")
tdSql.checkData(0,1,'%s:6030'%self.host) tdSql.checkData(0,1,'%s:6030'%self.host)
...@@ -346,6 +300,7 @@ class TDTestCase: ...@@ -346,6 +300,7 @@ class TDTestCase:
threads.join() threads.join()
else: else:
print("456") print("456")
threads.join()
self.stop_thread(threads) self.stop_thread(threads)
assert 1 == 2 ,"some dnode started failed" assert 1 == 2 ,"some dnode started failed"
return False return False
...@@ -357,16 +312,8 @@ class TDTestCase: ...@@ -357,16 +312,8 @@ class TDTestCase:
self.check3mnode() self.check3mnode()
def getConnection(self, dnode):
host = dnode.cfgDict["fqdn"]
port = dnode.cfgDict["serverPort"]
config_dir = dnode.cfgDir
return taos.connect(host=host, port=int(port), config=config_dir)
def run(self): def run(self):
# print(self.master_dnode.cfgDict) # print(self.master_dnode.cfgDict)
self.buildcluster(5)
self.five_dnode_three_mnode(5) self.five_dnode_three_mnode(5)
def stop(self): def stop(self):
......
...@@ -12,7 +12,10 @@ from util.dnodes import TDDnodes ...@@ -12,7 +12,10 @@ from util.dnodes import TDDnodes
from util.dnodes import TDDnode from util.dnodes import TDDnode
from util.cluster import * from util.cluster import *
from test import tdDnodes from test import tdDnodes
sys.path.append("./6-cluster")
from clusterCommonCreate import *
from clusterCommonCheck import *
import time import time
import socket import socket
import subprocess import subprocess
...@@ -216,60 +219,84 @@ class TDTestCase: ...@@ -216,60 +219,84 @@ class TDTestCase:
else: else:
tdLog.exit("create cluster with %d dnode but check dnode not ready within 5s ! "%dnodeNumbers) tdLog.exit("create cluster with %d dnode but check dnode not ready within 5s ! "%dnodeNumbers)
def five_dnode_three_mnode(self,dnodenumber): def fiveDnodeThreeMnode(self,dnodenumbers,mnodeNums,restartNumber):
self.check_dnodes_status(5) tdLog.printNoPrefix("======== test case 1: ")
tdSql.query("show mnodes;") paraDict = {'dbName': 'db',
tdLog.debug(self.host) 'dropFlag': 1,
tdSql.checkRows(1) 'event': '',
'vgroups': 4,
'replica': 1,
'stbName': 'stb',
'colPrefix': 'c',
'tagPrefix': 't',
'colSchema': [{'type': 'INT', 'count':1}, {'type': 'binary', 'len':20, 'count':1}],
'tagSchema': [{'type': 'INT', 'count':1}, {'type': 'binary', 'len':20, 'count':1}],
'ctbPrefix': 'ctb',
'ctbNum': 1,
'rowsPerTbl': 10000,
'batchNum': 10,
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
'pollDelay': 10,
'showMsg': 1,
'showRow': 1}
dnodenumbers=int(dnodenumbers)
mnodeNums=int(mnodeNums)
dbNumbers = int(dnodenumbers * restartNumber)
tdLog.info("first check dnode and mnode")
tdSql.query("show dnodes;")
tdSql.checkData(0,1,'%s:6030'%self.host) tdSql.checkData(0,1,'%s:6030'%self.host)
tdSql.checkData(0,2,'leader') tdSql.checkData(4,1,'%s:6430'%self.host)
tdSql.checkData(0,3,'ready') clusterComCheck.checkDnodes(dnodenumbers)
clusterComCheck.checkMnodeStatus(1)
# fisr add three mnodes; # fisr add three mnodes;
tdLog.info("fisr add three mnodes and check mnode status")
tdSql.execute("create mnode on dnode 2") tdSql.execute("create mnode on dnode 2")
time.sleep(10) clusterComCheck.checkMnodeStatus(2)
tdSql.execute("create mnode on dnode 3") tdSql.execute("create mnode on dnode 3")
clusterComCheck.checkMnodeStatus(3)
# fisrt check statut ready # add some error operations and
self.check3mnode() tdLog.info("Confirm the status of the dnode again")
tdSql.error("create mnode on dnode 2") tdSql.error("create mnode on dnode 2")
tdSql.query("show dnodes;") tdSql.query("show dnodes;")
# tdLog.debug(tdSql.queryResult) print(tdSql.queryResult)
clusterComCheck.checkDnodes(dnodenumbers)
tdLog.debug("stop and follower of mnode") # restart all taosd
tdDnodes=cluster.dnodes tdDnodes=cluster.dnodes
# tdLog.debug(tdDnodes[0])
tdDnodes[1].stoptaosd() tdDnodes[1].stoptaosd()
self.check3mnode2off() clusterComCheck.check3mnodeoff(2,3)
tdDnodes[1].starttaosd() tdDnodes[1].starttaosd()
self.check3mnode() clusterComCheck.checkMnodeStatus(3)
tdDnodes[2].stoptaosd() tdDnodes[2].stoptaosd()
self.check3mnode3off() clusterComCheck.check3mnodeoff(3,3)
tdDnodes[2].starttaosd() tdDnodes[2].starttaosd()
self.check3mnode() clusterComCheck.checkMnodeStatus(3)
tdDnodes[0].stoptaosd() tdDnodes[0].stoptaosd()
self.check3mnode1off() clusterComCheck.check3mnodeoff(1,3)
tdDnodes[0].starttaosd() tdDnodes[0].starttaosd()
self.check3mnode() clusterComCheck.checkMnodeStatus(3)
self.check3mnode() tdLog.info("Take turns stopping all dnodes ")
# seperate vnode and mnode in different dnodes.
# create database and stable
stopcount =0 stopcount =0
while stopcount <= 2: while stopcount <= 2:
for i in range(dnodenumber): tdLog.info("first restart loop")
for i in range(dnodenumbers):
tdDnodes[i].stoptaosd() tdDnodes[i].stoptaosd()
tdDnodes[i].starttaosd() tdDnodes[i].starttaosd()
# self.check3mnode()
stopcount+=1 stopcount+=1
self.check3mnode() clusterComCheck.checkDnodes(dnodenumbers)
clusterComCheck.checkMnodeStatus(3)
def run(self): def run(self):
self.five_dnode_three_mnode(5) # print(self.master_dnode.cfgDict)
self.fiveDnodeThreeMnode(5,3,1)
def stop(self): def stop(self):
tdSql.close() tdSql.close()
......
###################################################################
# Copyright (c) 2016 by TAOS Technologies, Inc.
# All rights reserved.
#
# This file is proprietary and confidential to TAOS Technologies.
# No part of this file may be reproduced, stored, transmitted,
# disclosed or used in any form or by any means other than as
# expressly provided by the written permission from Jianhui Tao
#
###################################################################
# -*- coding: utf-8 -*-
from collections import defaultdict
import random
import string
import threading
import requests
import time
# import socketfrom
import taos
from util.log import *
from util.sql import *
from util.cases import *
from util.dnodes import *
from util.common import *
# class actionType(Enum):
# CREATE_DATABASE = 0
# CREATE_STABLE = 1
# CREATE_CTABLE = 2
# INSERT_DATA = 3
class ClusterComCheck:
def init(self, conn, logSql):
tdSql.init(conn.cursor())
# tdSql.init(conn.cursor(), logSql) # output sql.txt file
def checkDnodes(self,dnodeNumbers):
count=0
while count < 5:
tdSql.query("show dnodes")
# tdLog.debug(tdSql.queryResult)
status=0
for i in range(dnodeNumbers):
if tdSql.queryResult[i][4] == "ready":
status+=1
tdLog.info(status)
if status == dnodeNumbers:
tdLog.success("it find cluster with %d dnodes and check that all cluster dnodes are ready within 5s! " %dnodeNumbers)
return True
count+=1
time.sleep(1)
else:
tdLog.debug(tdSql.queryResult)
tdLog.exit("it find cluster with %d dnodes but check that there dnodes are not ready within 5s ! "%dnodeNumbers)
def checkDbRows(self,dbNumbers):
dbNumbers=int(dbNumbers)
count=0
while count < 5:
tdSql.query("show databases;")
if tdSql.checkRows(dbNumbers+2):
tdLog.success("we find %d databases and expect %d in clusters! " %(tdSql.queryRows,dbNumbers+2))
return True
else:
continue
else :
tdLog.debug(tdSql.queryResult)
tdLog.exit("we find %d databases but expect %d in clusters! " %(tdSql.queryRows,dbNumbers))
def checkDb(self,dbNumbers,dbindex):
count=0
while count < 5:
query_status=0
for i in range(dbNumbers):
for j in range(dbNumbers):
tdSql.query("show databases;")
if "%s%d"%(dbindex,j) == tdSql.queryResult[i+2][0] :
if tdSql.queryResult[i+2][19] == "ready":
query_status+=1
else:
continue
# print(query_status)
count+=1
if query_status == dbNumbers:
tdLog.success("we find cluster with %d dnode and check all databases are ready within 5s! " %dbNumbers)
return True
else:
tdLog.debug(tdSql.queryResult)
tdLog.exit("database is not ready within 5s")
def checkData(self,dbname,stbname,stableCount,CtableCount,rowsPerSTable,):
tdSql.execute("use %s"%dbname)
tdSql.query("show stables")
tdSql.checkRows(stableCount)
tdSql.query("show tables")
tdSql.checkRows(CtableCount)
for i in range(stableCount):
tdSql.query("select count(*) from %s%d"%(stbname,i))
tdSql.checkData(0,0,rowsPerSTable)
return
def checkMnodeStatus(self,mnodeNums):
self.mnodeNums=int(mnodeNums)
# self.leaderDnode=int(leaderDnode)
count=0
while count < 10:
time.sleep(1)
tdSql.query("show mnodes;")
if tdSql.checkRows(self.mnodeNums) :
tdLog.success("cluster has %d mnodes" %self.mnodeNums )
if self.mnodeNums == 1:
if tdSql.queryResult[0][2]== 'leader' and tdSql.queryResult[0][3]== 'ready' :
tdLog.success("%d mnodes is ready in 10s"%self.mnodeNums)
return True
count+=1
elif self.mnodeNums == 3 :
if tdSql.queryResult[0][2]=='leader' and tdSql.queryResult[0][3]== 'ready' :
if tdSql.queryResult[1][2]=='follower' and tdSql.queryResult[1][3]== 'ready' :
if tdSql.queryResult[2][2]=='follower' and tdSql.queryResult[2][3]== 'ready' :
tdLog.success("%d mnodes is ready in 10s"%self.mnodeNums)
return True
elif tdSql.queryResult[1][2]=='leader' and tdSql.queryResult[1][3]== 'ready' :
if tdSql.queryResult[0][2]=='follower' and tdSql.queryResult[0][3]== 'ready' :
if tdSql.queryResult[2][2]=='follower' and tdSql.queryResult[2][3]== 'ready' :
tdLog.success("%d mnodes is ready in 10s"%self.mnodeNums)
return True
elif tdSql.queryResult[2][2]=='leader' and tdSql.queryResult[2][3]== 'ready' :
if tdSql.queryResult[0][2]=='follower' and tdSql.queryResult[0][3]== 'ready' :
if tdSql.queryResult[1][2]=='follower' and tdSql.queryResult[1][3]== 'ready' :
tdLog.success("%d mnodes is ready in 10s"%self.mnodeNums)
return True
count+=1
elif self.mnodeNums == 2 :
if tdSql.queryResult[0][2]=='leader' and tdSql.queryResult[0][3]== 'ready' :
if tdSql.queryResult[1][2]=='follower' and tdSql.queryResult[1][3]== 'ready' :
tdLog.success("%d mnodes is ready in 10s"%self.mnodeNums)
return True
elif tdSql.queryResult[1][2]=='leader' and tdSql.queryResult[1][3]== 'ready' :
if tdSql.queryResult[0][2]=='follower' and tdSql.queryResult[0][3]== 'ready' :
tdLog.success("%d mnodes is ready in 10s"%self.mnodeNums)
return True
count+=1
else:
tdLog.debug(tdSql.queryResult)
tdLog.exit("cluster of %d mnodes is not ready in 10s " %self.mnodeNums)
def check3mnodeoff(self,offlineDnodeNo,mnodeNums=3):
count=0
while count < 10:
time.sleep(1)
tdSql.query("show mnodes;")
if tdSql.checkRows(mnodeNums) :
tdLog.success("cluster has %d mnodes" %self.mnodeNums )
else:
tdLog.exit("mnode number is correct")
if offlineDnodeNo == 1:
if tdSql.queryResult[0][2]=='offline' :
if tdSql.queryResult[1][2]=='leader' and tdSql.queryResult[1][3]== 'ready' :
if tdSql.queryResult[2][2]=='follower' and tdSql.queryResult[2][3]== 'ready' :
tdLog.success("stop mnodes on dnode %d successfully in 10s" %offlineDnodeNo)
return True
elif tdSql.queryResult[1][2]=='follower' and tdSql.queryResult[1][3]== 'ready' :
if tdSql.queryResult[2][2]=='leader' and tdSql.queryResult[2][3]== 'ready' :
tdLog.debug("stop mnodes on dnode %d successfully in 10s" %offlineDnodeNo)
return True
count+=1
elif offlineDnodeNo == 2:
if tdSql.queryResult[1][2]=='offline' :
if tdSql.queryResult[0][2]=='leader' and tdSql.queryResult[0][3]== 'ready' :
if tdSql.queryResult[2][2]=='follower' and tdSql.queryResult[2][3]== 'ready' :
tdLog.debug("stop mnodes on dnode %d successfully in 10s" %offlineDnodeNo)
return True
elif tdSql.queryResult[0][2]=='follower' and tdSql.queryResult[0][3]== 'ready' :
if tdSql.queryResult[2][2]=='leader' and tdSql.queryResult[2][3]== 'ready' :
tdLog.debug("stop mnodes on dnode %d successfully in 10s" %offlineDnodeNo)
return True
count+=1
elif offlineDnodeNo == 3:
if tdSql.queryResult[2][2]=='offline' :
if tdSql.queryResult[0][2]=='leader' and tdSql.queryResult[0][3]== 'ready' :
if tdSql.queryResult[1][2]=='follower' and tdSql.queryResult[1][3]== 'ready' :
tdLog.debug("stop mnodes on dnode %d successfully in 10s" %offlineDnodeNo)
return True
elif tdSql.queryResult[0][2]=='follower' and tdSql.queryResult[0][3]== 'ready' :
if tdSql.queryResult[1][2]=='leader' and tdSql.queryResult[1][3]== 'ready' :
tdLog.debug("stop mnodes on dnode %d successfully in 10s" %offlineDnodeNo)
return True
count+=1
else:
tdLog.debug(tdSql.queryResult)
tdLog.exit("stop mnodes on dnode %d failed in 10s ")
def close(self):
self.cursor.close()
clusterComCheck = ClusterComCheck()
###################################################################
# Copyright (c) 2016 by TAOS Technologies, Inc.
# All rights reserved.
#
# This file is proprietary and confidential to TAOS Technologies.
# No part of this file may be reproduced, stored, transmitted,
# disclosed or used in any form or by any means other than as
# expressly provided by the written permission from Jianhui Tao
#
###################################################################
# -*- coding: utf-8 -*-
from collections import defaultdict
import random
import string
import threading
import requests
import time
# import socketfrom
import taos
from util.log import *
from util.sql import *
from util.cases import *
from util.dnodes import *
from util.common import *
# class actionType(Enum):
# CREATE_DATABASE = 0
# CREATE_STABLE = 1
# CREATE_CTABLE = 2
# INSERT_DATA = 3
class ClusterComCreate:
def init(self, conn, logSql):
tdSql.init(conn.cursor())
# tdSql.init(conn.cursor(), logSql) # output sql.txt file
def initConsumerTable(self,cdbName='cdb'):
tdLog.info("create consume database, and consume info table, and consume result table")
tdSql.query("create database if not exists %s vgroups 1"%(cdbName))
tdSql.query("drop table if exists %s.consumeinfo "%(cdbName))
tdSql.query("drop table if exists %s.consumeresult "%(cdbName))
tdSql.query("drop table if exists %s.notifyinfo "%(cdbName))
tdSql.query("create table %s.consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int, ifmanualcommit int)"%cdbName)
tdSql.query("create table %s.consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)"%cdbName)
tdSql.query("create table %s.notifyinfo (ts timestamp, cmdid int, consumerid int)"%cdbName)
def initConsumerInfoTable(self,cdbName='cdb'):
tdLog.info("drop consumeinfo table")
tdSql.query("drop table if exists %s.consumeinfo "%(cdbName))
tdSql.query("create table %s.consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int, ifmanualcommit int)"%cdbName)
def insertConsumerInfo(self,consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifmanualcommit,cdbName='cdb'):
sql = "insert into %s.consumeinfo values "%cdbName
sql += "(now, %d, '%s', '%s', %d, %d, %d)"%(consumerId, topicList, keyList, expectrowcnt, ifcheckdata, ifmanualcommit)
tdLog.info("consume info sql: %s"%sql)
tdSql.query(sql)
def selectConsumeResult(self,expectRows,cdbName='cdb'):
resultList=[]
while 1:
tdSql.query("select * from %s.consumeresult"%cdbName)
#tdLog.info("row: %d, %l64d, %l64d"%(tdSql.getData(0, 1),tdSql.getData(0, 2),tdSql.getData(0, 3))
if tdSql.getRows() == expectRows:
break
else:
time.sleep(5)
for i in range(expectRows):
tdLog.info ("consume id: %d, consume msgs: %d, consume rows: %d"%(tdSql.getData(i , 1), tdSql.getData(i , 2), tdSql.getData(i , 3)))
resultList.append(tdSql.getData(i , 3))
return resultList
def startTmqSimProcess(self,pollDelay,dbName,showMsg=1,showRow=1,cdbName='cdb',valgrind=0):
buildPath = tdCom.getBuildPath()
cfgPath = tdCom.getClientCfgPath()
if valgrind == 1:
logFile = cfgPath + '/../log/valgrind-tmq.log'
shellCmd = 'nohup valgrind --log-file=' + logFile
shellCmd += '--tool=memcheck --leak-check=full --show-reachable=no --track-origins=yes --show-leak-kinds=all --num-callers=20 -v --workaround-gcc296-bugs=yes '
if (platform.system().lower() == 'windows'):
shellCmd = 'mintty -h never -w hide ' + buildPath + '\\build\\bin\\tmq_sim.exe -c ' + cfgPath
shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, dbName, showMsg, showRow, cdbName)
shellCmd += "> nul 2>&1 &"
else:
shellCmd = 'nohup ' + buildPath + '/build/bin/tmq_sim -c ' + cfgPath
shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, dbName, showMsg, showRow, cdbName)
shellCmd += "> /dev/null 2>&1 &"
tdLog.info(shellCmd)
os.system(shellCmd)
def getStartConsumeNotifyFromTmqsim(self,cdbName='cdb'):
while 1:
tdSql.query("select * from %s.notifyinfo"%cdbName)
#tdLog.info("row: %d, %l64d, %l64d"%(tdSql.getData(0, 1),tdSql.getData(0, 2),tdSql.getData(0, 3))
if (tdSql.getRows() == 1) and (tdSql.getData(0, 1) == 0):
break
else:
time.sleep(0.1)
return
def getStartCommitNotifyFromTmqsim(self,cdbName='cdb'):
while 1:
tdSql.query("select * from %s.notifyinfo"%cdbName)
#tdLog.info("row: %d, %l64d, %l64d"%(tdSql.getData(0, 1),tdSql.getData(0, 2),tdSql.getData(0, 3))
if tdSql.getRows() == 2 :
print(tdSql.getData(0, 1), tdSql.getData(1, 1))
if tdSql.getData(1, 1) == 1:
break
time.sleep(0.1)
return
def create_database(self,tsql, dbName,dropFlag=1,vgroups=4,replica=1):
if dropFlag == 1:
tsql.execute("drop database if exists %s"%(dbName))
tsql.execute("create database if not exists %s vgroups %d replica %d"%(dbName, vgroups, replica))
tdLog.debug("complete to create database %s"%(dbName))
return
def create_stable(self,tsql, dbName,stbName):
tsql.execute("create table if not exists %s.%s (ts timestamp, c1 int, c2 int, c3 binary(16)) tags(t1 int, t2 binary(32))"%(dbName, stbName))
tdLog.debug("complete to create %s.%s" %(dbName, stbName))
return
def create_ctable(self,tsql=None, dbName='dbx',stbName='stb',ctbPrefix='ctb',ctbNum=1):
tsql.execute("use %s" %dbName)
pre_create = "create table"
sql = pre_create
#tdLog.debug("doing create one stable %s and %d child table in %s ..." %(stbname, count ,dbname))
for i in range(ctbNum):
tagValue = 'beijing'
if (i % 2 == 0):
tagValue = 'shanghai'
sql += " %s%d using %s tags(%d, '%s')"%(ctbPrefix,i,stbName,i+1, tagValue)
if (i > 0) and (i%100 == 0):
tsql.execute(sql)
sql = pre_create
if sql != pre_create:
tsql.execute(sql)
tdLog.debug("complete to create %d child tables in %s.%s" %(ctbNum, dbName, stbName))
return
def insert_data(self,tsql,dbName,stbName,ctbNum,rowsPerTbl,batchNum,startTs=None):
tdLog.debug("start to insert data ............")
tsql.execute("use %s" %dbName)
pre_insert = "insert into "
sql = pre_insert
if startTs is None:
t = time.time()
startTs = int(round(t * 1000))
#tdLog.debug("doing insert data into stable:%s rows:%d ..."%(stbName, allRows))
for i in range(ctbNum):
sql += " %s%d values "%(stbName,i)
for j in range(rowsPerTbl):
sql += "(%d, %d, 'tmqrow_%d') "%(startTs + j, j, j)
if (j > 0) and ((j%batchNum == 0) or (j == rowsPerTbl - 1)):
tsql.execute(sql)
if j < rowsPerTbl - 1:
sql = "insert into %s%d values " %(stbName,i)
else:
sql = "insert into "
#end sql
if sql != pre_insert:
#print("insert sql:%s"%sql)
tsql.execute(sql)
tdLog.debug("insert data ............ [OK]")
return
def insert_data_1(self,tsql,dbName,ctbPrefix,ctbNum,rowsPerTbl,batchNum,startTs):
tdLog.debug("start to insert data ............")
tsql.execute("use %s" %dbName)
pre_insert = "insert into "
sql = pre_insert
t = time.time()
startTs = int(round(t * 1000))
#tdLog.debug("doing insert data into stable:%s rows:%d ..."%(stbName, allRows))
for i in range(ctbNum):
sql += " %s%d values "%(ctbPrefix,i)
for j in range(rowsPerTbl):
if (j % 2 == 0):
sql += "(%d, %d, %d, 'tmqrow_%d') "%(startTs + j, j, j, j)
else:
sql += "(%d, %d, %d, 'tmqrow_%d') "%(startTs + j, j, -j, j)
if (j > 0) and ((j%batchNum == 0) or (j == rowsPerTbl - 1)):
tsql.execute(sql)
if j < rowsPerTbl - 1:
sql = "insert into %s%d values " %(ctbPrefix,i)
else:
sql = "insert into "
#end sql
if sql != pre_insert:
#print("insert sql:%s"%sql)
tsql.execute(sql)
tdLog.debug("insert data ............ [OK]")
return
def insert_data_interlaceByMultiTbl(self,tsql,dbName,ctbPrefix,ctbNum,rowsPerTbl,batchNum,startTs=0):
tdLog.debug("start to insert data ............")
tsql.execute("use %s" %dbName)
pre_insert = "insert into "
sql = pre_insert
if startTs == 0:
t = time.time()
startTs = int(round(t * 1000))
ctbDict = {}
for i in range(ctbNum):
ctbDict[i] = 0
#tdLog.debug("doing insert data into stable:%s rows:%d ..."%(stbName, allRows))
rowsOfCtb = 0
while rowsOfCtb < rowsPerTbl:
for i in range(ctbNum):
sql += " %s.%s_%d values "%(dbName,ctbPrefix,i)
for k in range(batchNum):
sql += "(%d, %d, 'tmqrow_%d') "%(startTs + ctbDict[i], ctbDict[i], ctbDict[i])
ctbDict[i] += 1
if (0 == ctbDict[i]%batchNum) or (ctbDict[i] == rowsPerTbl):
tsql.execute(sql)
sql = "insert into "
break
rowsOfCtb = ctbDict[0]
tdLog.debug("insert data ............ [OK]")
return
def insert_data_with_autoCreateTbl(self,tsql,dbName,stbName,ctbPrefix,ctbNum,rowsPerTbl,batchNum,startTs=0):
tdLog.debug("start to insert data wiht auto create child table ............")
tsql.execute("use %s" %dbName)
pre_insert = "insert into "
sql = pre_insert
if startTs == 0:
t = time.time()
startTs = int(round(t * 1000))
#tdLog.debug("doing insert data into stable:%s rows:%d ..."%(stbName, allRows))
rowsOfSql = 0
for i in range(ctbNum):
sql += " %s.%s_%d using %s.%s tags (%d) values "%(dbName,ctbPrefix,i,dbName,stbName,i)
for j in range(rowsPerTbl):
sql += "(%d, %d, 'tmqrow_%d') "%(startTs + j, j, j)
rowsOfSql += 1
if (j > 0) and ((rowsOfSql == batchNum) or (j == rowsPerTbl - 1)):
tsql.execute(sql)
rowsOfSql = 0
if j < rowsPerTbl - 1:
sql = "insert into %s.%s_%d using %s.%s tags (%d) values " %(dbName,ctbPrefix,i,dbName,stbName,i)
else:
sql = "insert into "
#end sql
if sql != pre_insert:
#print("insert sql:%s"%sql)
tsql.execute(sql)
tdLog.debug("insert data ............ [OK]")
return
def syncCreateDbStbCtbInsertData(self, tsql, paraDict):
tdCom.create_database(tsql, paraDict["dbName"],paraDict["dropFlag"])
tdCom.create_stable(tsql, dbname=paraDict["dbName"],stbname=paraDict["stbName"], column_elm_list=paraDict['colSchema'], tag_elm_list=paraDict['tagSchema'])
tdCom.create_ctable(tsql, dbname=paraDict["dbName"],stbname=paraDict["stbName"],tag_elm_list=paraDict['tagSchema'],count=paraDict["ctbNum"], default_ctbname_prefix=paraDict['ctbPrefix'])
if "event" in paraDict and type(paraDict['event']) == type(threading.Event()):
paraDict["event"].set()
ctbPrefix = paraDict['ctbPrefix']
ctbNum = paraDict["ctbNum"]
for i in range(ctbNum):
tbName = '%s%s'%(ctbPrefix,i)
tdCom.insert_rows(tsql,dbname=paraDict["dbName"],tbname=tbName,start_ts_value=paraDict['startTs'],count=paraDict['rowsPerTbl'])
return
def threadFunction(self, **paraDict):
# create new connector for new tdSql instance in my thread
newTdSql = tdCom.newTdSql()
self.syncCreateDbStbCtbInsertData(self, newTdSql, paraDict)
return
def asyncCreateDbStbCtbInsertData(self, paraDict):
pThread = threading.Thread(target=self.threadFunction, kwargs=paraDict)
pThread.start()
return pThread
def close(self):
self.cursor.close()
clusterComCreate = ClusterComCreate()
...@@ -116,6 +116,10 @@ python3 ./test.py -f 2-query/function_null.py ...@@ -116,6 +116,10 @@ python3 ./test.py -f 2-query/function_null.py
python3 ./test.py -f 6-cluster/5dnode1mnode.py python3 ./test.py -f 6-cluster/5dnode1mnode.py
python3 ./test.py -f 6-cluster/5dnode2mnode.py python3 ./test.py -f 6-cluster/5dnode2mnode.py
python3 ./test.py -f 6-cluster/5dnode3mnodeStop.py -N 5 -M 3 python3 ./test.py -f 6-cluster/5dnode3mnodeStop.py -N 5 -M 3
python3 ./test.py -f 6-cluster/5dnode3mnodeSep1VnodeStopVnodeCreateDb.py -N 5 -M 3
# BUG python3 ./test.py -f 6-cluster/5dnode3mnodeSep1VnodeStopCreateDb.py -N 5 -M 3
# BUG python3 ./test.py -f 6-cluster/5dnode3mnodeSep1VnodeStopMnodeCreateDb.py -N 5 -M 3
# python3 ./test.py -f 6-cluster/5dnode3mnodeDrop.py -N 5 # python3 ./test.py -f 6-cluster/5dnode3mnodeDrop.py -N 5
# BUG python3 ./test.py -f 6-cluster/5dnode3mnodeStopInsert.py # BUG python3 ./test.py -f 6-cluster/5dnode3mnodeStopInsert.py
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册