import taos import sys import time import socket import os import threading from enum import Enum from util.log import * from util.sql import * from util.cases import * from util.dnodes import * sys.path.append("./7-tmq") from tmqCommon import * class TDTestCase: def __init__(self): self.snapshot = 0 self.vgroups = 4 self.ctbNum = 100 self.rowsPerTbl = 10 def init(self, conn, logSql): tdLog.debug(f"start to excute {__file__}") tdSql.init(conn.cursor(), False) def waitSubscriptionExit(self, max_wait_count=20): wait_cnt = 0 while (wait_cnt < max_wait_count): tdSql.query("show subscriptions") if tdSql.getRows() == 0: break else: time.sleep(1) wait_cnt += 1 tdLog.info("wait subscriptions exit for %d s"%wait_cnt) # drop some ntbs def tmqCase1(self): tdLog.printNoPrefix("======== test case 1: ") paraDict = {'dbName': 'dbt', 'dropFlag': 1, 'event': '', 'vgroups': 4, 'stbName': 'stb', 'colPrefix': 'c', 'tagPrefix': 't', 'colSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'TIMESTAMP', 'count':1}], 'tagSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1}], 'ctbPrefix': 'ntb', 'ctbStartIdx': 0, 'ctbNum': 100, 'rowsPerTbl': 1000, 'batchNum': 1000, 'startTs': 1640966400000, # 2022-01-01 00:00:00.000 'endTs': 0, 'pollDelay': 5, 'showMsg': 1, 'showRow': 1, 'snapshot': 0} paraDict['snapshot'] = self.snapshot paraDict['vgroups'] = self.vgroups paraDict['ctbNum'] = self.ctbNum paraDict['rowsPerTbl'] = self.rowsPerTbl tmqCom.initConsumerTable() tdLog.info("start create database....") tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], vgroups=paraDict["vgroups"],replica=1) tdLog.info("start create normal tables....") tmqCom.create_ntable(tsql=tdSql, dbname=paraDict["dbName"], tbname_prefix=paraDict["ctbPrefix"], tbname_index_start_num = 1, column_elm_list=paraDict["colSchema"], colPrefix='c', tblNum=paraDict["ctbNum"]) tdLog.info("start insert data into normal tables....") tmqCom.insert_rows_into_ntbl(tsql=tdSql, dbname=paraDict["dbName"], tbname_prefix=paraDict["ctbPrefix"], tbname_index_start_num = 1, column_ele_list=paraDict["colSchema"],startTs=paraDict["startTs"], tblNum=paraDict["ctbNum"], rows=paraDict["rowsPerTbl"]) tdLog.info("create topics from database") topicFromDb = 'topic_dbt' tdSql.execute("create topic %s as database %s" %(topicFromDb, paraDict['dbName'])) if self.snapshot == 0: consumerId = 0 elif self.snapshot == 1: consumerId = 1 expectrowcnt = int(paraDict["rowsPerTbl"] * paraDict["ctbNum"]) topicList = topicFromDb ifcheckdata = 1 ifManualCommit = 1 keyList = 'group.id:cgrp1,\ enable.auto.commit:true,\ auto.commit.interval.ms:1000,\ auto.offset.reset:earliest' tmqCom.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit) tdLog.info("start consume processor") tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot']) tmqCom.getStartConsumeNotifyFromTmqsim() tdLog.info("drop some ntables") # drop 1/4 ctbls from half offset paraDict["ctbStartIdx"] = paraDict["ctbStartIdx"] + int(paraDict["ctbNum"] * 1 / 2) paraDict["ctbNum"] = int(paraDict["ctbNum"] / 4) tmqCom.drop_ctable(tdSql, dbname=paraDict['dbName'], count=paraDict["ctbNum"], default_ctbname_prefix=paraDict["ctbPrefix"], ctbStartIdx=paraDict["ctbStartIdx"]) tdLog.info("start to check consume result") expectRows = 1 resultList = tmqCom.selectConsumeResult(expectRows) totalConsumeRows = 0 for i in range(expectRows): totalConsumeRows += resultList[i] tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt)) if not ((totalConsumeRows >= expectrowcnt * 3/4) and (totalConsumeRows < expectrowcnt)): tdLog.exit("tmq consume rows error with snapshot = 0!") tdLog.info("wait subscriptions exit ....") self.waitSubscriptionExit() tdSql.query("drop topic %s"%topicFromDb) tdLog.info("success dorp topic: %s"%topicFromDb) tdLog.printNoPrefix("======== test case 1 end ...... ") # drop some ntbs and create some new ntbs def tmqCase2(self): tdLog.printNoPrefix("======== test case 2: ") paraDict = {'dbName': 'dbt', 'dropFlag': 1, 'event': '', 'vgroups': 4, 'stbName': 'stb', 'colPrefix': 'c', 'tagPrefix': 't', 'colSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'TIMESTAMP', 'count':1}], 'tagSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1}], 'ctbPrefix': 'ntb', 'ctbStartIdx': 0, 'ctbNum': 100, 'rowsPerTbl': 1000, 'batchNum': 1000, 'startTs': 1640966400000, # 2022-01-01 00:00:00.000 'endTs': 0, 'pollDelay': 10, 'showMsg': 1, 'showRow': 1, 'snapshot': 0} paraDict['snapshot'] = self.snapshot paraDict['vgroups'] = self.vgroups paraDict['ctbNum'] = self.ctbNum paraDict['rowsPerTbl'] = self.rowsPerTbl tmqCom.initConsumerTable() tdLog.info("start create database....") tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], vgroups=paraDict["vgroups"],replica=1) tdLog.info("start create normal tables....") tmqCom.create_ntable(tsql=tdSql, dbname=paraDict["dbName"], tbname_prefix=paraDict["ctbPrefix"], tbname_index_start_num = 1, column_elm_list=paraDict["colSchema"], colPrefix='c', tblNum=paraDict["ctbNum"]) tdLog.info("start insert data into normal tables....") tmqCom.insert_rows_into_ntbl(tsql=tdSql, dbname=paraDict["dbName"], tbname_prefix=paraDict["ctbPrefix"], tbname_index_start_num = 1, column_ele_list=paraDict["colSchema"],startTs=paraDict["startTs"], tblNum=paraDict["ctbNum"], rows=paraDict["rowsPerTbl"]) tdLog.info("create topics from database") topicFromDb = 'topic_dbt' tdSql.execute("create topic %s as database %s" %(topicFromDb, paraDict['dbName'])) if self.snapshot == 0: consumerId = 2 elif self.snapshot == 1: consumerId = 3 expectrowcnt = int(paraDict["rowsPerTbl"] * paraDict["ctbNum"] * 2) topicList = topicFromDb ifcheckdata = 1 ifManualCommit = 1 keyList = 'group.id:cgrp1,\ enable.auto.commit:true,\ auto.commit.interval.ms:1000,\ auto.offset.reset:earliest' tmqCom.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit) tdLog.info("start consume processor") tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot']) tmqCom.getStartConsumeNotifyFromTmqsim() tdLog.info("drop some ntables") # drop 1/4 ctbls from half offset paraDict["ctbStartIdx"] = paraDict["ctbStartIdx"] + int(paraDict["ctbNum"] * 1 / 2) paraDict["ctbNum"] = int(paraDict["ctbNum"] / 4) tmqCom.drop_ctable(tdSql, dbname=paraDict['dbName'], count=paraDict["ctbNum"], default_ctbname_prefix=paraDict["ctbPrefix"], ctbStartIdx=paraDict["ctbStartIdx"]) tdLog.info("start create some new normal tables....") paraDict["ctbPrefix"] = 'newCtb' paraDict["ctbNum"] = self.ctbNum tmqCom.create_ntable(tsql=tdSql, dbname=paraDict["dbName"], tbname_prefix=paraDict["ctbPrefix"], tbname_index_start_num = 1, column_elm_list=paraDict["colSchema"], colPrefix='c', tblNum=paraDict["ctbNum"]) tdLog.info("start insert data into these new normal tables....") tmqCom.insert_rows_into_ntbl(tsql=tdSql, dbname=paraDict["dbName"], tbname_prefix=paraDict["ctbPrefix"], tbname_index_start_num = 1, column_ele_list=paraDict["colSchema"],startTs=paraDict["startTs"], tblNum=paraDict["ctbNum"], rows=paraDict["rowsPerTbl"]) tdLog.info("start to check consume result") expectRows = 1 resultList = tmqCom.selectConsumeResult(expectRows) totalConsumeRows = 0 for i in range(expectRows): totalConsumeRows += resultList[i] tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt)) if not ((totalConsumeRows >= expectrowcnt / 2 * (1 + 3/4)) and (totalConsumeRows < expectrowcnt)): tdLog.exit("tmq consume rows error with snapshot = 0!") tdLog.info("wait subscriptions exit ....") self.waitSubscriptionExit() tdSql.query("drop topic %s"%topicFromDb) tdLog.info("success dorp topic: %s"%topicFromDb) tdLog.printNoPrefix("======== test case 2 end ...... ") def run(self): tdLog.printNoPrefix("=============================================") tdLog.printNoPrefix("======== snapshot is 0: only consume from wal") self.snapshot = 0 # self.tmqCase1() self.tmqCase2() tdLog.printNoPrefix("====================================================================") tdLog.printNoPrefix("======== snapshot is 1: firstly consume from tsbs, and then from wal") self.snapshot = 1 # self.tmqCase1() self.tmqCase2() def stop(self): tdSql.close() tdLog.success(f"{__file__} successfully executed") event = threading.Event() tdCases.addLinux(__file__, TDTestCase()) tdCases.addWindows(__file__, TDTestCase())