diff --git a/packaging/tools/install_client.sh b/packaging/tools/install_client.sh index 4cf95454e022da6f8d3e497d335175d86da486c5..5f449e5d91122522d595eb2ccfb948aa4f8a66fe 100755 --- a/packaging/tools/install_client.sh +++ b/packaging/tools/install_client.sh @@ -17,6 +17,7 @@ serverName="taosd" clientName="taos" uninstallScript="rmtaos" configFile="taos.cfg" +tarName="taos.tar.gz" osType=Linux pagMode=full @@ -242,6 +243,11 @@ function install_examples() { function update_TDengine() { # Start to update + if [ ! -e ${tarName} ]; then + echo "File ${tarName} does not exist" + exit 1 + fi + tar -zxf ${tarName} echo -e "${GREEN}Start to update ${productName} client...${NC}" # Stop the client shell if running if pidof ${clientName} &> /dev/null; then @@ -264,42 +270,49 @@ function update_TDengine() { echo echo -e "\033[44;32;1m${productName} client is updated successfully!${NC}" + + rm -rf $(tar -tf ${tarName}) } function install_TDengine() { - # Start to install - echo -e "${GREEN}Start to install ${productName} client...${NC}" - - install_main_path - install_log - install_header - install_lib - install_jemalloc - if [ "$verMode" == "cluster" ]; then - install_connector - fi - install_examples - install_bin - install_config + # Start to install + if [ ! -e ${tarName} ]; then + echo "File ${tarName} does not exist" + exit 1 + fi + tar -zxf ${tarName} + echo -e "${GREEN}Start to install ${productName} client...${NC}" - echo - echo -e "\033[44;32;1m${productName} client is installed successfully!${NC}" + install_main_path + install_log + install_header + install_lib + install_jemalloc + if [ "$verMode" == "cluster" ]; then + install_connector + fi + install_examples + install_bin + install_config + + echo + echo -e "\033[44;32;1m${productName} client is installed successfully!${NC}" - rm -rf $(tar -tf ${tarName}) + rm -rf $(tar -tf ${tarName}) } ## ==============================Main program starts from here============================ # Install or updata client and client # if server is already install, don't install client - if [ -e ${bin_dir}/${serverName} ]; then - echo -e "\033[44;32;1mThere are already installed ${productName} server, so don't need install client!${NC}" - exit 0 - fi +if [ -e ${bin_dir}/${serverName} ]; then + echo -e "\033[44;32;1mThere are already installed ${productName} server, so don't need install client!${NC}" + exit 0 +fi - if [ -x ${bin_dir}/${clientName} ]; then - update_flag=1 - update_TDengine - else - install_TDengine - fi +if [ -x ${bin_dir}/${clientName} ]; then + update_flag=1 + update_TDengine +else + install_TDengine +fi diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c index f28209f9828062f8ed27f194914b4ac11848735a..10e6ad4e1e3d5f942d0850d59b9a2d80a3d4b62b 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c @@ -138,7 +138,7 @@ static void vmGenerateVnodeCfg(SCreateVnodeReq *pCreate, SVnodeCfg *pCfg) { pCfg->dbId = pCreate->dbUid; pCfg->szPage = pCreate->pageSize * 1024; pCfg->szCache = pCreate->pages; - pCfg->szBuf = pCreate->buffer * 1024 * 1024; + pCfg->szBuf = (uint64_t)pCreate->buffer * 1024 * 1024; pCfg->isWeak = true; pCfg->tsdbCfg.compression = pCreate->compression; pCfg->tsdbCfg.precision = pCreate->precision; diff --git a/source/dnode/mnode/impl/src/mndConsumer.c b/source/dnode/mnode/impl/src/mndConsumer.c index 1bb003bab9f526da988945c1025d09aebcffd39d..7cebeb35f5bb9e3f2b363c438a1ce70ad3296717 100644 --- a/source/dnode/mnode/impl/src/mndConsumer.c +++ b/source/dnode/mnode/impl/src/mndConsumer.c @@ -419,6 +419,8 @@ static int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) { SMqTopicObj topicObj = {0}; memcpy(&topicObj, pTopic, sizeof(SMqTopicObj)); topicObj.refConsumerCnt = pTopic->refConsumerCnt + 1; + mInfo("subscribe topic %s by consumer %ld cgroup %s, refcnt %d", pTopic->name, consumerId, cgroup, + topicObj.refConsumerCnt); if (mndSetTopicCommitLogs(pMnode, pTrans, &topicObj) != 0) goto SUBSCRIBE_OVER; mndReleaseTopic(pMnode, pTopic); diff --git a/source/dnode/mnode/impl/src/mndSubscribe.c b/source/dnode/mnode/impl/src/mndSubscribe.c index 3713bd501a42e5e450a0cb2dc80e0ebe14c32417..0ece5d29e525a649e8a02b0890eb7ae951008f7b 100644 --- a/source/dnode/mnode/impl/src/mndSubscribe.c +++ b/source/dnode/mnode/impl/src/mndSubscribe.c @@ -417,7 +417,7 @@ static int32_t mndPersistRebResult(SMnode *pMnode, SRpcMsg *pMsg, const SMqRebOu // 2. redo log: subscribe and vg assignment // subscribe - if (mndSetSubRedoLogs(pMnode, pTrans, pOutput->pSub) != 0) { + if (mndSetSubCommitLogs(pMnode, pTrans, pOutput->pSub) != 0) { goto REB_FAIL; } @@ -479,6 +479,10 @@ static int32_t mndPersistRebResult(SMnode *pMnode, SRpcMsg *pMsg, const SMqRebOu SMqTopicObj topicObj = {0}; memcpy(&topicObj, pTopic, sizeof(SMqTopicObj)); topicObj.refConsumerCnt = pTopic->refConsumerCnt - consumerNum; + // TODO is that correct? + pTopic->refConsumerCnt = topicObj.refConsumerCnt; + mInfo("subscribe topic %s unref %d consumer cgroup %s, refcnt %d", pTopic->name, consumerNum, cgroup, + topicObj.refConsumerCnt); if (mndSetTopicCommitLogs(pMnode, pTrans, &topicObj) != 0) goto REB_FAIL; } } diff --git a/tests/system-test/7-tmq/subscribeDb.py b/tests/system-test/7-tmq/subscribeDb.py index b536a70515a38eec91a4a007a2f4850c0056e89e..66c79fd29222714e24f4a83ac417502758c02eff 100644 --- a/tests/system-test/7-tmq/subscribeDb.py +++ b/tests/system-test/7-tmq/subscribeDb.py @@ -93,7 +93,7 @@ class TDTestCase: tdLog.info(shellCmd) os.system(shellCmd) - def create_tables(self,tsql, dbName,vgroups,stbName,ctbNum,rowsPerTbl): + def create_tables(self,tsql, dbName,vgroups,stbName,ctbNum): tsql.execute("create database if not exists %s vgroups %d"%(dbName, vgroups)) tsql.execute("use %s" %dbName) tsql.execute("create table if not exists %s (ts timestamp, c1 bigint, c2 binary(16)) tags(t1 int)"%stbName) @@ -147,8 +147,7 @@ class TDTestCase: parameterDict["dbName"],\ parameterDict["vgroups"],\ parameterDict["stbName"],\ - parameterDict["ctbNum"],\ - parameterDict["rowsPerTbl"]) + parameterDict["ctbNum"]) self.insert_data(tsql,\ parameterDict["dbName"],\ @@ -322,6 +321,75 @@ class TDTestCase: tdLog.printNoPrefix("======== test case 2 end ...... ") + def tmqCase2a(self, cfgPath, buildPath): + tdLog.printNoPrefix("======== test case 2a: Produce while two consumers to subscribe one db, inclue 1 stb") + tdLog.info("step 1: create database, stb, ctb and insert data") + # create and start thread + parameterDict = {'cfg': '', \ + 'dbName': 'db2a', \ + 'vgroups': 4, \ + 'stbName': 'stb1', \ + 'ctbNum': 10, \ + 'rowsPerTbl': 10000, \ + 'batchNum': 100, \ + 'startTs': 1640966400000} # 2022-01-01 00:00:00.000 + parameterDict['cfg'] = cfgPath + + self.initConsumerTable() + + tdSql.execute("create database if not exists %s vgroups %d" %(parameterDict['dbName'], parameterDict['vgroups'])) + tdSql.execute("create table if not exists %s.%s (ts timestamp, c1 bigint, c2 binary(16)) tags(t1 int)"%(parameterDict['dbName'], parameterDict['stbName'])) + + tdLog.info("create topics from db") + topicName1 = 'topic_db1' + + tdSql.execute("create topic %s as %s" %(topicName1, parameterDict['dbName'])) + + consumerId = 0 + expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"] + topicList = topicName1 + ifcheckdata = 0 + ifManualCommit = 1 + keyList = 'group.id:cgrp1,\ + enable.auto.commit:false,\ + auto.commit.interval.ms:6000,\ + auto.offset.reset:earliest' + self.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit) + + consumerId = 1 + keyList = 'group.id:cgrp2,\ + enable.auto.commit:false,\ + auto.commit.interval.ms:6000,\ + auto.offset.reset:earliest' + self.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit) + + tdLog.info("start consume processor") + pollDelay = 10 + showMsg = 1 + showRow = 1 + self.startTmqSimProcess(buildPath,cfgPath,pollDelay,parameterDict["dbName"],showMsg, showRow) + + prepareEnvThread = threading.Thread(target=self.prepareEnv, kwargs=parameterDict) + prepareEnvThread.start() + + # wait for data ready + prepareEnvThread.join() + + tdLog.info("insert process end, and start to check consume result") + expectRows = 2 + resultList = self.selectConsumeResult(expectRows) + totalConsumeRows = 0 + for i in range(expectRows): + totalConsumeRows += resultList[i] + + if totalConsumeRows != expectrowcnt * 2: + tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt*2)) + tdLog.exit("tmq consume rows error!") + + tdSql.query("drop topic %s"%topicName1) + + tdLog.printNoPrefix("======== test case 2a end ...... ") + def tmqCase3(self, cfgPath, buildPath): tdLog.printNoPrefix("======== test case 3: Produce while one consumers to subscribe one db, include 2 stb") tdLog.info("step 1: create database, stb, ctb and insert data") @@ -745,6 +813,7 @@ class TDTestCase: self.tmqCase1(cfgPath, buildPath) self.tmqCase2(cfgPath, buildPath) + self.tmqCase2a(cfgPath, buildPath) self.tmqCase3(cfgPath, buildPath) self.tmqCase4(cfgPath, buildPath) self.tmqCase5(cfgPath, buildPath)