diff --git a/tests/system-test/0-others/splitVGroup.py b/tests/system-test/0-others/splitVGroup.py index 32001f34b09566acd5f1faa6d7358dc1ce1ac150..450996106608aa60cec781392bec8f9463ff2171 100644 --- a/tests/system-test/0-others/splitVGroup.py +++ b/tests/system-test/0-others/splitVGroup.py @@ -328,9 +328,28 @@ class TDTestCase: tdLog.exit("split vgroup transaction is not finished after executing 50s") return False + # split error + def expectSplitError(self, dbName): + vgids = self.getVGroup(dbName) + selid = random.choice(vgids) + sql = f"split vgroup {selid}" + tdLog.info(sql) + tdSql.error(sql) + + # expect split ok + def expectSplitOk(self, dbName): + # split vgroup + vgList1 = self.getVGroup(dbName) + self.splitVGroup(dbName) + vgList2 = self.getVGroup(dbName) + vgNum1 = len(vgList1) + 1 + vgNum2 = len(vgList2) + if vgNum1 != vgNum2: + tdLog.exit(f" vglist len={vgNum1} is not same for expect {vgNum2}") + return + # split empty database - def splitEmptyDB(self): - + def splitEmptyDB(self): dbName = "emptydb" vgNum = 2 # create database @@ -339,17 +358,33 @@ class TDTestCase: tdSql.execute(sql) # split vgroup - self.splitVGroup(dbName) - vgList = self.getVGroup(dbName) - vgNum1 = len(vgList) - vgNum2 = vgNum + 1 - if vgNum1 != vgNum2: - tdLog.exit(f" vglist len={vgNum1} is not same for expect {vgNum2}") - return + self.expectSplitOk(dbName) + + + # forbid + def checkForbid(self): + # stream + tdLog.info("check forbid split having stream...") + tdSql.execute("create database streamdb;") + tdSql.execute("use streamdb;") + tdSql.execute("create table ta(ts timestamp, age int);") + tdSql.execute("create stream ma into sta as select count(*) from ta interval(1s);") + self.expectSplitError("streamdb") + tdSql.execute("drop stream ma;") + self.expectSplitOk("streamdb") + + # topic + tdLog.info("check forbid split having topic...") + tdSql.execute("create database topicdb wal_retention_period 10;") + tdSql.execute("use topicdb;") + tdSql.execute("create table ta(ts timestamp, age int);") + tdSql.execute("create topic toa as select * from ta;") + self.expectSplitError("topicdb") + tdSql.execute("drop topic toa;") + self.expectSplitOk("topicdb") # run def run(self): - # prepare env self.prepareEnv() @@ -360,12 +395,13 @@ class TDTestCase: # check two db query result same self.checkResult() - tdLog.info(f"split vgroup i={i} passed.") # split empty db - self.splitEmptyDB() + self.splitEmptyDB() + # check topic and stream forib + self.checkForbid() # stop def stop(self): diff --git a/tests/system-test/eco-system/util/Consumer.py b/tests/system-test/eco-system/util/Consumer.py new file mode 100644 index 0000000000000000000000000000000000000000..b483253a9582efe7202ff90de22c855f87fcee3b --- /dev/null +++ b/tests/system-test/eco-system/util/Consumer.py @@ -0,0 +1,82 @@ +################################################################### +# Copyright (c) 2016 by TAOS Technologies, Inc. +# All rights reserved. +# +# This file is proprietary and confidential to TAOS Technologies. +# No part of this file may be reproduced, stored, transmitted, +# disclosed or used in any form or by any means other than as +# expressly provided by the written permission from Jianhui Tao +# +################################################################### + +# -*- coding: utf-8 -*- + +# +# The option for wal_retetion_period and wal_retention_size is work well +# + +import taos +from taos.tmq import Consumer + +import os +import sys +import threading +import json +import time +from datetime import date +from datetime import datetime +from datetime import timedelta +from os import path + + +# consume topic +def consume_topic(topic_name, consume_cnt, wait): + print("start consume...") + consumer = Consumer( + { + "group.id": "tg2", + "td.connect.user": "root", + "td.connect.pass": "taosdata", + "enable.auto.commit": "true", + } + ) + print("start subscrite...") + consumer.subscribe([topic_name]) + + cnt = 0 + try: + while True and cnt < consume_cnt: + res = consumer.poll(1) + if not res: + if wait: + continue + else: + break + err = res.error() + if err is not None: + raise err + val = res.value() + cnt += 1 + print(f" consume {cnt} ") + for block in val: + print(block.fetchall()) + finally: + consumer.unsubscribe() + consumer.close() + + +if __name__ == "__main__": + print(sys.argv) + if len(sys.argv) < 2: + + print(" please input topic name for consume . -c for wait") + else: + wait = False + if "-c" == sys.argv[1]: + wait = True + topic = sys.argv[2] + else: + topic = sys.argv[1] + + print(f' wait={wait} topic={topic}') + consume_topic(topic, 10000000, wait) \ No newline at end of file diff --git a/tests/system-test/eco-system/util/restartDnodes.py b/tests/system-test/eco-system/util/restartDnodes.py new file mode 100644 index 0000000000000000000000000000000000000000..feee260fdf49957577a2204831de54e8183b222c --- /dev/null +++ b/tests/system-test/eco-system/util/restartDnodes.py @@ -0,0 +1,84 @@ +import time +import os +import subprocess +import random +import platform + +class dnode(): + def __init__(self, pid, path): + self.pid = pid + self.path = path + +# run exePath no wait finished +def runNoWait(exePath): + if platform.system().lower() == 'windows': + cmd = f"mintty -h never {exePath}" + else: + cmd = f"nohup {exePath} > /dev/null 2>&1 & " + + if os.system(cmd) != 0: + return False + else: + return True + +# get online dnodes +def getDnodes(): + cmd = "ps aux | grep taosd | awk '{{print $2,$11,$12,$13}}'" + result = os.system(cmd) + result=subprocess.check_output(cmd,shell=True) + strout = result.decode('utf-8').split("\n") + dnodes = [] + + for line in strout: + cols = line.split(' ') + if len(cols) != 4: + continue + exepath = cols[1] + if len(exepath) < 5 : + continue + if exepath[-5:] != 'taosd': + continue + + # add to list + path = cols[1] + " " + cols[2] + " " + cols[3] + dnodes.append(dnode(cols[0], path)) + + print(" show dnodes cnt=%d...\n"%(len(dnodes))) + for dn in dnodes: + print(f" pid={dn.pid} path={dn.path}") + + return dnodes + +def restartDnodes(dnodes, cnt, seconds): + print(f"start dnode cnt={cnt} wait={seconds}s") + selects = random.sample(dnodes, cnt) + for select in selects: + print(f" kill -9 {select.pid}") + cmd = f"kill -9 {select.pid}" + os.system(cmd) + print(f" restart {select.path}") + if runNoWait(select.path) == False: + print(f"run {select.path} failed.") + raise Exception("exe failed.") + print(f" sleep {seconds}s ...") + time.sleep(seconds) + +def run(): + # kill seconds interval + killLoop = 10 + minKill = 1 + maxKill = 10 + for i in range(killLoop): + dnodes = getDnodes() + killCnt = 0 + if len(dnodes) > 0: + killCnt = random.randint(1, len(dnodes)) + restartDnodes(dnodes, killCnt, random.randint(1, 5)) + + seconds = random.randint(minKill, maxKill) + print(f"----------- kill loop i={i} killCnt={killCnt} done. do sleep {seconds}s ... \n") + time.sleep(seconds) + + +if __name__ == '__main__': + run() \ No newline at end of file