diff --git a/.drone.yml b/.drone.yml index eed8b6306d20b0f314f41bd33bb9538a2035f03e..f7ee4e976f7b6a077ea0b9a8cb7f9cadb5c3628f 100644 --- a/.drone.yml +++ b/.drone.yml @@ -7,41 +7,22 @@ platform: arch: amd64 steps: -- name: smoke_test - image: python:3.8 +- name: build + image: gcc commands: - apt-get update - - apt-get install -y cmake build-essential gcc - - pip3 install psutil - - pip3 install guppy3 - - pip3 install src/connector/python/ + - apt-get install -y cmake build-essential - mkdir debug - cd debug - cmake .. - make - - cd ../tests - - ./test-all.sh smoke + trigger: + event: + - pull_request when: branch: - develop - master - - -- name: crash_gen - image: python:3.8 - commands: - - pip3 install requests - - pip3 install src/connector/python/ - - pip3 install psutil - - pip3 install guppy3 - - cd tests/pytest - - ./crash_gen.sh -a -p -t 4 -s 2000 - when: - branch: - - develop - - master - - --- kind: pipeline name: test_arm64 @@ -60,6 +41,9 @@ steps: - cd debug - cmake .. -DCPUTYPE=aarch64 > /dev/null - make + trigger: + event: + - pull_request when: branch: - develop @@ -82,6 +66,9 @@ steps: - cd debug - cmake .. -DCPUTYPE=aarch32 > /dev/null - make + trigger: + event: + - pull_request when: branch: - develop @@ -106,11 +93,13 @@ steps: - cd debug - cmake .. - make + trigger: + event: + - pull_request when: branch: - develop - master - --- kind: pipeline name: build_xenial @@ -129,6 +118,9 @@ steps: - cd debug - cmake .. - make + trigger: + event: + - pull_request when: branch: - develop @@ -151,6 +143,32 @@ steps: - cd debug - cmake .. - make + trigger: + event: + - pull_request + when: + branch: + - develop + - master +--- +kind: pipeline +name: build_centos7 +platform: + os: linux + arch: amd64 + +steps: +- name: build + image: ansible/centos7-ansible + commands: + - yum install -y gcc gcc-c++ make cmake + - mkdir debug + - cd debug + - cmake .. + - make + trigger: + event: + - pull_request when: branch: - develop diff --git a/documentation20/cn/09.connections/docs.md b/documentation20/cn/09.connections/docs.md index 79380f3bbd9680120f63f89a0bfbe6f31f5c7a74..6a2ead37666d4b44e1f16f55e3e41694fee5f694 100644 --- a/documentation20/cn/09.connections/docs.md +++ b/documentation20/cn/09.connections/docs.md @@ -16,7 +16,7 @@ TDengine的Grafana插件在安装包的/usr/local/taos/connector/grafanaplugin 以CentOS 7.2操作系统为例,将grafanaplugin目录拷贝到/var/lib/grafana/plugins目录下,重新启动grafana即可。 ```bash -sudo cp -rf /usr/local/taos/connector/grafanaplugin /var/lib/grafana/tdengine +sudo cp -rf /usr/local/taos/connector/grafanaplugin /var/lib/grafana/plugins/tdengine ``` ### 使用 Grafana diff --git a/documentation20/cn/12.taos-sql/docs.md b/documentation20/cn/12.taos-sql/docs.md index 112ad99391521b31bdb4876519be6b68d8b62fe6..fbb82ee140bf99e5ef4d08c8c6a6420678803ffd 100644 --- a/documentation20/cn/12.taos-sql/docs.md +++ b/documentation20/cn/12.taos-sql/docs.md @@ -135,6 +135,14 @@ TDengine 缺省的时间戳是毫秒精度,但通过修改配置参数 enableM SHOW DATABASES; ``` +- **显示一个数据库的创建语句** + + ```mysql + SHOW CREATE DATABASE db_name; + ``` + 常用于数据库迁移。对一个已经存在的数据库,返回其创建语句;在另一个集群中执行该语句,就能得到一个设置完全相同的 Database。 + + ## 表管理 - **创建数据表** @@ -200,6 +208,13 @@ TDengine 缺省的时间戳是毫秒精度,但通过修改配置参数 enableM 通配符匹配:1)’%’ (百分号)匹配0到任意个字符;2)’\_’下划线匹配一个字符。 +- **显示一个数据表的创建语句** + + ```mysql + SHOW CREATE TABLE tb_name; + ``` + 常用于数据库迁移。对一个已经存在的数据表,返回其创建语句;在另一个集群中执行该语句,就能得到一个结构完全相同的数据表。 + - **在线修改显示字符宽度** ```mysql @@ -265,6 +280,13 @@ TDengine 缺省的时间戳是毫秒精度,但通过修改配置参数 enableM ``` 查看数据库内全部 STable,及其相关信息,包括 STable 的名称、创建时间、列数量、标签(TAG)数量、通过该 STable 建表的数量。 +- **显示一个超级表的创建语句** + + ```mysql + SHOW CREATE STABLE stb_name; + ``` + 常用于数据库迁移。对一个已经存在的超级表,返回其创建语句;在另一个集群中执行该语句,就能得到一个结构完全相同的超级表。 + - **获取超级表的结构信息** ```mysql diff --git a/src/client/src/tscParseInsert.c b/src/client/src/tscParseInsert.c index 0fad4f97f5056301893e35df91a32735c628c789..d96e25dd3799f24d5f846553b7e23161d7107419 100644 --- a/src/client/src/tscParseInsert.c +++ b/src/client/src/tscParseInsert.c @@ -577,12 +577,13 @@ int32_t tsParseValues(char **str, STableDataBlocks *pDataBlock, int maxRows, SSq index = 0; sToken = tStrGetToken(*str, &index, false); - *str += index; if (sToken.n == 0 || sToken.type != TK_RP) { tscSQLSyntaxErrMsg(pCmd->payload, ") expected", *str); code = TSDB_CODE_TSC_SQL_SYNTAX_ERROR; - return -1; + return code; } + + *str += index; (*numOfRows)++; } @@ -712,6 +713,9 @@ static int32_t doParseInsertStatement(SSqlCmd* pCmd, char **str, STableDataBlock int32_t numOfRows = 0; code = tsParseValues(str, dataBuf, maxNumOfRows, pCmd, &numOfRows, tmpTokenBuf); + if (code != TSDB_CODE_SUCCESS) { + return code; + } for (uint32_t i = 0; i < dataBuf->numOfParams; ++i) { SParamInfo *param = dataBuf->params + i; diff --git a/src/connector/jdbc/src/test/java/com/taosdata/jdbc/TSDBPreparedStatementTest.java b/src/connector/jdbc/src/test/java/com/taosdata/jdbc/TSDBPreparedStatementTest.java index dc6fd4c5016bd67eb8e5dcf0e9d497f4f2f97612..8804cc5da0b6f2bcfb2b046bf794445221d61df2 100644 --- a/src/connector/jdbc/src/test/java/com/taosdata/jdbc/TSDBPreparedStatementTest.java +++ b/src/connector/jdbc/src/test/java/com/taosdata/jdbc/TSDBPreparedStatementTest.java @@ -8,6 +8,8 @@ import org.junit.Test; import java.io.IOException; import java.io.Serializable; import java.sql.*; +import java.util.ArrayList; +import java.util.Random; public class TSDBPreparedStatementTest { private static final String host = "127.0.0.1"; @@ -97,6 +99,118 @@ public class TSDBPreparedStatementTest { Assert.assertEquals(1, result); } + @Test + public void executeTest() throws SQLException { + Statement stmt = conn.createStatement(); + + int numOfRows = 1000; + + for (int loop = 0; loop < 10; loop++){ + stmt.execute("drop table if exists weather_test"); + stmt.execute("create table weather_test(ts timestamp, f1 nchar(4), f2 float, f3 double, f4 timestamp, f5 int, f6 bool, f7 binary(10))"); + + TSDBPreparedStatement s = (TSDBPreparedStatement) conn.prepareStatement("insert into ? values(?, ?, ?, ?, ?, ?, ?, ?)"); + Random r = new Random(); + s.setTableName("weather_test"); + + ArrayList ts = new ArrayList(); + for(int i = 0; i < numOfRows; i++) { + ts.add(System.currentTimeMillis() + i); + } + s.setTimestamp(0, ts); + + int random = 10 + r.nextInt(5); + ArrayList s2 = new ArrayList(); + for(int i = 0; i < numOfRows; i++) { + if(i % random == 0) { + s2.add(null); + }else{ + s2.add("分支" + i % 4); + } + } + s.setNString(1, s2, 4); + + random = 10 + r.nextInt(5); + ArrayList s3 = new ArrayList(); + for(int i = 0; i < numOfRows; i++) { + if(i % random == 0) { + s3.add(null); + }else{ + s3.add(r.nextFloat()); + } + } + s.setFloat(2, s3); + + random = 10 + r.nextInt(5); + ArrayList s4 = new ArrayList(); + for(int i = 0; i < numOfRows; i++) { + if(i % random == 0) { + s4.add(null); + }else{ + s4.add(r.nextDouble()); + } + } + s.setDouble(3, s4); + + random = 10 + r.nextInt(5); + ArrayList ts2 = new ArrayList(); + for(int i = 0; i < numOfRows; i++) { + if(i % random == 0) { + ts2.add(null); + }else{ + ts2.add(System.currentTimeMillis() + i); + } + } + s.setTimestamp(4, ts2); + + random = 10 + r.nextInt(5); + ArrayList vals = new ArrayList<>(); + for(int i = 0; i < numOfRows; i++) { + if(i % random == 0) { + vals.add(null); + }else{ + vals.add(r.nextInt()); + } + } + s.setInt(5, vals); + + random = 10 + r.nextInt(5); + ArrayList sb = new ArrayList<>(); + for(int i = 0; i < numOfRows; i++) { + if(i % random == 0) { + sb.add(null); + }else{ + sb.add(i % 2 == 0 ? true : false); + } + } + s.setBoolean(6, sb); + + random = 10 + r.nextInt(5); + ArrayList s5 = new ArrayList(); + for(int i = 0; i < numOfRows; i++) { + if(i % random == 0) { + s5.add(null); + }else{ + s5.add("test" + i % 10); + } + } + s.setString(7, s5, 10); + + s.columnDataAddBatch(); + s.columnDataExecuteBatch(); + s.columnDataCloseBatch(); + + String sql = "select * from weather_test"; + PreparedStatement statement = conn.prepareStatement(sql); + ResultSet rs = statement.executeQuery(); + int rows = 0; + while(rs.next()) { + rows++; + } + Assert.assertEquals(numOfRows, rows); + } + } + @Test public void setBoolean() throws SQLException { pstmt_insert.setTimestamp(1, new Timestamp(System.currentTimeMillis())); diff --git a/src/tsdb/inc/tsdbBuffer.h b/src/tsdb/inc/tsdbBuffer.h index 414ace00097d95742080a8f173177d5e44497237..4e18ac711a159cd97fd6255c5be0e8aa6ff4abaf 100644 --- a/src/tsdb/inc/tsdbBuffer.h +++ b/src/tsdb/inc/tsdbBuffer.h @@ -28,8 +28,9 @@ typedef struct { int bufBlockSize; int tBufBlocks; int nBufBlocks; + int nRecycleBlocks; int64_t index; - SList* bufBlockList; + SList* bufBlockList; } STsdbBufPool; #define TSDB_BUFFER_RESERVE 1024 // Reseve 1K as commit threshold @@ -39,5 +40,7 @@ void tsdbFreeBufPool(STsdbBufPool* pBufPool); int tsdbOpenBufPool(STsdbRepo* pRepo); void tsdbCloseBufPool(STsdbRepo* pRepo); SListNode* tsdbAllocBufBlockFromPool(STsdbRepo* pRepo); +int tsdbExpendPool(STsdbRepo* pRepo, int32_t oldTotalBlocks); +void tsdbRecycleBufferBlock(STsdbBufPool* pPool, SListNode *pNode); #endif /* _TD_TSDB_BUFFER_H_ */ \ No newline at end of file diff --git a/src/tsdb/inc/tsdbint.h b/src/tsdb/inc/tsdbint.h index 074ff20f2298918f1fa0698be0a291081ead8f05..4d62164df9920c916e7a01cd67496711707b76e1 100644 --- a/src/tsdb/inc/tsdbint.h +++ b/src/tsdb/inc/tsdbint.h @@ -71,6 +71,11 @@ struct STsdbRepo { uint8_t state; STsdbCfg config; + + STsdbCfg save_config; // save apply config + bool config_changed; // config changed flag + pthread_mutex_t save_mutex; // protect save config + STsdbAppH appH; STsdbStat stat; STsdbMeta* tsdbMeta; diff --git a/src/tsdb/src/tsdbBuffer.c b/src/tsdb/src/tsdbBuffer.c index 1798a21b9963c7641dd99dc7fa11a5dd977e0e3c..429ea8e0ceb2c687b53ebd0a9d61d02d2a1f3686 100644 --- a/src/tsdb/src/tsdbBuffer.c +++ b/src/tsdb/src/tsdbBuffer.c @@ -70,6 +70,7 @@ int tsdbOpenBufPool(STsdbRepo *pRepo) { pPool->tBufBlocks = pCfg->totalBlocks; pPool->nBufBlocks = 0; pPool->index = 0; + pPool->nRecycleBlocks = 0; for (int i = 0; i < pCfg->totalBlocks; i++) { STsdbBufBlock *pBufBlock = tsdbNewBufBlock(pPool->bufBlockSize); @@ -156,4 +157,46 @@ _err: return NULL; } -static void tsdbFreeBufBlock(STsdbBufBlock *pBufBlock) { tfree(pBufBlock); } \ No newline at end of file +static void tsdbFreeBufBlock(STsdbBufBlock *pBufBlock) { tfree(pBufBlock); } + +int tsdbExpendPool(STsdbRepo* pRepo, int32_t oldTotalBlocks) { + if (oldTotalBlocks == pRepo->config.totalBlocks) { + return TSDB_CODE_SUCCESS; + } + + int err = TSDB_CODE_SUCCESS; + + if (tsdbLockRepo(pRepo) < 0) return terrno; + STsdbBufPool* pPool = pRepo->pPool; + + if (pRepo->config.totalBlocks > oldTotalBlocks) { + for (int i = 0; i < pRepo->config.totalBlocks - oldTotalBlocks; i++) { + STsdbBufBlock *pBufBlock = tsdbNewBufBlock(pPool->bufBlockSize); + if (pBufBlock == NULL) goto err; + + if (tdListAppend(pPool->bufBlockList, (void *)(&pBufBlock)) < 0) { + tsdbFreeBufBlock(pBufBlock); + terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; + err = TSDB_CODE_TDB_OUT_OF_MEMORY; + goto err; + } + + pPool->nBufBlocks++; + } + pthread_cond_signal(&pPool->poolNotEmpty); + } else { + pPool->nRecycleBlocks = oldTotalBlocks - pRepo->config.totalBlocks; + } + +err: + tsdbUnlockRepo(pRepo); + return err; +} + +void tsdbRecycleBufferBlock(STsdbBufPool* pPool, SListNode *pNode) { + STsdbBufBlock *pBufBlock = NULL; + tdListNodeGetData(pPool->bufBlockList, pNode, (void *)(&pBufBlock)); + tsdbFreeBufBlock(pBufBlock); + free(pNode); + pPool->nBufBlocks--; +} \ No newline at end of file diff --git a/src/tsdb/src/tsdbCommitQueue.c b/src/tsdb/src/tsdbCommitQueue.c index 9e8e4acd7ebea2209bf08798eb80f300a72927ab..e753a3211e5a789548ee6fc6f7caca6ccd3b59c3 100644 --- a/src/tsdb/src/tsdbCommitQueue.c +++ b/src/tsdb/src/tsdbCommitQueue.c @@ -112,6 +112,32 @@ int tsdbScheduleCommit(STsdbRepo *pRepo) { return 0; } +static void tsdbApplyRepoConfig(STsdbRepo *pRepo) { + pRepo->config_changed = false; + STsdbCfg * pSaveCfg = &pRepo->save_config; + + int32_t oldTotalBlocks = pRepo->config.totalBlocks; + + pRepo->config.compression = pRepo->save_config.compression; + pRepo->config.keep = pRepo->save_config.keep; + pRepo->config.keep1 = pRepo->save_config.keep1; + pRepo->config.keep2 = pRepo->save_config.keep2; + pRepo->config.cacheLastRow = pRepo->save_config.cacheLastRow; + pRepo->config.totalBlocks = pRepo->save_config.totalBlocks; + + tsdbInfo("vgId:%d apply new config: compression(%d), keep(%d,%d,%d), totalBlocks(%d), cacheLastRow(%d),totalBlocks(%d)", + REPO_ID(pRepo), + pSaveCfg->compression, pSaveCfg->keep,pSaveCfg->keep1, pSaveCfg->keep2, + pSaveCfg->totalBlocks, pSaveCfg->cacheLastRow, pSaveCfg->totalBlocks); + + int err = tsdbExpendPool(pRepo, oldTotalBlocks); + if (!TAOS_SUCCEEDED(err)) { + tsdbError("vgId:%d expand pool from %d to %d fail,reason:%s", + REPO_ID(pRepo), oldTotalBlocks, pSaveCfg->totalBlocks, tstrerror(err)); + } + +} + static void *tsdbLoopCommit(void *arg) { SCommitQueue *pQueue = &tsCommitQueue; SListNode * pNode = NULL; @@ -138,6 +164,13 @@ static void *tsdbLoopCommit(void *arg) { pRepo = ((SCommitReq *)pNode->data)->pRepo; + // check if need to apply new config + if (pRepo->config_changed) { + pthread_mutex_lock(&pRepo->save_mutex); + tsdbApplyRepoConfig(pRepo); + pthread_mutex_unlock(&pRepo->save_mutex); + } + tsdbCommitData(pRepo); listNodeFree(pNode); } diff --git a/src/tsdb/src/tsdbMain.c b/src/tsdb/src/tsdbMain.c index 99929f3542160cc53b99571f73d699e1abcbf171..fd02a3c8b97d7506209d92661143a158d8d94951 100644 --- a/src/tsdb/src/tsdbMain.c +++ b/src/tsdb/src/tsdbMain.c @@ -203,6 +203,70 @@ void tsdbReportStat(void *repo, int64_t *totalPoints, int64_t *totalStorage, int int32_t tsdbConfigRepo(STsdbRepo *repo, STsdbCfg *pCfg) { // TODO: think about multithread cases + if (tsdbCheckAndSetDefaultCfg(pCfg) < 0) return -1; + + STsdbCfg * pRCfg = &repo->config; + + ASSERT(pRCfg->tsdbId == pCfg->tsdbId); + ASSERT(pRCfg->cacheBlockSize == pCfg->cacheBlockSize); + ASSERT(pRCfg->daysPerFile == pCfg->daysPerFile); + ASSERT(pRCfg->minRowsPerFileBlock == pCfg->minRowsPerFileBlock); + ASSERT(pRCfg->maxRowsPerFileBlock == pCfg->maxRowsPerFileBlock); + ASSERT(pRCfg->precision == pCfg->precision); + + bool configChanged = false; + if (pRCfg->compression != pCfg->compression) { + configChanged = true; + } + if (pRCfg->keep != pCfg->keep) { + configChanged = true; + } + if (pRCfg->keep1 != pCfg->keep1) { + configChanged = true; + } + if (pRCfg->keep2 != pCfg->keep2) { + configChanged = true; + } + if (pRCfg->cacheLastRow != pCfg->cacheLastRow) { + configChanged = true; + } + if (pRCfg->totalBlocks != pCfg->totalBlocks) { + configChanged = true; + } + + if (!configChanged) { + tsdbError("vgId:%d no config changed", REPO_ID(repo)); + } + + int code = pthread_mutex_lock(&repo->save_mutex); + if (code != 0) { + tsdbError("vgId:%d failed to lock tsdb save config mutex since %s", REPO_ID(repo), strerror(errno)); + terrno = TAOS_SYSTEM_ERROR(code); + return -1; + } + + STsdbCfg * pSaveCfg = &repo->save_config; + *pSaveCfg = repo->config; + + pSaveCfg->compression = pCfg->compression; + pSaveCfg->keep = pCfg->keep; + pSaveCfg->keep1 = pCfg->keep1; + pSaveCfg->keep2 = pCfg->keep2; + pSaveCfg->cacheLastRow = pCfg->cacheLastRow; + pSaveCfg->totalBlocks = pCfg->totalBlocks; + + tsdbInfo("vgId:%d old config: compression(%d), keep(%d,%d,%d), cacheLastRow(%d),totalBlocks(%d)", + REPO_ID(repo), + pRCfg->compression, pRCfg->keep, pRCfg->keep1,pRCfg->keep2, + pRCfg->cacheLastRow, pRCfg->totalBlocks); + tsdbInfo("vgId:%d new config: compression(%d), keep(%d,%d,%d), cacheLastRow(%d),totalBlocks(%d)", + REPO_ID(repo), + pSaveCfg->compression, pSaveCfg->keep,pSaveCfg->keep1, pSaveCfg->keep2, + pSaveCfg->cacheLastRow,pSaveCfg->totalBlocks); + + repo->config_changed = true; + + pthread_mutex_unlock(&repo->save_mutex); return 0; #if 0 STsdbRepo *pRepo = (STsdbRepo *)repo; @@ -474,6 +538,14 @@ static STsdbRepo *tsdbNewRepo(STsdbCfg *pCfg, STsdbAppH *pAppH) { return NULL; } + code = pthread_mutex_init(&(pRepo->save_mutex), NULL); + if (code != 0) { + terrno = TAOS_SYSTEM_ERROR(code); + tsdbFreeRepo(pRepo); + return NULL; + } + pRepo->config_changed = false; + code = tsem_init(&(pRepo->readyToCommit), 0, 1); if (code != 0) { code = errno; diff --git a/src/tsdb/src/tsdbMemTable.c b/src/tsdb/src/tsdbMemTable.c index 20ec426018a39e554fb03e9bb11399dbce1f3fcc..c6fcf55686850432a90d82e1f50c856f29c8eac0 100644 --- a/src/tsdb/src/tsdbMemTable.c +++ b/src/tsdb/src/tsdbMemTable.c @@ -98,17 +98,26 @@ int tsdbUnRefMemTable(STsdbRepo *pRepo, SMemTable *pMemTable) { STsdbBufPool *pBufPool = pRepo->pPool; SListNode *pNode = NULL; + bool recycleBlocks = pBufPool->nRecycleBlocks > 0; if (tsdbLockRepo(pRepo) < 0) return -1; while ((pNode = tdListPopHead(pMemTable->bufBlockList)) != NULL) { - tdListAppendNode(pBufPool->bufBlockList, pNode); - } - int code = pthread_cond_signal(&pBufPool->poolNotEmpty); - if (code != 0) { - if (tsdbUnlockRepo(pRepo) < 0) return -1; - tsdbError("vgId:%d failed to signal pool not empty since %s", REPO_ID(pRepo), strerror(code)); - terrno = TAOS_SYSTEM_ERROR(code); - return -1; + if (pBufPool->nRecycleBlocks > 0) { + tsdbRecycleBufferBlock(pBufPool, pNode); + pBufPool->nRecycleBlocks -= 1; + } else { + tdListAppendNode(pBufPool->bufBlockList, pNode); + } + } + if (!recycleBlocks) { + int code = pthread_cond_signal(&pBufPool->poolNotEmpty); + if (code != 0) { + if (tsdbUnlockRepo(pRepo) < 0) return -1; + tsdbError("vgId:%d failed to signal pool not empty since %s", REPO_ID(pRepo), strerror(code)); + terrno = TAOS_SYSTEM_ERROR(code); + return -1; + } } + if (tsdbUnlockRepo(pRepo) < 0) return -1; for (int i = 0; i < pMemTable->maxTables; i++) { @@ -958,6 +967,15 @@ static void tsdbFreeRows(STsdbRepo *pRepo, void **rows, int rowCounter) { static int tsdbUpdateTableLatestInfo(STsdbRepo *pRepo, STable *pTable, SDataRow row) { STsdbCfg *pCfg = &pRepo->config; + // if cacheLastRow config has been reset, free the lastRow + if (!pCfg->cacheLastRow && pTable->lastRow != NULL) { + taosTZfree(pTable->lastRow); + TSDB_WLOCK_TABLE(pTable); + pTable->lastRow = NULL; + pTable->lastKey = TSKEY_INITIAL_VAL; + TSDB_WUNLOCK_TABLE(pTable); + } + if (tsdbGetTableLastKeyImpl(pTable) < dataRowKey(row)) { if (pCfg->cacheLastRow || pTable->lastRow != NULL) { SDataRow nrow = pTable->lastRow; diff --git a/src/vnode/src/vnodeMain.c b/src/vnode/src/vnodeMain.c index 0e47996c6f48c9ab1bd75eb7b08cbbee6337a913..ca3746c5989b4928e57e096e5236dad3039d709d 100644 --- a/src/vnode/src/vnodeMain.c +++ b/src/vnode/src/vnodeMain.c @@ -170,29 +170,31 @@ static int32_t vnodeAlterImp(SVnodeObj *pVnode, SCreateVnodeMsg *pVnodeCfg) { vDebug("vgId:%d, tsdbchanged:%d syncchanged:%d while alter vnode", pVnode->vgId, tsdbCfgChanged, syncCfgChanged); - if (/*tsdbCfgChanged || */syncCfgChanged) { + if (tsdbCfgChanged || syncCfgChanged) { // vnode in non-ready state and still needs to return success instead of TSDB_CODE_VND_INVALID_STATUS // dbCfgVersion can be corrected by status msg - if (!vnodeSetUpdatingStatus(pVnode)) { - vDebug("vgId:%d, vnode is not ready, do alter operation later", pVnode->vgId); - pVnode->dbCfgVersion = dbCfgVersion; - pVnode->vgCfgVersion = vgCfgVersion; - pVnode->syncCfg = syncCfg; - pVnode->tsdbCfg = tsdbCfg; - return TSDB_CODE_SUCCESS; - } + if (syncCfgChanged) { + if (!vnodeSetUpdatingStatus(pVnode)) { + vDebug("vgId:%d, vnode is not ready, do alter operation later", pVnode->vgId); + pVnode->dbCfgVersion = dbCfgVersion; + pVnode->vgCfgVersion = vgCfgVersion; + pVnode->syncCfg = syncCfg; + pVnode->tsdbCfg = tsdbCfg; + return TSDB_CODE_SUCCESS; + } - code = syncReconfig(pVnode->sync, &pVnode->syncCfg); - if (code != TSDB_CODE_SUCCESS) { - pVnode->dbCfgVersion = dbCfgVersion; - pVnode->vgCfgVersion = vgCfgVersion; - pVnode->syncCfg = syncCfg; - pVnode->tsdbCfg = tsdbCfg; - vnodeSetReadyStatus(pVnode); - return code; + code = syncReconfig(pVnode->sync, &pVnode->syncCfg); + if (code != TSDB_CODE_SUCCESS) { + pVnode->dbCfgVersion = dbCfgVersion; + pVnode->vgCfgVersion = vgCfgVersion; + pVnode->syncCfg = syncCfg; + pVnode->tsdbCfg = tsdbCfg; + vnodeSetReadyStatus(pVnode); + return code; + } } - if (pVnode->tsdb) { + if (tsdbCfgChanged && pVnode->tsdb) { code = tsdbConfigRepo(pVnode->tsdb, &pVnode->tsdbCfg); if (code != TSDB_CODE_SUCCESS) { pVnode->dbCfgVersion = dbCfgVersion; diff --git a/tests/script/api/makefile b/tests/script/api/makefile index f3a8dec061d1f2d4bc5342e917e72d06b138ad84..5eeb1342887afc2c4f920aa673466eef7f7ae510 100644 --- a/tests/script/api/makefile +++ b/tests/script/api/makefile @@ -11,7 +11,7 @@ CFLAGS = -O0 -g -Wall -Wno-deprecated -fPIC -Wno-unused-result -Wconversion \ all: $(TARGET) exe: - gcc $(CFLAGS) ./batchprepare.c -o $(ROOT)batchprepare $(LFLAGS) + gcc $(CFLAGS) ./batchprepare.c -o $(ROOT)batchprepare $(LFLAGS) gcc $(CFLAGS) ./stmtBatchTest.c -o $(ROOT)stmtBatchTest $(LFLAGS) clean: diff --git a/tests/script/api/stmtBatchTest.c b/tests/script/api/stmtBatchTest.c index cb1ca3346bf34a72ee6dc8d73e7979795bf094a2..3ce2fefb0d9361494e3019d2d7809b004d9f1489 100644 --- a/tests/script/api/stmtBatchTest.c +++ b/tests/script/api/stmtBatchTest.c @@ -1409,20 +1409,14 @@ static void prepareV(TAOS *taos, int schemaCase, int tableNum, int lenOfBina } -static void preparemV(TAOS *taos, int schemaCase, int idx) { +static void prepareV_long(TAOS *taos, int schemaCase, int tableNum, int lenOfBinaryDef) { TAOS_RES *result; int code; - char dbname[32],sql[255]; - sprintf(dbname, "demo%d", idx); - sprintf(sql, "drop database if exists %s", dbname); - - - result = taos_query(taos, sql); + result = taos_query(taos, "drop database if exists demol"); taos_free_result(result); - sprintf(sql, "create database %s", dbname); - result = taos_query(taos, sql); + result = taos_query(taos, "create database demol"); code = taos_errno(result); if (code != 0) { printf("failed to create database, reason:%s\n", taos_errstr(result)); @@ -1431,18 +1425,18 @@ static void preparemV(TAOS *taos, int schemaCase, int idx) { } taos_free_result(result); - sprintf(sql, "use %s", dbname); - result = taos_query(taos, sql); + result = taos_query(taos, "use demol"); taos_free_result(result); // create table - for (int i = 0 ; i < 300; i++) { + for (int i = 0 ; i < tableNum; i++) { char buf[1024]; if (schemaCase) { - sprintf(buf, "create table m%d (ts timestamp, b bool, v1 tinyint, v2 smallint, v4 int, v8 bigint, f4 float, f8 double, bin binary(40), bin2 binary(40), t2 timestamp)", i) ; + sprintf(buf, "create table m%d (ts timestamp, b bool, v1 tinyint, v2 smallint, v4 int, v8 bigint, f4 float, f8 double, br binary(%d), nr nchar(%d), ts2 timestamp)", i, lenOfBinaryDef, lenOfBinaryDef) ; } else { sprintf(buf, "create table m%d (ts timestamp, b int)", i) ; } + result = taos_query(taos, buf); code = taos_errno(result); if (code != 0) { @@ -1985,12 +1979,283 @@ static void* runCase(void *para) { } + +static int stmt_bind_case_001_long(TAOS_STMT *stmt, int tableNum, int rowsOfPerColum, int bingNum, int lenOfBinaryDef, int lenOfBinaryAct, int columnNum, int64_t* startTs) { + sampleValue* v = (sampleValue *)calloc(1, sizeof(sampleValue)); + + int totalRowsPerTbl = rowsOfPerColum * bingNum; + + v->ts = (int64_t *)malloc(sizeof(int64_t) * (size_t)(totalRowsPerTbl * tableNum)); + v->br = (char *)malloc(sizeof(int64_t) * (size_t)(totalRowsPerTbl * lenOfBinaryDef)); + v->nr = (char *)malloc(sizeof(int64_t) * (size_t)(totalRowsPerTbl * lenOfBinaryDef)); + + int *lb = (int *)malloc(MAX_ROWS_OF_PER_COLUMN * sizeof(int)); + + TAOS_MULTI_BIND *params = calloc(1, sizeof(TAOS_MULTI_BIND) * (size_t)(bingNum * columnNum * (tableNum+1) * rowsOfPerColum)); + char* is_null = malloc(sizeof(char) * MAX_ROWS_OF_PER_COLUMN); + char* no_null = malloc(sizeof(char) * MAX_ROWS_OF_PER_COLUMN); + + int64_t tts = *startTs; + + for (int i = 0; i < rowsOfPerColum; ++i) { + lb[i] = lenOfBinaryAct; + no_null[i] = 0; + is_null[i] = (i % 10 == 2) ? 1 : 0; + v->b[i] = (int8_t)(i % 2); + v->v1[i] = (int8_t)((i+1) % 2); + v->v2[i] = (int16_t)i; + v->v4[i] = (int32_t)(i+1); + v->v8[i] = (int64_t)(i+2); + v->f4[i] = (float)(i+3); + v->f8[i] = (double)(i+4); + char tbuf[MAX_BINARY_DEF_LEN]; + memset(tbuf, 0, MAX_BINARY_DEF_LEN); + sprintf(tbuf, "binary-%d", i%10); + memcpy(v->br + i*lenOfBinaryDef, tbuf, (size_t)lenOfBinaryAct); + memset(tbuf, 0, MAX_BINARY_DEF_LEN); + sprintf(tbuf, "nchar-%d", i%10); + memcpy(v->nr + i*lenOfBinaryDef, tbuf, (size_t)lenOfBinaryAct); + v->ts2[i] = tts + i; + } + + int i = 0; + for (int j = 0; j < bingNum * tableNum; j++) { + params[i+0].buffer_type = TSDB_DATA_TYPE_TIMESTAMP; + params[i+0].buffer_length = sizeof(int64_t); + params[i+0].buffer = &v->ts[j*rowsOfPerColum]; + params[i+0].length = NULL; + params[i+0].is_null = no_null; + params[i+0].num = rowsOfPerColum; + + params[i+1].buffer_type = TSDB_DATA_TYPE_BOOL; + params[i+1].buffer_length = sizeof(int8_t); + params[i+1].buffer = v->b; + params[i+1].length = NULL; + params[i+1].is_null = is_null; + params[i+1].num = rowsOfPerColum; + + params[i+2].buffer_type = TSDB_DATA_TYPE_TINYINT; + params[i+2].buffer_length = sizeof(int8_t); + params[i+2].buffer = v->v1; + params[i+2].length = NULL; + params[i+2].is_null = is_null; + params[i+2].num = rowsOfPerColum; + + params[i+3].buffer_type = TSDB_DATA_TYPE_SMALLINT; + params[i+3].buffer_length = sizeof(int16_t); + params[i+3].buffer = v->v2; + params[i+3].length = NULL; + params[i+3].is_null = is_null; + params[i+3].num = rowsOfPerColum; + + params[i+4].buffer_type = TSDB_DATA_TYPE_INT; + params[i+4].buffer_length = sizeof(int32_t); + params[i+4].buffer = v->v4; + params[i+4].length = NULL; + params[i+4].is_null = is_null; + params[i+4].num = rowsOfPerColum; + + params[i+5].buffer_type = TSDB_DATA_TYPE_BIGINT; + params[i+5].buffer_length = sizeof(int64_t); + params[i+5].buffer = v->v8; + params[i+5].length = NULL; + params[i+5].is_null = is_null; + params[i+5].num = rowsOfPerColum; + + params[i+6].buffer_type = TSDB_DATA_TYPE_FLOAT; + params[i+6].buffer_length = sizeof(float); + params[i+6].buffer = v->f4; + params[i+6].length = NULL; + params[i+6].is_null = is_null; + params[i+6].num = rowsOfPerColum; + + params[i+7].buffer_type = TSDB_DATA_TYPE_DOUBLE; + params[i+7].buffer_length = sizeof(double); + params[i+7].buffer = v->f8; + params[i+7].length = NULL; + params[i+7].is_null = is_null; + params[i+7].num = rowsOfPerColum; + + params[i+8].buffer_type = TSDB_DATA_TYPE_BINARY; + params[i+8].buffer_length = (uintptr_t)lenOfBinaryDef; + params[i+8].buffer = v->br; + params[i+8].length = lb; + params[i+8].is_null = is_null; + params[i+8].num = rowsOfPerColum; + + params[i+9].buffer_type = TSDB_DATA_TYPE_NCHAR; + params[i+9].buffer_length = (uintptr_t)lenOfBinaryDef; + params[i+9].buffer = v->nr; + params[i+9].length = lb; + params[i+9].is_null = is_null; + params[i+9].num = rowsOfPerColum; + + params[i+10].buffer_type = TSDB_DATA_TYPE_TIMESTAMP; + params[i+10].buffer_length = sizeof(int64_t); + params[i+10].buffer = v->ts2; + params[i+10].length = NULL; + params[i+10].is_null = is_null; + params[i+10].num = rowsOfPerColum; + + i+=columnNum; + } + + //int64_t tts = 1591060628000; + for (int i = 0; i < totalRowsPerTbl * tableNum; ++i) { + v->ts[i] = tts + i; + } + + *startTs = tts + totalRowsPerTbl * tableNum; // return to next + + unsigned long long starttime = getCurrentTime(); + + char *sql = "insert into ? values(?,?,?,?,?,?,?,?,?,?,?)"; + int code = taos_stmt_prepare(stmt, sql, 0); + if (code != 0){ + printf("failed to execute taos_stmt_prepare. code:0x%x[%s]\n", code, tstrerror(code)); + return -1; + } + + int id = 0; + for (int l = 0; l < bingNum; l++) { + for (int zz = 0; zz < tableNum; zz++) { + char buf[32]; + sprintf(buf, "m%d", zz); + code = taos_stmt_set_tbname(stmt, buf); + if (code != 0){ + printf("failed to execute taos_stmt_set_tbname. code:0x%x[%s]\n", code, tstrerror(code)); + return -1; + } + + for (int col=0; col < columnNum; ++col) { + code = taos_stmt_bind_single_param_batch(stmt, params + id, col); + if (code != 0){ + printf("failed to execute taos_stmt_bind_single_param_batch. code:0x%x[%s]\n", code, tstrerror(code)); + return -1; + } + id++; + } + + code = taos_stmt_add_batch(stmt); + if (code != 0) { + printf("failed to execute taos_stmt_add_batch. code:0x%x[%s]\n", code, tstrerror(code)); + return -1; + } + } + + code = taos_stmt_execute(stmt); + if (code != 0) { + printf("failed to execute taos_stmt_execute. code:0x%x[%s]\n", code, tstrerror(code)); + return -1; + } + } + + unsigned long long endtime = getCurrentTime(); + unsigned long long totalRows = (uint32_t)(totalRowsPerTbl * tableNum); + printf("insert total %d records, used %u seconds, avg:%u useconds per record\n", totalRows, (endtime-starttime)/1000000UL, (endtime-starttime)/totalRows); + + free(v->ts); + free(v->br); + free(v->nr); + free(v); + free(lb); + free(params); + free(is_null); + free(no_null); + + return 0; +} + +static void* runCase_long(void *para) { + ThreadInfo* tInfo = (ThreadInfo *)para; + TAOS *taos = tInfo->taos; + int idx = tInfo->idx; + + TAOS_STMT *stmt = NULL; + + (void)idx; + + int tableNum; + int lenOfBinaryDef; + int rowsOfPerColum; + int bingNum; + int lenOfBinaryAct; + int columnNum; + + int totalRowsPerTbl; + +//=======================================================================// + //========== long case 14: ======================// +#if 0 + { + stmt = taos_stmt_init(taos); + + tableNum = 1000; + rowsOfPerColum = 10; + bingNum = 5000000; + lenOfBinaryDef = 1000; + lenOfBinaryAct = 33; + columnNum = 11; + + prepareV(taos, 1, tableNum, lenOfBinaryDef); + stmt_bind_case_002(stmt, tableNum, rowsOfPerColum, bingNum, lenOfBinaryDef, lenOfBinaryAct, columnNum); + + totalRowsPerTbl = rowsOfPerColum * bingNum; + checkResult(taos, "m0", 0, totalRowsPerTbl); + checkResult(taos, "m1", 0, totalRowsPerTbl); + checkResult(taos, "m2", 0, totalRowsPerTbl); + checkResult(taos, "m3", 0, totalRowsPerTbl); + checkResult(taos, "m4", 0, totalRowsPerTbl); + taos_stmt_close(stmt); + printf("long case 14 check result end\n\n"); + } +#endif + + + //========== case 15: ======================// +#if 1 + { + printf("====long case 15 test start\n\n"); + + tableNum = 200; + rowsOfPerColum = 110; + bingNum = 100; + lenOfBinaryDef = 1000; + lenOfBinaryAct = 8; + columnNum = 11; + + int64_t startTs = 1591060628000; + prepareV_long(taos, 1, tableNum, lenOfBinaryDef); + + totalRowsPerTbl = 0; + for (int i = 0; i < 30000; i++) { + stmt = taos_stmt_init(taos); + stmt_bind_case_001_long(stmt, tableNum, rowsOfPerColum, bingNum, lenOfBinaryDef, lenOfBinaryAct, columnNum, &startTs); + + totalRowsPerTbl += rowsOfPerColum * bingNum; + checkResult(taos, "m0", 0, totalRowsPerTbl); + checkResult(taos, "m11", 0, totalRowsPerTbl); + checkResult(taos, "m22", 0, totalRowsPerTbl); + checkResult(taos, "m133", 0, totalRowsPerTbl); + checkResult(taos, "m199", 0, totalRowsPerTbl); + taos_stmt_close(stmt); + } + + printf("====long case 15 check result end\n\n"); + } +#endif + + return NULL; + +} + + int main(int argc, char *argv[]) { TAOS *taos; char host[32] = "127.0.0.1"; char* serverIp = NULL; - int threadNum = 1; + int threadNum = 2; // connect to server if (argc == 1) { @@ -2021,7 +2286,11 @@ int main(int argc, char *argv[]) tInfo->taos = taos; tInfo->idx = i; - pthread_create(&(pThreadList[0]), NULL, runCase, (void *)tInfo); + if (0 == i) { + pthread_create(&(pThreadList[0]), NULL, runCase, (void *)tInfo); + } else if (1 == i){ + pthread_create(&(pThreadList[0]), NULL, runCase_long, (void *)tInfo); + } tInfo++; }