diff --git a/src/client/src/tscSub.c b/src/client/src/tscSub.c index 526f38047557af01f4a04f8cf6f3af4657f2a675..8e554d62ffa1da2cdcefb9eaabf8ea400aef283f 100644 --- a/src/client/src/tscSub.c +++ b/src/client/src/tscSub.c @@ -256,11 +256,12 @@ static int tscUpdateSubscription(STscObj* pObj, SSub* pSub) { } size_t numOfTables = taosArrayGetSize(tables); + SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, 0); SArray* progress = taosArrayInit(numOfTables, sizeof(SSubscriptionProgress)); for( size_t i = 0; i < numOfTables; i++ ) { STidTags* tt = taosArrayGet( tables, i ); SSubscriptionProgress p = { .uid = tt->uid }; - p.key = tscGetSubscriptionProgress(pSub, tt->uid, INT64_MIN); + p.key = tscGetSubscriptionProgress(pSub, tt->uid, pQueryInfo->window.skey); taosArrayPush(progress, &p); } taosArraySort(progress, tscCompareSubscriptionProgress); diff --git a/src/rpc/src/rpcMain.c b/src/rpc/src/rpcMain.c index 4790c22668d669d300994b59bef748d9bb80fbd7..95840af1a267dbbe320319c301e0b5566556de50 100644 --- a/src/rpc/src/rpcMain.c +++ b/src/rpc/src/rpcMain.c @@ -660,7 +660,7 @@ static SRpcConn *rpcAllocateClientConn(SRpcInfo *pRpc) { pConn->spi = pRpc->spi; pConn->encrypt = pRpc->encrypt; if (pConn->spi) memcpy(pConn->secret, pRpc->secret, TSDB_KEY_LEN); - tDebug("%s %p client connection is allocated", pRpc->label, pConn); + tDebug("%s %p client connection is allocated, uid:0x%x", pRpc->label, pConn, pConn->linkUid); } return pConn; @@ -721,7 +721,7 @@ static SRpcConn *rpcAllocateServerConn(SRpcInfo *pRpc, SRecvInfo *pRecv) { } taosHashPut(pRpc->hash, hashstr, size, (char *)&pConn, POINTER_BYTES); - tDebug("%s %p server connection is allocated", pRpc->label, pConn); + tDebug("%s %p server connection is allocated, uid:0x%x", pRpc->label, pConn, pConn->linkUid); } return pConn; @@ -848,6 +848,16 @@ static int rpcProcessRspHead(SRpcConn *pConn, SRpcHead *pHead) { return TSDB_CODE_RPC_ALREADY_PROCESSED; } + if (pHead->code == TSDB_CODE_RPC_MISMATCHED_LINK_ID) { + tDebug("%s, mismatched linkUid, link shall be restarted", pConn->info); + pConn->secured = 0; + ((SRpcHead *)pConn->pReqMsg)->destId = 0; + rpcSendMsgToPeer(pConn, pConn->pReqMsg, pConn->reqMsgLen); + if (pConn->connType != RPC_CONN_TCPC) + pConn->pTimer = taosTmrStart(rpcProcessRetryTimer, tsRpcTimer, pConn, pRpc->tmrCtrl); + return TSDB_CODE_RPC_ALREADY_PROCESSED; + } + if (pHead->code == TSDB_CODE_RPC_ACTION_IN_PROGRESS) { if (pConn->tretry <= tsRpcMaxRetry) { tDebug("%s, peer is still processing the transaction, retry:%d", pConn->info, pConn->tretry); diff --git a/src/tsdb/src/tsdbRWHelper.c b/src/tsdb/src/tsdbRWHelper.c index 27f95f31de4750372043951626cd94700f847ca7..9ceaa08625a48f2bc8107de58af8d680612ea5a0 100644 --- a/src/tsdb/src/tsdbRWHelper.c +++ b/src/tsdb/src/tsdbRWHelper.c @@ -147,6 +147,7 @@ int tsdbSetAndOpenHelperFile(SRWHelper *pHelper, SFileGroup *pGroup) { if (tsdbOpenFile(pFile, O_WRONLY | O_CREAT) < 0) goto _err; pFile->info.size = TSDB_FILE_HEAD_SIZE; pFile->info.magic = TSDB_FILE_INIT_MAGIC; + pFile->info.len = 0; if (tsdbUpdateFileHeader(pFile, 0) < 0) return -1; } } else { @@ -302,6 +303,10 @@ void tsdbSetHelperTable(SRWHelper *pHelper, STable *pTable, STsdbRepo *pRepo) { memset(&(pHelper->curCompIdx), 0, sizeof(SCompIdx)); } + if (helperType(pHelper) == TSDB_WRITE_HELPER && pHelper->curCompIdx.hasLast) { + pHelper->hasOldLastBlock = true; + } + helperSetState(pHelper, TSDB_HELPER_TABLE_SET); ASSERT(pHelper->state == ((TSDB_HELPER_TABLE_SET << 1) - 1)); } @@ -555,10 +560,6 @@ int tsdbLoadCompIdx(SRWHelper *pHelper, void *target) { } helperSetState(pHelper, TSDB_HELPER_IDX_LOAD); - if (helperType(pHelper) == TSDB_WRITE_HELPER) { - pFile->info.len = 0; - } - // Copy the memory for outside usage if (target && pHelper->idxH.numOfIdx > 0) memcpy(target, pHelper->idxH.pIdxArray, sizeof(SCompIdx) * pHelper->idxH.numOfIdx); @@ -1259,13 +1260,18 @@ static int tsdbLoadBlockDataColsImpl(SRWHelper *pHelper, SCompBlock *pCompBlock, SCompCol *pCompCol = NULL; while (true) { - ASSERT(dcol < pDataCols->numOfCols); + // ASSERT(dcol < pDataCols->numOfCols); + if (dcol >= pDataCols->numOfCols) { + pDataCol = NULL; + break; + } pDataCol = &pDataCols->cols[dcol]; ASSERT(pDataCol->colId <= colId); if (pDataCol->colId == colId) break; dcol++; } + if (pDataCol == NULL) continue; ASSERT(pDataCol->colId == colId); if (colId == 0) { // load the key row @@ -1517,8 +1523,8 @@ static int tsdbProcessMergeCommit(SRWHelper *pHelper, SCommitIter *pCommitIter, if (rows2 == 0) { // all data filtered out *(pCommitIter->pIter) = slIter; } else { - if (rows1 + rows2 < pCfg->minRowsPerFileBlock && pCompBlock->numOfSubBlocks < TSDB_MAX_SUBBLOCKS && - !TSDB_NLAST_FILE_OPENED(pHelper)) { + if (pCompBlock->numOfRows + rows2 < pCfg->minRowsPerFileBlock && + pCompBlock->numOfSubBlocks < TSDB_MAX_SUBBLOCKS && !TSDB_NLAST_FILE_OPENED(pHelper)) { tdResetDataCols(pDataCols); int rowsRead = tsdbLoadDataFromCache(pTable, pCommitIter->pIter, maxKey, rows1, pDataCols, pDataCols0->cols[0].pData, pDataCols0->numOfRows); diff --git a/tests/pytest/subscribe/__init__.py b/tests/pytest/subscribe/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/tests/pytest/subscribe/singlemeter.py b/tests/pytest/subscribe/singlemeter.py new file mode 100644 index 0000000000000000000000000000000000000000..879e0a75ebdf29022990b5e2e250370620c74636 --- /dev/null +++ b/tests/pytest/subscribe/singlemeter.py @@ -0,0 +1,79 @@ +################################################################### + # Copyright (c) 2020 by TAOS Technologies, Inc. + # All rights reserved. + # + # This file is proprietary and confidential to TAOS Technologies. + # No part of this file may be reproduced, stored, transmitted, + # disclosed or used in any form or by any means other than as + # expressly provided by the written permission from Jianhui Tao + # +################################################################### + +# -*- coding: utf-8 -*- + +import sys +import taos +import time +from util.log import * +from util.cases import * +from util.sql import * +from util.sub import * + +class TDTestCase: + def init(self, conn, logSql): + tdLog.debug("start to execute %s" % __file__) + tdSql.init(conn.cursor(), logSql) + self.conn = conn + + def run(self): + sqlstr = "select * from t0" + topic = "test" + now = int(time.time() * 1000) + tdSql.prepare() + + tdLog.info("create a table and insert 10 rows.") + tdSql.execute("create table t0(ts timestamp, a int, b int);") + for i in range(0, 10): + tdSql.execute("insert into t0 values (%d, %d, %d);" % (now + i, i, i)) + + tdLog.info("consumption 01.") + tdSub.init(self.conn.subscribe(True, topic, sqlstr, 0)) + tdSub.consume() + tdSub.checkRows(10) + + tdLog.info("consumption 02: no new rows inserted") + tdSub.consume() + tdSub.checkRows(0) + + tdLog.info("consumption 03: after one new rows inserted") + tdSql.execute("insert into t0 values (%d, 10, 10);" % (now + 10)) + tdSub.consume() + tdSub.checkRows(1) + + tdLog.info("consumption 04: keep progress and continue previous subscription") + tdSub.close(True) + tdSub.init(self.conn.subscribe(False, topic, sqlstr, 0)) + tdSub.consume() + tdSub.checkRows(0) + + tdLog.info("consumption 05: remove progress and continue previous subscription") + tdSub.close(False) + tdSub.init(self.conn.subscribe(False, topic, sqlstr, 0)) + tdSub.consume() + tdSub.checkRows(11) + + tdLog.info("consumption 06: keep progress and restart the subscription") + tdSub.close(True) + tdSub.init(self.conn.subscribe(True, topic, sqlstr, 0)) + tdSub.consume() + tdSub.checkRows(11) + + tdSub.close(True) + + def stop(self): + tdSub.close(False) + tdSql.close() + tdLog.success("%s successfully executed" % __file__) + +tdCases.addWindows(__file__, TDTestCase()) +tdCases.addLinux(__file__, TDTestCase()) diff --git a/tests/pytest/subscribe/supertable.py b/tests/pytest/subscribe/supertable.py new file mode 100644 index 0000000000000000000000000000000000000000..c6cc2969aa57848bf2c43163a26608117e82ad90 --- /dev/null +++ b/tests/pytest/subscribe/supertable.py @@ -0,0 +1,114 @@ +################################################################### + # Copyright (c) 2020 by TAOS Technologies, Inc. + # All rights reserved. + # + # This file is proprietary and confidential to TAOS Technologies. + # No part of this file may be reproduced, stored, transmitted, + # disclosed or used in any form or by any means other than as + # expressly provided by the written permission from Jianhui Tao + # +################################################################### + +# -*- coding: utf-8 -*- + +import sys +import taos +import time +from util.log import * +from util.cases import * +from util.sql import * +from util.sub import * + +class TDTestCase: + def init(self, conn, logSql): + tdLog.debug("start to execute %s" % __file__) + tdSql.init(conn.cursor(), logSql) + self.conn = conn + + def run(self): + sqlstr = "select * from meters" + topic = "test" + now = int(time.time() * 1000) + tdSql.prepare() + + tdLog.info("create a super table and 10 sub-tables, then insert 5 rows into each sub-table.") + tdSql.execute("create table meters(ts timestamp, a int, b int) tags(area int, loc binary(20));") + for i in range(0, 10): + for j in range(0, 5): + tdSql.execute("insert into t%d using meters tags(%d, 'area%d') values (%d, %d, %d);" % (i, i, i, now + j, j, j)) + + tdLog.info("consumption 01.") + tdSub.init(self.conn.subscribe(True, topic, sqlstr, 0)) + tdSub.consume() + tdSub.checkRows(50) + + tdLog.info("consumption 02: no new rows inserted") + tdSub.consume() + tdSub.checkRows(0) + + tdLog.info("consumption 03: after one new rows inserted") + tdSql.execute("insert into t0 values (%d, 10, 10);" % (now + 10)) + tdSub.consume() + tdSub.checkRows(1) + + tdLog.info("consumption 04: keep progress and continue previous subscription") + tdSub.close(True) + tdSub.init(self.conn.subscribe(False, topic, sqlstr, 0)) + tdSub.consume() + tdSub.checkRows(0) + + tdLog.info("consumption 05: remove progress and continue previous subscription") + tdSub.close(False) + tdSub.init(self.conn.subscribe(False, topic, sqlstr, 0)) + tdSub.consume() + tdSub.checkRows(51) + + tdLog.info("consumption 06: keep progress and restart the subscription") + tdSub.close(True) + tdSub.init(self.conn.subscribe(True, topic, sqlstr, 0)) + tdSub.consume() + tdSub.checkRows(51) + + tdLog.info("consumption 07: insert one row to two table then remove one table") + tdSql.execute("insert into t0 values (%d, 11, 11);" % (now + 11)) + tdSql.execute("insert into t1 values (%d, 11, 11);" % (now + 11)) + tdSql.execute("drop table t0") + tdSub.consume() + tdSub.checkRows(1) + + tdLog.info("consumption 08: check timestamp criteria") + tdSub.close(False) + tdSub.init(self.conn.subscribe(True, topic, sqlstr + " where ts > %d" % now, 0)) + tdSub.consume() + tdSub.checkRows(37) + + tdLog.info("consumption 09: insert large timestamp to t2 then insert smaller timestamp to t1") + tdSql.execute("insert into t2 values (%d, 100, 100);" % (now + 100)) + tdSub.consume() + tdSub.checkRows(1) + tdSql.execute("insert into t1 values (%d, 12, 12);" % (now + 12)) + tdSub.consume() + tdSub.checkRows(1) + + tdLog.info("consumption 10: field criteria") + tdSub.close(True) + tdSub.init(self.conn.subscribe(False, topic, sqlstr + " where a > 100", 0)) + tdSql.execute("insert into t2 values (%d, 101, 100);" % (now + 101)) + tdSql.execute("insert into t2 values (%d, 100, 100);" % (now + 102)) + tdSql.execute("insert into t2 values (%d, 102, 100);" % (now + 103)) + tdSub.consume() + tdSub.checkRows(2) + + tdLog.info("consumption 11: two vnodes") + tdSql.execute("insert into t2 values (%d, 102, 100);" % (now + 104)) + tdSql.execute("insert into t9 values (%d, 102, 100);" % (now + 104)) + tdSub.consume() + tdSub.checkRows(2) + + def stop(self): + tdSub.close(False) + tdSql.close() + tdLog.success("%s successfully executed" % __file__) + +tdCases.addWindows(__file__, TDTestCase()) +tdCases.addLinux(__file__, TDTestCase()) diff --git a/tests/pytest/util/dnodes-default.py b/tests/pytest/util/dnodes-default.py new file mode 100644 index 0000000000000000000000000000000000000000..ec3865f4f2fbd28f84b671b12d181a154f08424b --- /dev/null +++ b/tests/pytest/util/dnodes-default.py @@ -0,0 +1,502 @@ +################################################################### +# Copyright (c) 2016 by TAOS Technologies, Inc. +# All rights reserved. +# +# This file is proprietary and confidential to TAOS Technologies. +# No part of this file may be reproduced, stored, transmitted, +# disclosed or used in any form or by any means other than as +# expressly provided by the written permission from Jianhui Tao +# +################################################################### + +# -*- coding: utf-8 -*- + +import sys +import os +import os.path +import subprocess +from util.log import * + + +class TDSimClient: + def __init__(self): + self.testCluster = False + + self.cfgDict = { + "numOfLogLines": "100000000", + "numOfThreadsPerCore": "2.0", + "locale": "en_US.UTF-8", + "charset": "UTF-8", + "asyncLog": "0", + "minTablesPerVnode": "4", + "maxTablesPerVnode": "1000", + "tableIncStepPerVnode": "10000", + "maxVgroupsPerDb": "1000", + "sdbDebugFlag": "143", + "rpcDebugFlag": "135", + "tmrDebugFlag": "131", + "cDebugFlag": "135", + "udebugFlag": "135", + "jnidebugFlag": "135", + "qdebugFlag": "135", + } + def init(self, path): + self.__init__() + self.path = path + + def getLogDir(self): + self.logDir = "%s/sim/psim/log" % (self.path) + return self.logDir + + def getCfgDir(self): + self.cfgDir = "%s/sim/psim/cfg" % (self.path) + return self.cfgDir + + def setTestCluster(self, value): + self.testCluster = value + + def addExtraCfg(self, option, value): + self.cfgDict.update({option: value}) + + def cfg(self, option, value): + cmd = "echo '%s %s' >> %s" % (option, value, self.cfgPath) + if os.system(cmd) != 0: + tdLog.exit(cmd) + + def deploy(self): + self.logDir = "%s/sim/psim/log" % (self.path) + self.cfgDir = "%s/sim/psim/cfg" % (self.path) + self.cfgPath = "%s/sim/psim/cfg/taos.cfg" % (self.path) + + cmd = "rm -rf " + self.logDir + if os.system(cmd) != 0: + tdLog.exit(cmd) + + cmd = "mkdir -p " + self.logDir + if os.system(cmd) != 0: + tdLog.exit(cmd) + + cmd = "rm -rf " + self.cfgDir + if os.system(cmd) != 0: + tdLog.exit(cmd) + + cmd = "mkdir -p " + self.cfgDir + if os.system(cmd) != 0: + tdLog.exit(cmd) + + cmd = "touch " + self.cfgPath + if os.system(cmd) != 0: + tdLog.exit(cmd) + + if self.testCluster: + self.cfg("masterIp", "192.168.0.1") + self.cfg("secondIp", "192.168.0.2") + self.cfg("logDir", self.logDir) + + for key, value in self.cfgDict.items(): + self.cfg(key, value) + + tdLog.debug("psim is deployed and configured by %s" % (self.cfgPath)) + + +class TDDnode: + def __init__(self, index): + self.index = index + self.running = 0 + self.deployed = 0 + self.testCluster = False + self.valgrind = 0 + + def init(self, path): + self.path = path + + def setTestCluster(self, value): + self.testCluster = value + + def setValgrind(self, value): + self.valgrind = value + + def getDataSize(self): + totalSize = 0 + + if (self.deployed == 1): + for dirpath, dirnames, filenames in os.walk(self.dataDir): + for f in filenames: + fp = os.path.join(dirpath, f) + + if not os.path.islink(fp): + totalSize = totalSize + os.path.getsize(fp) + + return totalSize + + def deploy(self): + self.logDir = "%s/sim/dnode%d/log" % (self.path, self.index) + self.dataDir = "%s/sim/dnode%d/data" % (self.path, self.index) + self.cfgDir = "%s/sim/dnode%d/cfg" % (self.path, self.index) + self.cfgPath = "%s/sim/dnode%d/cfg/taos.cfg" % ( + self.path, self.index) + + cmd = "rm -rf " + self.dataDir + if os.system(cmd) != 0: + tdLog.exit(cmd) + + cmd = "rm -rf " + self.logDir + if os.system(cmd) != 0: + tdLog.exit(cmd) + + cmd = "rm -rf " + self.cfgDir + if os.system(cmd) != 0: + tdLog.exit(cmd) + + cmd = "mkdir -p " + self.dataDir + if os.system(cmd) != 0: + tdLog.exit(cmd) + + cmd = "mkdir -p " + self.logDir + if os.system(cmd) != 0: + tdLog.exit(cmd) + + cmd = "mkdir -p " + self.cfgDir + if os.system(cmd) != 0: + tdLog.exit(cmd) + + cmd = "touch " + self.cfgPath + if os.system(cmd) != 0: + tdLog.exit(cmd) + + if self.testCluster: + self.startIP() + + if self.testCluster: + self.cfg("masterIp", "192.168.0.1") + self.cfg("secondIp", "192.168.0.2") + self.cfg("publicIp", "192.168.0.%d" % (self.index)) + self.cfg("internalIp", "192.168.0.%d" % (self.index)) + self.cfg("privateIp", "192.168.0.%d" % (self.index)) + self.cfg("dataDir", self.dataDir) + self.cfg("logDir", self.logDir) + self.cfg("numOfLogLines", "100000000") + self.cfg("mnodeEqualVnodeNum", "0") + self.cfg("walLevel", "2") + self.cfg("fsync", "1000") + self.cfg("statusInterval", "1") + self.cfg("numOfMnodes", "3") + self.cfg("numOfThreadsPerCore", "2.0") + self.cfg("monitor", "0") + self.cfg("maxVnodeConnections", "30000") + self.cfg("maxMgmtConnections", "30000") + self.cfg("maxMeterConnections", "30000") + self.cfg("maxShellConns", "30000") + self.cfg("locale", "en_US.UTF-8") + self.cfg("charset", "UTF-8") + self.cfg("asyncLog", "0") + self.cfg("anyIp", "0") + self.cfg("dDebugFlag", "135") + self.cfg("mDebugFlag", "135") + self.cfg("sdbDebugFlag", "135") + self.cfg("rpcDebugFlag", "135") + self.cfg("tmrDebugFlag", "131") + self.cfg("cDebugFlag", "135") + self.cfg("httpDebugFlag", "135") + self.cfg("monitorDebugFlag", "135") + self.cfg("udebugFlag", "135") + self.cfg("jnidebugFlag", "135") + self.cfg("qdebugFlag", "135") + self.deployed = 1 + tdLog.debug( + "dnode:%d is deployed and configured by %s" % + (self.index, self.cfgPath)) + + def getBuildPath(self): + selfPath = os.path.dirname(os.path.realpath(__file__)) + + if ("community" in selfPath): + projPath = selfPath[:selfPath.find("community")] + else: + projPath = selfPath[:selfPath.find("tests")] + + for root, dirs, files in os.walk(projPath): + if ("taosd" in files): + rootRealPath = os.path.dirname(os.path.realpath(root)) + if ("packaging" not in rootRealPath): + buildPath = root[:len(root)-len("/build/bin")] + break + return buildPath + + def start(self): + buildPath = self.getBuildPath() + + if (buildPath == ""): + tdLog.exit("taosd not found!") + else: + tdLog.info("taosd found in %s" % buildPath) + + binPath = buildPath + "/build/bin/taosd" + + if self.deployed == 0: + tdLog.exit("dnode:%d is not deployed" % (self.index)) + + if self.valgrind == 0: + cmd = "nohup %s -c %s > /dev/null 2>&1 & " % ( + binPath, self.cfgDir) + else: + valgrindCmdline = "valgrind --tool=memcheck --leak-check=full --show-reachable=no --track-origins=yes --show-leak-kinds=all -v --workaround-gcc296-bugs=yes" + + cmd = "nohup %s %s -c %s 2>&1 & " % ( + valgrindCmdline, binPath, self.cfgDir) + + print(cmd) + + if os.system(cmd) != 0: + tdLog.exit(cmd) + self.running = 1 + tdLog.debug("dnode:%d is running with %s " % (self.index, cmd)) + + tdLog.debug("wait 5 seconds for the dnode:%d to start." % (self.index)) + time.sleep(5) + + def stop(self): + if self.valgrind == 0: + toBeKilled = "taosd" + else: + toBeKilled = "valgrind.bin" + + if self.running != 0: + psCmd = "ps -ef|grep -w %s| grep -v grep | awk '{print $2}'" % toBeKilled + processID = subprocess.check_output( + psCmd, shell=True).decode("utf-8") + + while(processID): + killCmd = "kill -INT %s > /dev/null 2>&1" % processID + os.system(killCmd) + time.sleep(1) + processID = subprocess.check_output( + psCmd, shell=True).decode("utf-8") + for port in range(6030, 6041): + fuserCmd = "fuser -k -n tcp %d" % port + os.system(fuserCmd) + if self.valgrind: + time.sleep(2) + + self.running = 0 + tdLog.debug("dnode:%d is stopped by kill -INT" % (self.index)) + + def forcestop(self): + if self.valgrind == 0: + toBeKilled = "taosd" + else: + toBeKilled = "valgrind.bin" + + if self.running != 0: + psCmd = "ps -ef|grep -w %s| grep -v grep | awk '{print $2}'" % toBeKilled + processID = subprocess.check_output( + psCmd, shell=True).decode("utf-8") + + while(processID): + killCmd = "kill -KILL %s > /dev/null 2>&1" % processID + os.system(killCmd) + time.sleep(1) + processID = subprocess.check_output( + psCmd, shell=True).decode("utf-8") + for port in range(6030, 6041): + fuserCmd = "fuser -k -n tcp %d" % port + os.system(fuserCmd) + if self.valgrind: + time.sleep(2) + + self.running = 0 + tdLog.debug("dnode:%d is stopped by kill -KILL" % (self.index)) + + def startIP(self): + cmd = "sudo ifconfig lo:%d 192.168.0.%d up" % (self.index, self.index) + if os.system(cmd) != 0: + tdLog.exit(cmd) + + def stopIP(self): + cmd = "sudo ifconfig lo:%d 192.168.0.%d down" % ( + self.index, self.index) + if os.system(cmd) != 0: + tdLog.exit(cmd) + + def cfg(self, option, value): + cmd = "echo '%s %s' >> %s" % (option, value, self.cfgPath) + if os.system(cmd) != 0: + tdLog.exit(cmd) + + def getDnodeRootDir(self, index): + dnodeRootDir = "%s/sim/psim/dnode%d" % (self.path, index) + return dnodeRootDir + + def getDnodesRootDir(self): + dnodesRootDir = "%s/sim/psim" % (self.path) + return dnodesRootDir + + +class TDDnodes: + def __init__(self): + self.dnodes = [] + self.dnodes.append(TDDnode(1)) + self.dnodes.append(TDDnode(2)) + self.dnodes.append(TDDnode(3)) + self.dnodes.append(TDDnode(4)) + self.dnodes.append(TDDnode(5)) + self.dnodes.append(TDDnode(6)) + self.dnodes.append(TDDnode(7)) + self.dnodes.append(TDDnode(8)) + self.dnodes.append(TDDnode(9)) + self.dnodes.append(TDDnode(10)) + self.simDeployed = False + + def init(self, path): + psCmd = "ps -ef|grep -w taosd| grep -v grep | awk '{print $2}'" + processID = subprocess.check_output(psCmd, shell=True).decode("utf-8") + while(processID): + killCmd = "kill -TERM %s > /dev/null 2>&1" % processID + os.system(killCmd) + time.sleep(1) + processID = subprocess.check_output( + psCmd, shell=True).decode("utf-8") + + psCmd = "ps -ef|grep -w valgrind.bin| grep -v grep | awk '{print $2}'" + processID = subprocess.check_output(psCmd, shell=True).decode("utf-8") + while(processID): + killCmd = "kill -TERM %s > /dev/null 2>&1" % processID + os.system(killCmd) + time.sleep(1) + processID = subprocess.check_output( + psCmd, shell=True).decode("utf-8") + + binPath = os.path.dirname(os.path.realpath(__file__)) + binPath = binPath + "/../../../debug/" + tdLog.debug("binPath %s" % (binPath)) + binPath = os.path.realpath(binPath) + tdLog.debug("binPath real path %s" % (binPath)) + + # cmd = "sudo cp %s/build/lib/libtaos.so /usr/local/lib/taos/" % (binPath) + # tdLog.debug(cmd) + # os.system(cmd) + + # cmd = "sudo cp %s/build/bin/taos /usr/local/bin/taos/" % (binPath) + # if os.system(cmd) != 0 : + # tdLog.exit(cmd) + # tdLog.debug("execute %s" % (cmd)) + + # cmd = "sudo cp %s/build/bin/taosd /usr/local/bin/taos/" % (binPath) + # if os.system(cmd) != 0 : + # tdLog.exit(cmd) + # tdLog.debug("execute %s" % (cmd)) + + if path == "": + # self.path = os.path.expanduser('~') + self.path = os.path.abspath(binPath + "../../") + else: + self.path = os.path.realpath(path) + + for i in range(len(self.dnodes)): + self.dnodes[i].init(self.path) + + self.sim = TDSimClient() + self.sim.init(self.path) + + def setTestCluster(self, value): + self.testCluster = value + + def setValgrind(self, value): + self.valgrind = value + + def deploy(self, index): + self.sim.setTestCluster(self.testCluster) + + if (self.simDeployed == False): + self.sim.deploy() + self.simDeployed = True + + self.check(index) + self.dnodes[index - 1].setTestCluster(self.testCluster) + self.dnodes[index - 1].setValgrind(self.valgrind) + self.dnodes[index - 1].deploy() + + def cfg(self, index, option, value): + self.check(index) + self.dnodes[index - 1].cfg(option, value) + + def start(self, index): + self.check(index) + self.dnodes[index - 1].start() + + def stop(self, index): + self.check(index) + self.dnodes[index - 1].stop() + + def getDataSize(self, index): + self.check(index) + return self.dnodes[index - 1].getDataSize() + + def forcestop(self, index): + self.check(index) + self.dnodes[index - 1].forcestop() + + def startIP(self, index): + self.check(index) + + if self.testCluster: + self.dnodes[index - 1].startIP() + + def stopIP(self, index): + self.check(index) + + if self.dnodes[index - 1].testCluster: + self.dnodes[index - 1].stopIP() + + def check(self, index): + if index < 1 or index > 10: + tdLog.exit("index:%d should on a scale of [1, 10]" % (index)) + + def stopAll(self): + tdLog.info("stop all dnodes") + for i in range(len(self.dnodes)): + self.dnodes[i].stop() + + psCmd = "ps -ef | grep -w taosd | grep 'root' | grep -v grep | awk '{print $2}'" + processID = subprocess.check_output(psCmd, shell=True).decode("utf-8") + if processID: + cmd = "sudo systemctl stop taosd" + os.system(cmd) + # if os.system(cmd) != 0 : + # tdLog.exit(cmd) + psCmd = "ps -ef|grep -w taosd| grep -v grep | awk '{print $2}'" + processID = subprocess.check_output(psCmd, shell=True).decode("utf-8") + while(processID): + killCmd = "kill -TERM %s > /dev/null 2>&1" % processID + os.system(killCmd) + time.sleep(1) + processID = subprocess.check_output( + psCmd, shell=True).decode("utf-8") + + psCmd = "ps -ef|grep -w valgrind.bin| grep -v grep | awk '{print $2}'" + processID = subprocess.check_output(psCmd, shell=True).decode("utf-8") + while(processID): + killCmd = "kill -TERM %s > /dev/null 2>&1" % processID + os.system(killCmd) + time.sleep(1) + processID = subprocess.check_output( + psCmd, shell=True).decode("utf-8") + + # if os.system(cmd) != 0 : + # tdLog.exit(cmd) + + def getDnodesRootDir(self): + dnodesRootDir = "%s/sim" % (self.path) + return dnodesRootDir + + def getSimCfgPath(self): + return self.sim.getCfgDir() + + def getSimLogPath(self): + return self.sim.getLogDir() + + def addSimExtraCfg(self, option, value): + self.sim.addExtraCfg(option, value) + + +tdDnodes = TDDnodes() diff --git a/tests/pytest/util/dnodes-no-random-fail.py b/tests/pytest/util/dnodes-no-random-fail.py index 150b8f0c812ec5c22ff496d9fe455b5aab9bf286..485c1b4b3ff96ad63e78cd32b2e91470df0c8262 100644 --- a/tests/pytest/util/dnodes-no-random-fail.py +++ b/tests/pytest/util/dnodes-no-random-fail.py @@ -349,7 +349,7 @@ class TDDnodes: psCmd = "ps -ef|grep -w taosd| grep -v grep | awk '{print $2}'" processID = subprocess.check_output(psCmd, shell=True).decode("utf-8") while(processID): - killCmd = "kill -KILL %s > /dev/null 2>&1" % processID + killCmd = "kill -TERM %s > /dev/null 2>&1" % processID os.system(killCmd) time.sleep(1) processID = subprocess.check_output( @@ -358,7 +358,7 @@ class TDDnodes: psCmd = "ps -ef|grep -w valgrind.bin| grep -v grep | awk '{print $2}'" processID = subprocess.check_output(psCmd, shell=True).decode("utf-8") while(processID): - killCmd = "kill -KILL %s > /dev/null 2>&1" % processID + killCmd = "kill -TERM %s > /dev/null 2>&1" % processID os.system(killCmd) time.sleep(1) processID = subprocess.check_output( @@ -465,7 +465,7 @@ class TDDnodes: psCmd = "ps -ef|grep -w taosd| grep -v grep | awk '{print $2}'" processID = subprocess.check_output(psCmd, shell=True).decode("utf-8") while(processID): - killCmd = "kill -KILL %s > /dev/null 2>&1" % processID + killCmd = "kill -TERM %s > /dev/null 2>&1" % processID os.system(killCmd) time.sleep(1) processID = subprocess.check_output( @@ -474,7 +474,7 @@ class TDDnodes: psCmd = "ps -ef|grep -w valgrind.bin| grep -v grep | awk '{print $2}'" processID = subprocess.check_output(psCmd, shell=True).decode("utf-8") while(processID): - killCmd = "kill -KILL %s > /dev/null 2>&1" % processID + killCmd = "kill -TERM %s > /dev/null 2>&1" % processID os.system(killCmd) time.sleep(1) processID = subprocess.check_output( diff --git a/tests/pytest/util/dnodes-random-fail.py b/tests/pytest/util/dnodes-random-fail.py index 36c0f9d796ef209d7774581308325db503ccdd19..7c4edde47f7e398be99c4221fe9e7104f8e4437a 100644 --- a/tests/pytest/util/dnodes-random-fail.py +++ b/tests/pytest/util/dnodes-random-fail.py @@ -349,7 +349,7 @@ class TDDnodes: psCmd = "ps -ef|grep -w taosd| grep -v grep | awk '{print $2}'" processID = subprocess.check_output(psCmd, shell=True).decode("utf-8") while(processID): - killCmd = "kill -KILL %s > /dev/null 2>&1" % processID + killCmd = "kill -TERM %s > /dev/null 2>&1" % processID os.system(killCmd) time.sleep(1) processID = subprocess.check_output( @@ -358,7 +358,7 @@ class TDDnodes: psCmd = "ps -ef|grep -w valgrind.bin| grep -v grep | awk '{print $2}'" processID = subprocess.check_output(psCmd, shell=True).decode("utf-8") while(processID): - killCmd = "kill -KILL %s > /dev/null 2>&1" % processID + killCmd = "kill -TERM %s > /dev/null 2>&1" % processID os.system(killCmd) time.sleep(1) processID = subprocess.check_output( @@ -465,7 +465,7 @@ class TDDnodes: psCmd = "ps -ef|grep -w taosd| grep -v grep | awk '{print $2}'" processID = subprocess.check_output(psCmd, shell=True).decode("utf-8") while(processID): - killCmd = "kill -KILL %s > /dev/null 2>&1" % processID + killCmd = "kill -TERM %s > /dev/null 2>&1" % processID os.system(killCmd) time.sleep(1) processID = subprocess.check_output( @@ -474,7 +474,7 @@ class TDDnodes: psCmd = "ps -ef|grep -w valgrind.bin| grep -v grep | awk '{print $2}'" processID = subprocess.check_output(psCmd, shell=True).decode("utf-8") while(processID): - killCmd = "kill -KILL %s > /dev/null 2>&1" % processID + killCmd = "kill -TERM %s > /dev/null 2>&1" % processID os.system(killCmd) time.sleep(1) processID = subprocess.check_output( diff --git a/tests/pytest/util/dnodes.py b/tests/pytest/util/dnodes.py index 582bd0abae331ac73e43fdca33e02f764a4d6c59..ec3865f4f2fbd28f84b671b12d181a154f08424b 100644 --- a/tests/pytest/util/dnodes.py +++ b/tests/pytest/util/dnodes.py @@ -351,7 +351,7 @@ class TDDnodes: psCmd = "ps -ef|grep -w taosd| grep -v grep | awk '{print $2}'" processID = subprocess.check_output(psCmd, shell=True).decode("utf-8") while(processID): - killCmd = "kill -KILL %s > /dev/null 2>&1" % processID + killCmd = "kill -TERM %s > /dev/null 2>&1" % processID os.system(killCmd) time.sleep(1) processID = subprocess.check_output( @@ -360,7 +360,7 @@ class TDDnodes: psCmd = "ps -ef|grep -w valgrind.bin| grep -v grep | awk '{print $2}'" processID = subprocess.check_output(psCmd, shell=True).decode("utf-8") while(processID): - killCmd = "kill -KILL %s > /dev/null 2>&1" % processID + killCmd = "kill -TERM %s > /dev/null 2>&1" % processID os.system(killCmd) time.sleep(1) processID = subprocess.check_output( @@ -467,7 +467,7 @@ class TDDnodes: psCmd = "ps -ef|grep -w taosd| grep -v grep | awk '{print $2}'" processID = subprocess.check_output(psCmd, shell=True).decode("utf-8") while(processID): - killCmd = "kill -KILL %s > /dev/null 2>&1" % processID + killCmd = "kill -TERM %s > /dev/null 2>&1" % processID os.system(killCmd) time.sleep(1) processID = subprocess.check_output( @@ -476,7 +476,7 @@ class TDDnodes: psCmd = "ps -ef|grep -w valgrind.bin| grep -v grep | awk '{print $2}'" processID = subprocess.check_output(psCmd, shell=True).decode("utf-8") while(processID): - killCmd = "kill -KILL %s > /dev/null 2>&1" % processID + killCmd = "kill -TERM %s > /dev/null 2>&1" % processID os.system(killCmd) time.sleep(1) processID = subprocess.check_output( diff --git a/tests/pytest/util/sub.py b/tests/pytest/util/sub.py new file mode 100644 index 0000000000000000000000000000000000000000..2e3c2a96b7312c176c25bd35e109e146e5c4593f --- /dev/null +++ b/tests/pytest/util/sub.py @@ -0,0 +1,43 @@ +################################################################### + # Copyright (c) 2020 by TAOS Technologies, Inc. + # All rights reserved. + # + # This file is proprietary and confidential to TAOS Technologies. + # No part of this file may be reproduced, stored, transmitted, + # disclosed or used in any form or by any means other than as + # expressly provided by the written permission from Jianhui Tao + # +################################################################### + +# -*- coding: utf-8 -*- + +import sys +import os +import time +import datetime +from util.log import * + +class TDSub: + def __init__(self): + self.consumedRows = 0 + self.consumedCols = 0 + + def init(self, sub): + self.sub = sub + + def close(self, keepProgress): + self.sub.close(keepProgress) + + def consume(self): + self.data = self.sub.consume() + self.consumedRows = len(self.data) + self.consumedCols = len(self.sub.fields) + return self.consumedRows + + def checkRows(self, expectRows): + if self.consumedRows != expectRows: + tdLog.exit("consumed rows:%d != expect:%d" % (self.consumedRows, expectRows)) + tdLog.info("consumed rows:%d == expect:%d" % (self.consumedRows, expectRows)) + + +tdSub = TDSub() diff --git a/tests/script/sh/exec-default.sh b/tests/script/sh/exec-default.sh new file mode 100755 index 0000000000000000000000000000000000000000..1c84a6b10ed3f958e096d32eda727441ff2591da --- /dev/null +++ b/tests/script/sh/exec-default.sh @@ -0,0 +1,115 @@ +#!/bin/bash + +# if [ $# != 4 || $# != 5 ]; then + # echo "argument list need input : " + # echo " -n nodeName" + # echo " -s start/stop" + # echo " -c clear" + # exit 1 +# fi + +NODE_NAME= +EXEC_OPTON= +CLEAR_OPTION="false" +while getopts "n:s:u:x:ct" arg +do + case $arg in + n) + NODE_NAME=$OPTARG + ;; + s) + EXEC_OPTON=$OPTARG + ;; + c) + CLEAR_OPTION="clear" + ;; + t) + SHELL_OPTION="true" + ;; + u) + USERS=$OPTARG + ;; + x) + SIGNAL=$OPTARG + ;; + ?) + echo "unkown argument" + ;; + esac +done + +SCRIPT_DIR=`dirname $0` +cd $SCRIPT_DIR/../ +SCRIPT_DIR=`pwd` + +IN_TDINTERNAL="community" +if [[ "$SCRIPT_DIR" == *"$IN_TDINTERNAL"* ]]; then + cd ../../.. +else + cd ../../ +fi + +TAOS_DIR=`pwd` +TAOSD_DIR=`find . -name "taosd"|grep bin|head -n1` + +if [[ "$TAOSD_DIR" == *"$IN_TDINTERNAL"* ]]; then + BIN_DIR=`find . -name "taosd"|grep bin|head -n1|cut -d '/' --fields=2,3` +else + BIN_DIR=`find . -name "taosd"|grep bin|head -n1|cut -d '/' --fields=2` +fi + +BUILD_DIR=$TAOS_DIR/$BIN_DIR/build + +SIM_DIR=$TAOS_DIR/sim +NODE_DIR=$SIM_DIR/$NODE_NAME +EXE_DIR=$BUILD_DIR/bin +CFG_DIR=$NODE_DIR/cfg +LOG_DIR=$NODE_DIR/log +DATA_DIR=$NODE_DIR/data +MGMT_DIR=$NODE_DIR/data/mgmt +TSDB_DIR=$NODE_DIR/data/tsdb + +TAOS_CFG=$NODE_DIR/cfg/taos.cfg + +echo ------------ $EXEC_OPTON $NODE_NAME + +TAOS_FLAG=$SIM_DIR/tsim/flag +if [ -f "$TAOS_FLAG" ]; then + EXE_DIR=/usr/local/bin/taos +fi + +if [ "$CLEAR_OPTION" = "clear" ]; then + echo rm -rf $MGMT_DIR $TSDB_DIR + rm -rf $TSDB_DIR + rm -rf $MGMT_DIR +fi + +if [ "$EXEC_OPTON" = "start" ]; then + echo "ExcuteCmd:" $EXE_DIR/taosd -c $CFG_DIR + + if [ "$SHELL_OPTION" = "true" ]; then + TT=`date +%s` + mkdir ${LOG_DIR}/${TT} + nohup valgrind --log-file=${LOG_DIR}/${TT}/valgrind.log --tool=memcheck --leak-check=full --show-reachable=no --track-origins=yes --show-leak-kinds=all -v --workaround-gcc296-bugs=yes $EXE_DIR/taosd -c $CFG_DIR > /dev/null 2>&1 & + else + nohup $EXE_DIR/taosd -c $CFG_DIR > /dev/null 2>&1 & + fi + +else + #relative path + RCFG_DIR=sim/$NODE_NAME/cfg + PID=`ps -ef|grep taosd | grep $RCFG_DIR | grep -v grep | awk '{print $2}'` + while [ -n "$PID" ] + do + if [ "$SIGNAL" = "SIGKILL" ]; then + echo try to kill by signal SIGKILL + kill -9 $PID + else + echo try to kill by signal SIGINT + kill -SIGINT $PID + fi + sleep 1 + PID=`ps -ef|grep taosd | grep $RCFG_DIR | grep -v grep | awk '{print $2}'` + done +fi +