diff --git a/source/client/src/clientSml.c b/source/client/src/clientSml.c index ce06e0eac42ff2186a880b330f5fc953e811f8e1..30c0008f3d8b227327bff4e7bcf1bd0bd4f147b6 100644 --- a/source/client/src/clientSml.c +++ b/source/client/src/clientSml.c @@ -1351,7 +1351,7 @@ static int32_t smlInsertData(SSmlHandle *info) { } taosArrayPush(info->pRequest->tableList, &pName); - tstrncpy(pName.tname, tableData->childTableName, strlen(tableData->childTableName) + 1); + strcpy(pName.tname, tableData->childTableName); SRequestConnInfo conn = {0}; conn.pTrans = info->taos->pAppInfo->pTransporter; diff --git a/source/dnode/mgmt/exe/dmMain.c b/source/dnode/mgmt/exe/dmMain.c index f0e020edfed8136e6b7fbfc3a023c1ef2822fd01..989bff39849f1157a65b0ea606ea1763c337daf5 100644 --- a/source/dnode/mgmt/exe/dmMain.c +++ b/source/dnode/mgmt/exe/dmMain.c @@ -87,18 +87,6 @@ static void dmStopDnode(int signum, void *sigInfo, void *context) { } void dmLogCrash(int signum, void *sigInfo, void *context) { - taosIgnSignal(SIGTERM); - taosIgnSignal(SIGHUP); - taosIgnSignal(SIGINT); - taosIgnSignal(SIGBREAK); - -#ifndef WINDOWS - taosIgnSignal(SIGBUS); -#endif - taosIgnSignal(SIGABRT); - taosIgnSignal(SIGFPE); - taosIgnSignal(SIGSEGV); - char *pMsg = NULL; const char *flags = "UTL FATAL "; ELogLevel level = DEBUG_FATAL; diff --git a/source/dnode/mnode/impl/src/mndProfile.c b/source/dnode/mnode/impl/src/mndProfile.c index 50e502f4ab9b240e8518bdb4fe531929149e974d..ff6a2f460a1e84cd2d647c17102dfa45e5c5462e 100644 --- a/source/dnode/mnode/impl/src/mndProfile.c +++ b/source/dnode/mnode/impl/src/mndProfile.c @@ -256,10 +256,13 @@ static int32_t mndProcessConnectReq(SRpcMsg *pReq) { snprintf(db, TSDB_DB_FNAME_LEN, "%d%s%s", pUser->acctId, TS_PATH_DELIMITER, connReq.db); pDb = mndAcquireDb(pMnode, db); if (pDb == NULL) { - terrno = TSDB_CODE_MND_INVALID_DB; - mGError("user:%s, failed to login from %s while use db:%s since %s", pReq->info.conn.user, ip, connReq.db, - terrstr()); - goto _OVER; + if (0 != strcmp(connReq.db, TSDB_INFORMATION_SCHEMA_DB) && + (0 != strcmp(connReq.db, TSDB_PERFORMANCE_SCHEMA_DB))) { + terrno = TSDB_CODE_MND_INVALID_DB; + mGError("user:%s, failed to login from %s while use db:%s since %s", pReq->info.conn.user, ip, connReq.db, + terrstr()); + goto _OVER; + } } if (mndCheckDbPrivilege(pMnode, pReq->info.conn.user, MND_OPER_READ_OR_WRITE_DB, pDb) != 0) { diff --git a/source/libs/wal/src/walMeta.c b/source/libs/wal/src/walMeta.c index cda7e35b0fa1c89b6d5b422193562d3ab777602c..a12f8051ba982ed627ed0767b76d344678748ca9 100644 --- a/source/libs/wal/src/walMeta.c +++ b/source/libs/wal/src/walMeta.c @@ -295,6 +295,36 @@ void walAlignVersions(SWal* pWal) { wInfo("vgId:%d, reset commitVer to %" PRId64, pWal->cfg.vgId, pWal->vers.commitVer); } +int walRepairLogFileTs(SWal* pWal, bool* updateMeta) { + int32_t sz = taosArrayGetSize(pWal->fileInfoSet); + int32_t fileIdx = -1; + int32_t lastCloseTs = 0; + char fnameStr[WAL_FILE_LEN] = {0}; + + while (++fileIdx < sz - 1) { + SWalFileInfo* pFileInfo = taosArrayGet(pWal->fileInfoSet, fileIdx); + if (pFileInfo->closeTs != -1) { + lastCloseTs = pFileInfo->closeTs; + continue; + } + + walBuildLogName(pWal, pFileInfo->firstVer, fnameStr); + int32_t mtime = 0; + if (taosStatFile(fnameStr, NULL, &mtime) < 0) { + terrno = TAOS_SYSTEM_ERROR(errno); + wError("vgId:%d, failed to stat file due to %s, file:%s", pWal->cfg.vgId, strerror(errno), fnameStr); + return -1; + } + + if (updateMeta != NULL) *updateMeta = true; + if (pFileInfo->createTs == -1) pFileInfo->createTs = lastCloseTs; + pFileInfo->closeTs = mtime; + lastCloseTs = pFileInfo->closeTs; + } + + return 0; +} + bool walLogEntriesComplete(const SWal* pWal) { int32_t sz = taosArrayGetSize(pWal->fileInfoSet); bool complete = true; @@ -433,15 +463,8 @@ int walCheckAndRepairMeta(SWal* pWal) { wError("failed to scan wal last ver since %s", terrstr()); return -1; } - // remove the empty wal log, and its idx - wInfo("vgId:%d, wal remove empty file %s", pWal->cfg.vgId, fnameStr); - taosRemoveFile(fnameStr); - walBuildIdxName(pWal, pFileInfo->firstVer, fnameStr); - wInfo("vgId:%d, wal remove empty file %s", pWal->cfg.vgId, fnameStr); - taosRemoveFile(fnameStr); - // remove its meta entry - taosArrayRemove(pWal->fileInfoSet, fileIdx); - continue; + // empty log file + lastVer = pFileInfo->firstVer - 1; } // update lastVer @@ -460,6 +483,11 @@ int walCheckAndRepairMeta(SWal* pWal) { } (void)walAlignVersions(pWal); + // repair ts of files + if (walRepairLogFileTs(pWal, &updateMeta) < 0) { + return -1; + } + // update meta file if (updateMeta) { (void)walSaveMeta(pWal); diff --git a/source/libs/wal/src/walWrite.c b/source/libs/wal/src/walWrite.c index 848de4f36da4db57e881df4977b4f2399314a68f..9b7b3dfd5006857eb57b5e74edd39a3835fdbd71 100644 --- a/source/libs/wal/src/walWrite.c +++ b/source/libs/wal/src/walWrite.c @@ -284,15 +284,15 @@ int32_t walEndSnapshot(SWal *pWal) { if (ver == -1) { code = -1; goto END; - }; + } pWal->vers.snapshotVer = ver; int ts = taosGetTimestampSec(); - ver = TMAX(ver - pWal->vers.logRetention, pWal->vers.firstVer - 1); + // compatible mode for refVer bool hasTopic = false; - int64_t refVer = ver; + int64_t refVer = INT64_MAX; void *pIter = NULL; while (1) { pIter = taosHashIterate(pWal->pRefHash, pIter); @@ -300,54 +300,40 @@ int32_t walEndSnapshot(SWal *pWal) { SWalRef *pRef = *(SWalRef **)pIter; if (pRef->refVer == -1) continue; refVer = TMIN(refVer, pRef->refVer - 1); - wDebug("vgId:%d, wal found ref %" PRId64 ", refId %" PRId64, pWal->cfg.vgId, pRef->refVer, pRef->refId); hasTopic = true; } - // compatible mode if (pWal->cfg.retentionPeriod == 0 && hasTopic) { + wInfo("vgId:%d, wal found refVer:%" PRId64 " in compatible mode, ver:%" PRId64, pWal->cfg.vgId, refVer, ver); ver = TMIN(ver, refVer); } + // find files safe to delete int deleteCnt = 0; int64_t newTotSize = pWal->totSize; - SWalFileInfo tmp; + SWalFileInfo tmp = {0}; tmp.firstVer = ver; - // find files safe to delete SWalFileInfo *pInfo = taosArraySearch(pWal->fileInfoSet, &tmp, compareWalFileInfo, TD_LE); + if (pInfo) { - SWalFileInfo *pLastFileInfo = taosArrayGetLast(pWal->fileInfoSet); - wDebug("vgId:%d, wal search found file info: first:%" PRId64 " last:%" PRId64, pWal->cfg.vgId, pInfo->firstVer, - pInfo->lastVer); - if (ver >= pInfo->lastVer) { + wDebug("vgId:%d, wal search found file info. ver:%" PRId64 ", first:%" PRId64 " last:%" PRId64, pWal->cfg.vgId, ver, + pInfo->firstVer, pInfo->lastVer); + ASSERT(ver <= pInfo->lastVer); + if (ver == pInfo->lastVer) { pInfo++; - wDebug("vgId:%d, wal remove advance one file: first:%" PRId64 " last:%" PRId64, pWal->cfg.vgId, pInfo->firstVer, - pInfo->lastVer); - } - if (pInfo <= pLastFileInfo) { - wDebug("vgId:%d, wal end remove for first:%" PRId64 " last:%" PRId64, pWal->cfg.vgId, pInfo->firstVer, - pInfo->lastVer); - } else { - wDebug("vgId:%d, wal no remove", pWal->cfg.vgId); } // iterate files, until the searched result + // delete according to file size or close time for (SWalFileInfo *iter = pWal->fileInfoSet->pData; iter < pInfo; iter++) { - wDebug("vgId:%d, wal check remove file %" PRId64 "(file size %" PRId64 " close ts %" PRId64 - "), new tot size %" PRId64, - pWal->cfg.vgId, iter->firstVer, iter->fileSize, iter->closeTs, newTotSize); - if ((pWal->cfg.retentionSize != -1 && pWal->cfg.retentionSize != 0 && newTotSize > pWal->cfg.retentionSize) || - ((pWal->cfg.retentionPeriod == 0) || (pWal->cfg.retentionPeriod != -1 && iter->closeTs != -1 && - iter->closeTs + pWal->cfg.retentionPeriod < ts))) { - // delete according to file size or close time - wDebug("vgId:%d, check pass", pWal->cfg.vgId); + if ((pWal->cfg.retentionSize > 0 && newTotSize > pWal->cfg.retentionSize) || + (pWal->cfg.retentionPeriod == 0 || + pWal->cfg.retentionPeriod > 0 && iter->closeTs >= 0 && iter->closeTs + pWal->cfg.retentionPeriod < ts)) { deleteCnt++; newTotSize -= iter->fileSize; taosArrayPush(pWal->toDeleteFiles, iter); } - wDebug("vgId:%d, check not pass", pWal->cfg.vgId); } - UPDATE_META: // make new array, remove files taosArrayPopFrontBatch(pWal->fileInfoSet, deleteCnt); if (taosArrayGetSize(pWal->fileInfoSet) == 0) { @@ -357,11 +343,12 @@ int32_t walEndSnapshot(SWal *pWal) { pWal->vers.firstVer = ((SWalFileInfo *)taosArrayGet(pWal->fileInfoSet, 0))->firstVer; } } + + // update meta pWal->writeCur = taosArrayGetSize(pWal->fileInfoSet) - 1; pWal->totSize = newTotSize; pWal->vers.verInSnapshotting = -1; - // save snapshot ver, commit ver code = walSaveMeta(pWal); if (code < 0) { goto END; @@ -369,23 +356,27 @@ int32_t walEndSnapshot(SWal *pWal) { // delete files deleteCnt = taosArrayGetSize(pWal->toDeleteFiles); - wDebug("vgId:%d, wal should delete %d files", pWal->cfg.vgId, deleteCnt); - char fnameStr[WAL_FILE_LEN]; + char fnameStr[WAL_FILE_LEN] = {0}; + pInfo = NULL; + for (int i = 0; i < deleteCnt; i++) { pInfo = taosArrayGet(pWal->toDeleteFiles, i); + walBuildLogName(pWal, pInfo->firstVer, fnameStr); - wDebug("vgId:%d, wal remove file %s", pWal->cfg.vgId, fnameStr); if (taosRemoveFile(fnameStr) < 0 && errno != ENOENT) { wError("vgId:%d, failed to remove log file %s due to %s", pWal->cfg.vgId, fnameStr, strerror(errno)); goto END; } walBuildIdxName(pWal, pInfo->firstVer, fnameStr); - wDebug("vgId:%d, wal remove file %s", pWal->cfg.vgId, fnameStr); if (taosRemoveFile(fnameStr) < 0 && errno != ENOENT) { wError("vgId:%d, failed to remove idx file %s due to %s", pWal->cfg.vgId, fnameStr, strerror(errno)); goto END; } } + if (pInfo != NULL) { + wInfo("vgId:%d, wal log files recycled. count:%d, until ver:%" PRId64 ", closeTs:%" PRId64, pWal->cfg.vgId, + deleteCnt, pInfo->lastVer, pInfo->closeTs); + } taosArrayClear(pWal->toDeleteFiles); END: diff --git a/test1 b/test1 new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/tests/pytest/util/cluster.py b/tests/pytest/util/cluster.py index 2607cf63c29488b05a998fafd1565f29917d8e5e..a6e3530dc99edc631cfabc0608089a26ca61673e 100644 --- a/tests/pytest/util/cluster.py +++ b/tests/pytest/util/cluster.py @@ -52,8 +52,9 @@ class ConfigureyCluster: dnode.addExtraCfg("secondEp", f"{hostname}:{startPort_sec}") # configure dnoe of independent mnodes - if num <= self.mnodeNums and self.mnodeNums != 0 and independentMnode == True : - dnode.addExtraCfg("supportVnodes", 1024) + if num <= self.mnodeNums and self.mnodeNums != 0 and independentMnode == "True" : + tdLog.info("set mnode supportVnodes 0") + dnode.addExtraCfg("supportVnodes", 0) # print(dnode) self.dnodes.append(dnode) return self.dnodes @@ -71,6 +72,7 @@ class ConfigureyCluster: tdSql.init(conn.cursor()) mnodeNums=int(mnodeNums) for i in range(2,mnodeNums+1): + tdLog.info("create mnode on dnode %d"%i) tdSql.execute(" create mnode on dnode %d;"%i) diff --git a/tests/system-test/6-cluster/clusterCommonCheck.py b/tests/system-test/6-cluster/clusterCommonCheck.py index 149c6d8ded3b1ce81dd80d7f4208b43c74acd78f..f5926321da38fa52af2f974543a881a4fa46894f 100644 --- a/tests/system-test/6-cluster/clusterCommonCheck.py +++ b/tests/system-test/6-cluster/clusterCommonCheck.py @@ -207,7 +207,7 @@ class ClusterComCheck: count+=1 else: tdLog.debug(tdSql.queryResult) - tdLog.exit("stop mnodes on dnode %d failed in 10s ") + tdLog.exit(f"stop mnodes on dnode {offlineDnodeNo} failed in 10s ") def check3mnode2off(self,mnodeNums=3): count=0 @@ -226,7 +226,45 @@ class ClusterComCheck: count+=1 else: tdLog.debug(tdSql.queryResult) - tdLog.exit("stop mnodes on dnode %d failed in 10s ") + tdLog.exit("stop mnodes on dnode 2 or 3 failed in 10s") + + def check_vgroups_status(self,vgroup_numbers=2,db_replica=3,count_number=10,db_name="db"): + """ check vgroups status in 10s after db vgroups status is changed """ + vgroup_numbers = int(vgroup_numbers) + self.db_replica = int(db_replica) + tdLog.debug("start to check status of vgroups") + count=0 + last_number=vgroup_numbers-1 + while count < count_number: + time.sleep(1) + tdSql.query(f"show {db_name}.vgroups;") + if count == 0 : + if tdSql.checkRows(vgroup_numbers) : + tdLog.success(f"{db_name} has {vgroup_numbers} vgroups" ) + else: + tdLog.exit(f"vgroup number of {db_name} is not correct") + if self.db_replica == 1 : + if tdSql.queryResult[0][4] == 'leader' and tdSql.queryResult[1][4] == 'leader' and tdSql.queryResult[last_number][4] == 'leader': + ready_time= (count + 1) + tdLog.success(f"all vgroups of {db_name} are leaders in {count + 1} s") + return True + count+=1 + elif self.db_replica == 3 : + vgroup_status_first=[tdSql.queryResult[0][4],tdSql.queryResult[0][6],tdSql.queryResult[0][8]] + + vgroup_status_last=[tdSql.queryResult[last_number][4],tdSql.queryResult[last_number][6],tdSql.queryResult[last_number][8]] + if vgroup_status_first.count('leader') == 1 and vgroup_status_first.count('follower') == 2: + if vgroup_status_last.count('leader') == 1 and vgroup_status_last.count('follower') == 2: + ready_time= (count + 1) + tdLog.success(f"all vgroups of {db_name} are ready in {ready_time} s") + return True + count+=1 + else: + tdLog.debug(tdSql.queryResult) + tdLog.notice(f"all vgroups leader of {db_name} is selected {count}s ") + caller = inspect.getframeinfo(inspect.stack()[1][0]) + args = (caller.filename, caller.lineno) + tdLog.exit("%s(%d) failed " % args) diff --git a/tests/system-test/6-cluster/manually-test/6dnode3mnodeInsertDataRebootModifyMetaAlterRep1to3.py b/tests/system-test/6-cluster/manually-test/6dnode3mnodeInsertDataRebootModifyMetaAlterRep1to3.py new file mode 100644 index 0000000000000000000000000000000000000000..7d46b3143d26ad99b4e02c92518a02af0d3be2db --- /dev/null +++ b/tests/system-test/6-cluster/manually-test/6dnode3mnodeInsertDataRebootModifyMetaAlterRep1to3.py @@ -0,0 +1,206 @@ +import taos +import sys +import time +import os + +from util.log import * +from util.sql import * +from util.cases import * +from util.dnodes import TDDnodes +from util.dnodes import TDDnode +from util.cluster import * +sys.path.append("./6-cluster") +from clusterCommonCreate import * +from clusterCommonCheck import clusterComCheck + +import time +import socket +import subprocess +from multiprocessing import Process +import threading +import time +import inspect +import ctypes + +class TDTestCase: + + def init(self, conn, logSql, replicaVar=1): + tdLog.debug(f"start to excute {__file__}") + self.TDDnodes = None + tdSql.init(conn.cursor()) + self.host = socket.gethostname() + + + def getBuildPath(self): + selfPath = os.path.dirname(os.path.realpath(__file__)) + + if ("community" in selfPath): + projPath = selfPath[:selfPath.find("community")] + else: + projPath = selfPath[:selfPath.find("tests")] + + for root, dirs, files in os.walk(projPath): + if ("taosd" in files): + rootRealPath = os.path.dirname(os.path.realpath(root)) + if ("packaging" not in rootRealPath): + buildPath = root[:len(root) - len("/build/bin")] + break + return buildPath + + def _async_raise(self, tid, exctype): + """raises the exception, performs cleanup if needed""" + if not inspect.isclass(exctype): + exctype = type(exctype) + res = ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, ctypes.py_object(exctype)) + if res == 0: + raise ValueError("invalid thread id") + elif res != 1: + # """if it returns a number greater than one, you're in trouble, + # and you should call it again with exc=NULL to revert the effect""" + ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, None) + raise SystemError("PyThreadState_SetAsyncExc failed") + + def stopThread(self,thread): + self._async_raise(thread.ident, SystemExit) + + + def fiveDnodeThreeMnode(self,dnodeNumbers,mnodeNums,restartNumbers,stopRole): + tdLog.printNoPrefix("======== test case 1: ") + paraDict = {'dbName': 'db0_0', + 'dropFlag': 1, + 'event': '', + 'vgroups': 4, + 'replica': 1, + 'stbName': 'stb', + 'stbNumbers': 2, + 'colPrefix': 'c', + 'tagPrefix': 't', + 'colSchema': [{'type': 'INT', 'count':1}, {'type': 'binary', 'len':20, 'count':1}], + 'tagSchema': [{'type': 'INT', 'count':1}, {'type': 'binary', 'len':20, 'count':1}], + 'ctbPrefix': 'ctb', + 'ctbNum': 200, + 'startTs': 1640966400000, # 2022-01-01 00:00:00.000 + "rowsPerTbl": 1000, + "batchNum": 5000 + } + + dnodeNumbers=int(dnodeNumbers) + mnodeNums=int(mnodeNums) + vnodeNumbers = int(dnodeNumbers-mnodeNums) + allctbNumbers=(paraDict['stbNumbers']*paraDict["ctbNum"]) + rowsPerStb=paraDict["ctbNum"]*paraDict["rowsPerTbl"] + rowsall=rowsPerStb*paraDict['stbNumbers'] + dbNumbers = 1 + + tdLog.info("first check dnode and mnode") + tdSql.query("select * from information_schema.ins_dnodes;") + tdSql.checkData(0,1,'%s:6030'%self.host) + tdSql.checkData(4,1,'%s:6430'%self.host) + clusterComCheck.checkDnodes(dnodeNumbers) + + #check mnode status + tdLog.info("check mnode status") + clusterComCheck.checkMnodeStatus(mnodeNums) + + # add some error operations and + tdLog.info("Confirm the status of the dnode again") + tdSql.error("create mnode on dnode 2") + tdSql.query("select * from information_schema.ins_dnodes;") + print(tdSql.queryResult) + clusterComCheck.checkDnodes(dnodeNumbers) + + # create database and stable + clusterComCreate.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], paraDict["vgroups"],paraDict['replica']) + tdLog.info("Take turns stopping Mnodes ") + + tdDnodes=cluster.dnodes + stopcount =0 + threads=[] + + # create stable:stb_0 + stableName= paraDict['stbName'] + newTdSql=tdCom.newTdSql() + clusterComCreate.create_stables(newTdSql, paraDict["dbName"],stableName,paraDict['stbNumbers']) + #create child table:ctb_0 + for i in range(paraDict['stbNumbers']): + stableName= '%s_%d'%(paraDict['stbName'],i) + newTdSql=tdCom.newTdSql() + clusterComCreate.create_ctable(newTdSql, paraDict["dbName"],stableName,stableName, paraDict['ctbNum']) + #insert date + for i in range(paraDict['stbNumbers']): + stableName= '%s_%d'%(paraDict['stbName'],i) + newTdSql=tdCom.newTdSql() + threads.append(threading.Thread(target=clusterComCreate.insert_data, args=(newTdSql, paraDict["dbName"],stableName,paraDict["ctbNum"],paraDict["rowsPerTbl"],paraDict["batchNum"],paraDict["startTs"]))) + for tr in threads: + tr.start() + for tr in threads: + tr.join() + + while stopcount < restartNumbers: + tdLog.info(" restart loop: %d"%stopcount ) + if stopRole == "mnode": + for i in range(mnodeNums): + tdDnodes[i].stoptaosd() + # sleep(10) + tdDnodes[i].starttaosd() + # sleep(10) + elif stopRole == "vnode": + for i in range(vnodeNumbers): + tdDnodes[i+mnodeNums].stoptaosd() + # sleep(10) + tdDnodes[i+mnodeNums].starttaosd() + # sleep(10) + elif stopRole == "dnode": + for i in range(dnodeNumbers): + if i == 0 : + stableName= '%s_%d'%(paraDict['stbName'],0) + newTdSql=tdCom.newTdSql() + # newTdSql.execute('alter database db0_0 replica 3') + clusterComCreate.alterStbMetaData(newTdSql, paraDict["dbName"],stableName,paraDict["ctbNum"],paraDict["rowsPerTbl"],paraDict["batchNum"]) + tdDnodes[i].stoptaosd() + clusterComCheck.checkDbRows(dbNumbers) + # sleep(10) + tdDnodes[i].starttaosd() + if i == 3 : + TdSqlEx=tdCom.newTdSql() + tdLog.info("alter database db0_0 replica 3") + TdSqlEx.execute('alter database db0_0 replica 3') + + + # dnodeNumbers don't include database of schema + if clusterComCheck.checkDnodes(dnodeNumbers): + tdLog.info("123") + else: + print("456") + + self.stopThread(threads) + tdLog.exit("one or more of dnodes failed to start ") + # self.check3mnode() + stopcount+=1 + + + clusterComCheck.checkDnodes(dnodeNumbers) + clusterComCheck.checkDbRows(dbNumbers) + # clusterComCheck.checkDb(dbNumbers,1,paraDict["dbName"]) + + # tdSql.execute("use %s" %(paraDict["dbName"])) + tdSql.query("show %s.stables"%(paraDict["dbName"])) + tdSql.checkRows(paraDict["stbNumbers"]) + for i in range(paraDict['stbNumbers']): + stableName= '%s.%s_%d'%(paraDict["dbName"],paraDict['stbName'],i) + tdSql.query("select count(*) from %s"%stableName) + if i == 0 : + tdSql.checkData(0,0,rowsPerStb*2) + else: + tdSql.checkData(0,0,rowsPerStb) + clusterComCheck.check_vgroups_status(vgroup_numbers=paraDict["vgroups"],db_replica=3,db_name=paraDict["dbName"],count_number=150) + def run(self): + # print(self.master_dnode.cfgDict) + self.fiveDnodeThreeMnode(dnodeNumbers=6,mnodeNums=3,restartNumbers=1,stopRole='dnode') + + def stop(self): + tdSql.close() + tdLog.success(f"{__file__} successfully executed") + +tdCases.addLinux(__file__, TDTestCase()) +tdCases.addWindows(__file__, TDTestCase()) diff --git a/tests/system-test/6-cluster/manually-test/6dnode3mnodeInsertDataRebootModifyMetaAlterRep3to1.py b/tests/system-test/6-cluster/manually-test/6dnode3mnodeInsertDataRebootModifyMetaAlterRep3to1.py new file mode 100644 index 0000000000000000000000000000000000000000..5b5fb04969468e6febe91feb8263c8045b1e64eb --- /dev/null +++ b/tests/system-test/6-cluster/manually-test/6dnode3mnodeInsertDataRebootModifyMetaAlterRep3to1.py @@ -0,0 +1,206 @@ +import taos +import sys +import time +import os + +from util.log import * +from util.sql import * +from util.cases import * +from util.dnodes import TDDnodes +from util.dnodes import TDDnode +from util.cluster import * +sys.path.append("./6-cluster") +from clusterCommonCreate import * +from clusterCommonCheck import clusterComCheck + +import time +import socket +import subprocess +from multiprocessing import Process +import threading +import time +import inspect +import ctypes + +class TDTestCase: + + def init(self, conn, logSql, replicaVar=1): + tdLog.debug(f"start to excute {__file__}") + self.TDDnodes = None + tdSql.init(conn.cursor()) + self.host = socket.gethostname() + + + def getBuildPath(self): + selfPath = os.path.dirname(os.path.realpath(__file__)) + + if ("community" in selfPath): + projPath = selfPath[:selfPath.find("community")] + else: + projPath = selfPath[:selfPath.find("tests")] + + for root, dirs, files in os.walk(projPath): + if ("taosd" in files): + rootRealPath = os.path.dirname(os.path.realpath(root)) + if ("packaging" not in rootRealPath): + buildPath = root[:len(root) - len("/build/bin")] + break + return buildPath + + def _async_raise(self, tid, exctype): + """raises the exception, performs cleanup if needed""" + if not inspect.isclass(exctype): + exctype = type(exctype) + res = ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, ctypes.py_object(exctype)) + if res == 0: + raise ValueError("invalid thread id") + elif res != 1: + # """if it returns a number greater than one, you're in trouble, + # and you should call it again with exc=NULL to revert the effect""" + ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, None) + raise SystemError("PyThreadState_SetAsyncExc failed") + + def stopThread(self,thread): + self._async_raise(thread.ident, SystemExit) + + + def fiveDnodeThreeMnode(self,dnodeNumbers,mnodeNums,restartNumbers,stopRole): + tdLog.printNoPrefix("======== test case 1: ") + paraDict = {'dbName': 'db0_0', + 'dropFlag': 1, + 'event': '', + 'vgroups': 4, + 'replica': 3, + 'stbName': 'stb', + 'stbNumbers': 2, + 'colPrefix': 'c', + 'tagPrefix': 't', + 'colSchema': [{'type': 'INT', 'count':1}, {'type': 'binary', 'len':20, 'count':1}], + 'tagSchema': [{'type': 'INT', 'count':1}, {'type': 'binary', 'len':20, 'count':1}], + 'ctbPrefix': 'ctb', + 'ctbNum': 200, + 'startTs': 1640966400000, # 2022-01-01 00:00:00.000 + "rowsPerTbl": 1000, + "batchNum": 5000 + } + + dnodeNumbers=int(dnodeNumbers) + mnodeNums=int(mnodeNums) + vnodeNumbers = int(dnodeNumbers-mnodeNums) + allctbNumbers=(paraDict['stbNumbers']*paraDict["ctbNum"]) + rowsPerStb=paraDict["ctbNum"]*paraDict["rowsPerTbl"] + rowsall=rowsPerStb*paraDict['stbNumbers'] + dbNumbers = 1 + + tdLog.info("first check dnode and mnode") + tdSql.query("select * from information_schema.ins_dnodes;") + tdSql.checkData(0,1,'%s:6030'%self.host) + tdSql.checkData(4,1,'%s:6430'%self.host) + clusterComCheck.checkDnodes(dnodeNumbers) + + #check mnode status + tdLog.info("check mnode status") + clusterComCheck.checkMnodeStatus(mnodeNums) + + # add some error operations and + tdLog.info("Confirm the status of the dnode again") + tdSql.error("create mnode on dnode 2") + tdSql.query("select * from information_schema.ins_dnodes;") + print(tdSql.queryResult) + clusterComCheck.checkDnodes(dnodeNumbers) + + # create database and stable + clusterComCreate.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], paraDict["vgroups"],paraDict['replica']) + tdLog.info("Take turns stopping Mnodes ") + + tdDnodes=cluster.dnodes + stopcount =0 + threads=[] + + # create stable:stb_0 + stableName= paraDict['stbName'] + newTdSql=tdCom.newTdSql() + clusterComCreate.create_stables(newTdSql, paraDict["dbName"],stableName,paraDict['stbNumbers']) + #create child table:ctb_0 + for i in range(paraDict['stbNumbers']): + stableName= '%s_%d'%(paraDict['stbName'],i) + newTdSql=tdCom.newTdSql() + clusterComCreate.create_ctable(newTdSql, paraDict["dbName"],stableName,stableName, paraDict['ctbNum']) + #insert date + for i in range(paraDict['stbNumbers']): + stableName= '%s_%d'%(paraDict['stbName'],i) + newTdSql=tdCom.newTdSql() + threads.append(threading.Thread(target=clusterComCreate.insert_data, args=(newTdSql, paraDict["dbName"],stableName,paraDict["ctbNum"],paraDict["rowsPerTbl"],paraDict["batchNum"],paraDict["startTs"]))) + for tr in threads: + tr.start() + for tr in threads: + tr.join() + + while stopcount < restartNumbers: + tdLog.info(" restart loop: %d"%stopcount ) + if stopRole == "mnode": + for i in range(mnodeNums): + tdDnodes[i].stoptaosd() + # sleep(10) + tdDnodes[i].starttaosd() + # sleep(10) + elif stopRole == "vnode": + for i in range(vnodeNumbers): + tdDnodes[i+mnodeNums].stoptaosd() + # sleep(10) + tdDnodes[i+mnodeNums].starttaosd() + # sleep(10) + elif stopRole == "dnode": + for i in range(dnodeNumbers): + tdDnodes[i].stoptaosd() + clusterComCheck.checkDbRows(dbNumbers) + if i == 0 : + stableName= '%s_%d'%(paraDict['stbName'],0) + newTdSql=tdCom.newTdSql() + # newTdSql.execute('alter database db0_0 replica 3') + clusterComCreate.alterStbMetaData(newTdSql, paraDict["dbName"],stableName,paraDict["ctbNum"],paraDict["rowsPerTbl"],paraDict["batchNum"]) + # sleep(10) + tdDnodes[i].starttaosd() + if i == 3 : + TdSqlEx=tdCom.newTdSql() + tdLog.info("alter database db0_0 replica 1") + TdSqlEx.execute('alter database db0_0 replica 1') + + + # dnodeNumbers don't include database of schema + if clusterComCheck.checkDnodes(dnodeNumbers): + tdLog.info("123") + else: + print("456") + + self.stopThread(threads) + tdLog.exit("one or more of dnodes failed to start ") + # self.check3mnode() + stopcount+=1 + + + clusterComCheck.checkDnodes(dnodeNumbers) + clusterComCheck.checkDbRows(dbNumbers) + # clusterComCheck.checkDb(dbNumbers,1,paraDict["dbName"]) + + # tdSql.execute("use %s" %(paraDict["dbName"])) + tdSql.query("show %s.stables"%(paraDict["dbName"])) + tdSql.checkRows(paraDict["stbNumbers"]) + for i in range(paraDict['stbNumbers']): + stableName= '%s.%s_%d'%(paraDict["dbName"],paraDict['stbName'],i) + tdSql.query("select count(*) from %s"%stableName) + if i == 0 : + tdSql.checkData(0,0,rowsPerStb*2) + else: + tdSql.checkData(0,0,rowsPerStb) + clusterComCheck.check_vgroups_status(vgroup_numbers=paraDict["vgroups"],db_replica=1,db_name=paraDict["dbName"],count_number=150) + def run(self): + # print(self.master_dnode.cfgDict) + self.fiveDnodeThreeMnode(dnodeNumbers=6,mnodeNums=3,restartNumbers=1,stopRole='dnode') + + def stop(self): + tdSql.close() + tdLog.success(f"{__file__} successfully executed") + +tdCases.addLinux(__file__, TDTestCase()) +tdCases.addWindows(__file__, TDTestCase()) diff --git a/tests/system-test/6-cluster/manually-test/6dnode3mnodeInsertDatarRebootAlterRep1-3.py b/tests/system-test/6-cluster/manually-test/6dnode3mnodeInsertDatarRebootAlterRep1-3.py new file mode 100644 index 0000000000000000000000000000000000000000..aa3ed8e3fd4596f0f9f6afa98f43d10101265b96 --- /dev/null +++ b/tests/system-test/6-cluster/manually-test/6dnode3mnodeInsertDatarRebootAlterRep1-3.py @@ -0,0 +1,222 @@ +import taos +import sys +import time +import os + +from util.log import * +from util.sql import * +from util.cases import * +from util.dnodes import TDDnodes +from util.dnodes import TDDnode +from util.cluster import * +sys.path.append("./6-cluster") +from clusterCommonCreate import * +from clusterCommonCheck import clusterComCheck + +import time +import socket +import subprocess +from multiprocessing import Process +import threading +import time +import inspect +import ctypes + +class TDTestCase: + + def init(self, conn, logSql, replicaVar=1): + tdLog.debug(f"start to excute {__file__}") + self.TDDnodes = None + tdSql.init(conn.cursor()) + self.host = socket.gethostname() + + + def getBuildPath(self): + selfPath = os.path.dirname(os.path.realpath(__file__)) + + if ("community" in selfPath): + projPath = selfPath[:selfPath.find("community")] + else: + projPath = selfPath[:selfPath.find("tests")] + + for root, dirs, files in os.walk(projPath): + if ("taosd" in files): + rootRealPath = os.path.dirname(os.path.realpath(root)) + if ("packaging" not in rootRealPath): + buildPath = root[:len(root) - len("/build/bin")] + break + return buildPath + + def _async_raise(self, tid, exctype): + """raises the exception, performs cleanup if needed""" + if not inspect.isclass(exctype): + exctype = type(exctype) + res = ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, ctypes.py_object(exctype)) + if res == 0: + raise ValueError("invalid thread id") + elif res != 1: + # """if it returns a number greater than one, you're in trouble, + # and you should call it again with exc=NULL to revert the effect""" + ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, None) + raise SystemError("PyThreadState_SetAsyncExc failed") + + def stopThread(self,thread): + self._async_raise(thread.ident, SystemExit) + + + def insertData(self,countstart,countstop): + # fisrt add data : db\stable\childtable\general table + + for couti in range(countstart,countstop): + tdLog.debug("drop database if exists db%d" %couti) + tdSql.execute("drop database if exists db%d" %couti) + print("create database if not exists db%d replica 1 duration 300" %couti) + tdSql.execute("create database if not exists db%d replica 1 duration 300" %couti) + tdSql.execute("use db%d" %couti) + tdSql.execute( + '''create table stb1 + (ts timestamp, c1 int, c2 bigint, c3 smallint, c4 tinyint, c5 float, c6 double, c7 bool, c8 binary(16),c9 nchar(32), c10 timestamp) + tags (t1 int) + ''' + ) + tdSql.execute( + ''' + create table t1 + (ts timestamp, c1 int, c2 bigint, c3 smallint, c4 tinyint, c5 float, c6 double, c7 bool, c8 binary(16),c9 nchar(32), c10 timestamp) + ''' + ) + for i in range(4): + tdSql.execute(f'create table ct{i+1} using stb1 tags ( {i+1} )') + + + def fiveDnodeThreeMnode(self,dnodeNumbers,mnodeNums,restartNumbers,stopRole): + tdLog.printNoPrefix("======== test case 1: ") + paraDict = {'dbName': 'db0_0', + 'dropFlag': 1, + 'event': '', + 'vgroups': 4, + 'replica': 1, + 'stbName': 'stb', + 'stbNumbers': 2, + 'colPrefix': 'c', + 'tagPrefix': 't', + 'colSchema': [{'type': 'INT', 'count':1}, {'type': 'binary', 'len':20, 'count':1}], + 'tagSchema': [{'type': 'INT', 'count':1}, {'type': 'binary', 'len':20, 'count':1}], + 'ctbPrefix': 'ctb', + 'ctbNum': 1000, + 'startTs': 1640966400000, # 2022-01-01 00:00:00.000 + "rowsPerTbl": 100, + "batchNum": 5000 + } + + dnodeNumbers = int(dnodeNumbers) + mnodeNums = int(mnodeNums) + vnodeNumbers = int(dnodeNumbers-mnodeNums) + allctbNumbers = (paraDict['stbNumbers']*paraDict["ctbNum"]) + rowsPerStb = paraDict["ctbNum"]*paraDict["rowsPerTbl"] + rowsall = rowsPerStb*paraDict['stbNumbers'] + dbNumbers = 1 + replica3 = 3 + tdLog.info("first check dnode and mnode") + tdSql.query("select * from information_schema.ins_dnodes;") + tdSql.checkData(0,1,'%s:6030'%self.host) + tdSql.checkData(4,1,'%s:6430'%self.host) + clusterComCheck.checkDnodes(dnodeNumbers) + + #check mnode status + tdLog.info("check mnode status") + clusterComCheck.checkMnodeStatus(mnodeNums) + + # add some error operations and + tdLog.info("Confirm the status of the dnode again") + tdSql.error("create mnode on dnode 2") + tdSql.query("select * from information_schema.ins_dnodes;") + print(tdSql.queryResult) + clusterComCheck.checkDnodes(dnodeNumbers) + + # create database and stable + clusterComCreate.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], paraDict["vgroups"],paraDict['replica']) + tdLog.info("Take turns stopping Mnodes ") + + tdDnodes=cluster.dnodes + stopcount =0 + threads=[] + + # create stable:stb_0 + stableName= paraDict['stbName'] + newTdSql=tdCom.newTdSql() + clusterComCreate.create_stables(newTdSql, paraDict["dbName"],stableName,paraDict['stbNumbers']) + #create child table:ctb_0 + for i in range(paraDict['stbNumbers']): + stableName= '%s_%d'%(paraDict['stbName'],i) + newTdSql=tdCom.newTdSql() + clusterComCreate.create_ctable(newTdSql, paraDict["dbName"],stableName,stableName, paraDict['ctbNum']) + #insert date + for i in range(paraDict['stbNumbers']): + stableName= '%s_%d'%(paraDict['stbName'],i) + newTdSql=tdCom.newTdSql() + threads.append(threading.Thread(target=clusterComCreate.insert_data, args=(newTdSql, paraDict["dbName"],stableName,paraDict["ctbNum"],paraDict["rowsPerTbl"],paraDict["batchNum"],paraDict["startTs"]))) + for tr in threads: + tr.start() + TdSqlEx=tdCom.newTdSql() + tdLog.info("alter database db0_0 replica 3") + TdSqlEx.execute('alter database db0_0 replica 3') + while stopcount < restartNumbers: + tdLog.info(" restart loop: %d"%stopcount ) + if stopRole == "mnode": + for i in range(mnodeNums): + tdDnodes[i].stoptaosd() + # sleep(10) + tdDnodes[i].starttaosd() + # sleep(10) + elif stopRole == "vnode": + for i in range(vnodeNumbers): + tdDnodes[i+mnodeNums].stoptaosd() + # sleep(10) + tdDnodes[i+mnodeNums].starttaosd() + # sleep(10) + elif stopRole == "dnode": + for i in range(dnodeNumbers): + tdDnodes[i].stoptaosd() + # tdLog.info('select cast(c2 as nchar(10)) from db0_0.stb_1;') + # TdSqlEx.execute('select cast(c2 as nchar(10)) from db0_0.stb_1;') + # tdLog.info('select avg(c1) from db0_0.stb_0 interval(10s);') + # TdSqlEx.execute('select avg(c1) from db0_0.stb_0 interval(10s);') + # sleep(10) + tdDnodes[i].starttaosd() + # sleep(10) + # dnodeNumbers don't include database of schema + if clusterComCheck.checkDnodes(dnodeNumbers): + tdLog.info("123") + else: + print("456") + + self.stopThread(threads) + tdLog.exit("one or more of dnodes failed to start ") + # self.check3mnode() + stopcount+=1 + + for tr in threads: + tr.join() + clusterComCheck.checkDnodes(dnodeNumbers) + clusterComCheck.checkDbRows(dbNumbers) + # clusterComCheck.checkDb(dbNumbers,1,paraDict["dbName"]) + + # tdSql.execute("use %s" %(paraDict["dbName"])) + tdSql.query("show %s.stables"%(paraDict["dbName"])) + tdSql.checkRows(paraDict["stbNumbers"]) + # for i in range(paraDict['stbNumbers']): + # stableName= '%s.%s_%d'%(paraDict["dbName"],paraDict['stbName'],i) + # tdSql.query("select count(*) from %s"%stableName) + # tdSql.checkData(0,0,rowsPerStb) + clusterComCheck.check_vgroups_status(vgroup_numbers=paraDict["vgroups"],db_replica=replica3,db_name=paraDict["dbName"],count_number=240) + def run(self): + # print(self.master_dnode.cfgDict) + self.fiveDnodeThreeMnode(dnodeNumbers=6,mnodeNums=3,restartNumbers=4,stopRole='dnode') + + def stop(self): + tdSql.close() + tdLog.success(f"{__file__} successfully executed") + +tdCases.addLinux(__file__, TDTestCase()) +tdCases.addWindows(__file__, TDTestCase()) diff --git a/tests/system-test/6-cluster/manually-test/6dnode3mnodeInsertLessDataAlterRep3to1to3.py b/tests/system-test/6-cluster/manually-test/6dnode3mnodeInsertLessDataAlterRep3to1to3.py new file mode 100644 index 0000000000000000000000000000000000000000..ed7b99a880716b85ded5238b80eb11691166bc1e --- /dev/null +++ b/tests/system-test/6-cluster/manually-test/6dnode3mnodeInsertLessDataAlterRep3to1to3.py @@ -0,0 +1,196 @@ +import taos +import sys +import time +import os + +from util.log import * +from util.sql import * +from util.cases import * +from util.dnodes import TDDnodes +from util.dnodes import TDDnode +from util.cluster import * +sys.path.append("./6-cluster") +from clusterCommonCreate import * +from clusterCommonCheck import clusterComCheck + +import time +import socket +import subprocess +from multiprocessing import Process +import threading +import time +import inspect +import ctypes + +class TDTestCase: + + def init(self, conn, logSql, replicaVar=1): + tdLog.debug(f"start to excute {__file__}") + self.TDDnodes = None + tdSql.init(conn.cursor()) + self.host = socket.gethostname() + + + def getBuildPath(self): + selfPath = os.path.dirname(os.path.realpath(__file__)) + + if ("community" in selfPath): + projPath = selfPath[:selfPath.find("community")] + else: + projPath = selfPath[:selfPath.find("tests")] + + for root, dirs, files in os.walk(projPath): + if ("taosd" in files): + rootRealPath = os.path.dirname(os.path.realpath(root)) + if ("packaging" not in rootRealPath): + buildPath = root[:len(root) - len("/build/bin")] + break + return buildPath + + def _async_raise(self, tid, exctype): + """raises the exception, performs cleanup if needed""" + if not inspect.isclass(exctype): + exctype = type(exctype) + res = ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, ctypes.py_object(exctype)) + if res == 0: + raise ValueError("invalid thread id") + elif res != 1: + # """if it returns a number greater than one, you're in trouble, + # and you should call it again with exc=NULL to revert the effect""" + ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, None) + raise SystemError("PyThreadState_SetAsyncExc failed") + + def stopThread(self,thread): + self._async_raise(thread.ident, SystemExit) + + + def insertData(self,countstart,countstop): + # fisrt add data : db\stable\childtable\general table + + for couti in range(countstart,countstop): + tdLog.debug("drop database if exists db%d" %couti) + tdSql.execute("drop database if exists db%d" %couti) + print("create database if not exists db%d replica 1 duration 300" %couti) + tdSql.execute("create database if not exists db%d replica 1 duration 300" %couti) + tdSql.execute("use db%d" %couti) + tdSql.execute( + '''create table stb1 + (ts timestamp, c1 int, c2 bigint, c3 smallint, c4 tinyint, c5 float, c6 double, c7 bool, c8 binary(16),c9 nchar(32), c10 timestamp) + tags (t1 int) + ''' + ) + tdSql.execute( + ''' + create table t1 + (ts timestamp, c1 int, c2 bigint, c3 smallint, c4 tinyint, c5 float, c6 double, c7 bool, c8 binary(16),c9 nchar(32), c10 timestamp) + ''' + ) + for i in range(4): + tdSql.execute(f'create table ct{i+1} using stb1 tags ( {i+1} )') + + + def fiveDnodeThreeMnode(self,dnodeNumbers,mnodeNums,restartNumbers,stopRole): + tdLog.printNoPrefix("======== test case 1: ") + paraDict = {'dbName': 'db0_0', + 'dropFlag': 1, + 'event': '', + 'vgroups': 4, + 'replica': 3, + 'stbName': 'stb', + 'stbNumbers': 2, + 'colPrefix': 'c', + 'tagPrefix': 't', + 'colSchema': [{'type': 'INT', 'count':1}, {'type': 'binary', 'len':20, 'count':1}], + 'tagSchema': [{'type': 'INT', 'count':1}, {'type': 'binary', 'len':20, 'count':1}], + 'ctbPrefix': 'ctb', + 'ctbNum': 1, + 'startTs': 1640966400000, # 2022-01-01 00:00:00.000 + "rowsPerTbl": 1, + "batchNum": 5000 + } + + dnodeNumbers=int(dnodeNumbers) + mnodeNums=int(mnodeNums) + vnodeNumbers = int(dnodeNumbers-mnodeNums) + replica1 = 1 + replica3 = 3 + allctbNumbers=(paraDict['stbNumbers']*paraDict["ctbNum"]) + rowsPerStb=paraDict["ctbNum"]*paraDict["rowsPerTbl"] + rowsall=rowsPerStb*paraDict['stbNumbers'] + dbNumbers = 1 + + tdLog.info("first check dnode and mnode") + tdSql.query("select * from information_schema.ins_dnodes;") + tdSql.checkData(0,1,'%s:6030'%self.host) + tdSql.checkData(4,1,'%s:6430'%self.host) + clusterComCheck.checkDnodes(dnodeNumbers) + + #check mnode status + tdLog.info("check mnode status") + clusterComCheck.checkMnodeStatus(mnodeNums) + + # add some error operations and + tdLog.info("Confirm the status of the dnode again") + tdSql.error("create mnode on dnode 2") + tdSql.query("select * from information_schema.ins_dnodes;") + print(tdSql.queryResult) + clusterComCheck.checkDnodes(dnodeNumbers) + + # create database and stable + clusterComCreate.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], paraDict["vgroups"],paraDict['replica']) + tdLog.info("Take turns stopping Mnodes ") + + tdDnodes=cluster.dnodes + stopcount =0 + threads=[] + + # create stable:stb_0 + stableName= paraDict['stbName'] + newTdSql=tdCom.newTdSql() + clusterComCreate.create_stables(newTdSql, paraDict["dbName"],stableName,paraDict['stbNumbers']) + #create child table:ctb_0 + for i in range(paraDict['stbNumbers']): + stableName= '%s_%d'%(paraDict['stbName'],i) + newTdSql=tdCom.newTdSql() + clusterComCreate.create_ctable(newTdSql, paraDict["dbName"],stableName,stableName, paraDict['ctbNum']) + #insert date + for i in range(paraDict['stbNumbers']): + stableName= '%s_%d'%(paraDict['stbName'],i) + newTdSql=tdCom.newTdSql() + threads.append(threading.Thread(target=clusterComCreate.insert_data, args=(newTdSql, paraDict["dbName"],stableName,paraDict["ctbNum"],paraDict["rowsPerTbl"],paraDict["batchNum"],paraDict["startTs"]))) + for tr in threads: + tr.start() + TdSqlEx=tdCom.newTdSql() + tdLog.info(f"alter database db0_0 replica {replica1}") + TdSqlEx.execute(f'alter database db0_0 replica {replica1}') + for tr in threads: + tr.join() + clusterComCheck.checkDnodes(dnodeNumbers) + clusterComCheck.checkDbRows(dbNumbers) + # clusterComCheck.checkDb(dbNumbers,1,paraDict["dbName"]) + + # tdSql.execute("use %s" %(paraDict["dbName"])) + tdSql.query("show %s.stables"%(paraDict["dbName"])) + tdSql.checkRows(paraDict["stbNumbers"]) + for i in range(paraDict['stbNumbers']): + stableName= '%s.%s_%d'%(paraDict["dbName"],paraDict['stbName'],i) + tdSql.query("select count(*) from %s"%stableName) + tdSql.checkData(0,0,rowsPerStb) + + clusterComCheck.check_vgroups_status(vgroup_numbers=paraDict["vgroups"],db_replica=replica1,db_name=paraDict["dbName"],count_number=20) + sleep(5) + tdLog.info(f"show transactions;alter database db0_0 replica {replica3};") + TdSqlEx.execute(f'show transactions;') + TdSqlEx.execute(f'alter database db0_0 replica {replica3};') + clusterComCheck.check_vgroups_status(vgroup_numbers=paraDict["vgroups"],db_replica=replica3,db_name=paraDict["dbName"],count_number=120) + + def run(self): + # print(self.master_dnode.cfgDict) + self.fiveDnodeThreeMnode(dnodeNumbers=6,mnodeNums=3,restartNumbers=4,stopRole='dnode') + + def stop(self): + tdSql.close() + tdLog.success(f"{__file__} successfully executed") + +tdCases.addLinux(__file__, TDTestCase()) +tdCases.addWindows(__file__, TDTestCase()) diff --git a/tests/system-test/6-cluster/manually-test/6dnode3mnodeStopDnodeInsertDatatb.py b/tests/system-test/6-cluster/manually-test/6dnode3mnodeStopDnodeInsertDatatb.py new file mode 100644 index 0000000000000000000000000000000000000000..e02af29a050c3812ef3b27a6b2c9aa126cb2146c --- /dev/null +++ b/tests/system-test/6-cluster/manually-test/6dnode3mnodeStopDnodeInsertDatatb.py @@ -0,0 +1,191 @@ +import taos +import sys +import time +import os + +from util.log import * +from util.sql import * +from util.cases import * +from util.dnodes import TDDnodes +from util.dnodes import TDDnode +from util.cluster import * +sys.path.append("./6-cluster") +from clusterCommonCreate import * +from clusterCommonCheck import clusterComCheck + +import time +import socket +import subprocess +from multiprocessing import Process +import threading +import time +import inspect +import ctypes + +class TDTestCase: + + def init(self, conn, logSql, replicaVar=1): + tdLog.debug(f"start to excute {__file__}") + self.TDDnodes = None + tdSql.init(conn.cursor()) + self.host = socket.gethostname() + + + def getBuildPath(self): + selfPath = os.path.dirname(os.path.realpath(__file__)) + + if ("community" in selfPath): + projPath = selfPath[:selfPath.find("community")] + else: + projPath = selfPath[:selfPath.find("tests")] + + for root, dirs, files in os.walk(projPath): + if ("taosd" in files): + rootRealPath = os.path.dirname(os.path.realpath(root)) + if ("packaging" not in rootRealPath): + buildPath = root[:len(root) - len("/build/bin")] + break + return buildPath + + def _async_raise(self, tid, exctype): + """raises the exception, performs cleanup if needed""" + if not inspect.isclass(exctype): + exctype = type(exctype) + res = ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, ctypes.py_object(exctype)) + if res == 0: + raise ValueError("invalid thread id") + elif res != 1: + # """if it returns a number greater than one, you're in trouble, + # and you should call it again with exc=NULL to revert the effect""" + ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, None) + raise SystemError("PyThreadState_SetAsyncExc failed") + + def stopThread(self,thread): + self._async_raise(thread.ident, SystemExit) + + + def insertData(self,dbname,tableCount,rowsPerCount): + # tableCount : create table number + # rowsPerCount : rows per table + # fisrt add data : db\stable\childtable\general table + os.system(f"taosBenchmark -d {dbname} -n {tableCount} -t {rowsPerCount} -z 1 -k 10000 -y ") + + + def fiveDnodeThreeMnode(self,dnodeNumbers,mnodeNums,restartNumbers,stopRole): + tdLog.printNoPrefix("======== test case 1: ") + paraDict = {'dbName': 'db0_0', + 'dropFlag': 1, + 'event': '', + 'vgroups': 4, + 'replica': 1, + 'stbName': 'stb', + 'stbNumbers': 2, + 'colPrefix': 'c', + 'tagPrefix': 't', + 'colSchema': [{'type': 'INT', 'count':1}, {'type': 'binary', 'len':20, 'count':1}], + 'tagSchema': [{'type': 'INT', 'count':1}, {'type': 'binary', 'len':20, 'count':1}], + 'ctbPrefix': 'ctb', + 'ctbNum': 10000, + 'startTs': 1640966400000, # 2022-01-01 00:00:00.000 + "rowsPerTbl": 10000, + "batchNum": 5000 + } + + dnodeNumbers=int(dnodeNumbers) + mnodeNums=int(mnodeNums) + vnodeNumbers = int(dnodeNumbers-mnodeNums) + allctbNumbers=(paraDict['stbNumbers']*paraDict["ctbNum"]) + rowsPerStb=paraDict["ctbNum"]*paraDict["rowsPerTbl"] + rowsall=rowsPerStb*paraDict['stbNumbers'] + dbNumbers = 1 + + tdLog.info("first check dnode and mnode") + tdSql.query("select * from information_schema.ins_dnodes;") + tdSql.checkData(0,1,'%s:6030'%self.host) + tdSql.checkData(4,1,'%s:6430'%self.host) + clusterComCheck.checkDnodes(dnodeNumbers) + + #check mnode status + tdLog.info("check mnode status") + clusterComCheck.checkMnodeStatus(mnodeNums) + + # add some error operations and + tdLog.info("Confirm the status of the dnode again") + tdSql.error("create mnode on dnode 2") + tdSql.query("select * from information_schema.ins_dnodes;") + print(tdSql.queryResult) + clusterComCheck.checkDnodes(dnodeNumbers) + + # create database and stable + tdLog.info("Take turns stopping Mnodes ") + + tdDnodes=cluster.dnodes + stopcount =0 + threads=[] + + # create stable:stb_0 + threads.append(threading.Thread(target=self.insertData, args=(paraDict["dbName"],paraDict["ctbNum"],paraDict["rowsPerTbl"]))) + for tr in threads: + tr.start() + TdSqlEx=tdCom.newTdSql() + tdLog.info("alter database db0_0 replica 3") + TdSqlEx.execute('alter database db0_0 replica 3') + while stopcount < restartNumbers: + tdLog.info(" restart loop: %d"%stopcount ) + if stopRole == "mnode": + for i in range(mnodeNums): + tdDnodes[i].stoptaosd() + # sleep(10) + tdDnodes[i].starttaosd() + # sleep(10) + elif stopRole == "vnode": + for i in range(vnodeNumbers): + tdDnodes[i+mnodeNums].stoptaosd() + # sleep(10) + tdDnodes[i+mnodeNums].starttaosd() + # sleep(10) + elif stopRole == "dnode": + for i in range(dnodeNumbers): + tdDnodes[i].stoptaosd() + # tdLog.info('select cast(c2 as nchar(10)) from db0_0.stb_1;') + # TdSqlEx.execute('select cast(c2 as nchar(10)) from db0_0.stb_1;') + # tdLog.info('select avg(c1) from db0_0.stb_0 interval(10s);') + # TdSqlEx.execute('select avg(c1) from db0_0.stb_0 interval(10s);') + # sleep(10) + tdDnodes[i].starttaosd() + # sleep(10) + # dnodeNumbers don't include database of schema + if clusterComCheck.checkDnodes(dnodeNumbers): + tdLog.info("123") + else: + print("456") + + self.stopThread(threads) + tdLog.exit("one or more of dnodes failed to start ") + # self.check3mnode() + stopcount+=1 + + for tr in threads: + tr.join() + clusterComCheck.checkDnodes(dnodeNumbers) + clusterComCheck.checkDbRows(dbNumbers) + # clusterComCheck.checkDb(dbNumbers,1,paraDict["dbName"]) + + # tdSql.execute("use %s" %(paraDict["dbName"])) + tdSql.query("show %s.stables"%(paraDict["dbName"])) + tdSql.checkRows(paraDict["stbNumbers"]) + for i in range(paraDict['stbNumbers']): + stableName= '%s.%s_%d'%(paraDict["dbName"],paraDict['stbName'],i) + tdSql.query("select count(*) from %s"%stableName) + tdSql.checkData(0,0,rowsPerStb) + clusterComCheck.check_vgroups_status(vgroup_numbers=paraDict["vgroups"],db_replica=3,db_name=paraDict["dbName"],count_number=240) + def run(self): + # print(self.master_dnode.cfgDict) + self.fiveDnodeThreeMnode(dnodeNumbers=6,mnodeNums=3,restartNumbers=4,stopRole='dnode') + + def stop(self): + tdSql.close() + tdLog.success(f"{__file__} successfully executed") + +tdCases.addLinux(__file__, TDTestCase()) +tdCases.addWindows(__file__, TDTestCase()) diff --git a/tests/system-test/7-tmq/subscribeDb3.py b/tests/system-test/7-tmq/subscribeDb3.py index 5b5326cfba2a84bf4ca1e120fc1d611a57f5274b..b66334a6a66ce3c7818715e74891cb680ac0b2b3 100644 --- a/tests/system-test/7-tmq/subscribeDb3.py +++ b/tests/system-test/7-tmq/subscribeDb3.py @@ -336,7 +336,7 @@ class TDTestCase: for i in range(expectRows): totalConsumeRows += resultList[i] - if totalConsumeRows >= expectrowcnt or totalConsumeRows <= 0: + if totalConsumeRows > expectrowcnt or totalConsumeRows <= 0: tdLog.info("act consume rows: %d, expect consume rows between %d and 0"%(totalConsumeRows, expectrowcnt)) tdLog.exit("tmq consume rows error!") diff --git a/tests/system-test/7-tmq/subscribeStb.py b/tests/system-test/7-tmq/subscribeStb.py index 9dcbf5b351ff80023fc0eea9476d3d436364ff03..53f1a34d58d36ea9b4ac28fee1bfef2ab9d1d7c0 100644 --- a/tests/system-test/7-tmq/subscribeStb.py +++ b/tests/system-test/7-tmq/subscribeStb.py @@ -226,12 +226,11 @@ class TDTestCase: self.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit) tdLog.info("start consume processor") - pollDelay = 5 + pollDelay = 10 showMsg = 1 showRow = 1 self.startTmqSimProcess(buildPath,cfgPath,pollDelay,parameterDict["dbName"],showMsg, showRow) - time.sleep(5) self.create_ctables(tdSql, parameterDict["dbName"], parameterDict["stbName"], parameterDict["ctbNum"]) self.insert_data(tdSql,\ parameterDict["dbName"],\ @@ -307,7 +306,7 @@ class TDTestCase: self.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit) tdLog.info("start consume processor") - pollDelay = 5 + pollDelay = 10 showMsg = 1 showRow = 1 self.startTmqSimProcess(buildPath,cfgPath,pollDelay,parameterDict["dbName"],showMsg, showRow)