diff --git a/documentation20/cn/02.getting-started/docs.md b/documentation20/cn/02.getting-started/docs.md index a98159d8c408c7140b1a24f2534e575b38d1c56a..6eb58a1433ed0d43b313a9dc979ae5873ba00e8f 100644 --- a/documentation20/cn/02.getting-started/docs.md +++ b/documentation20/cn/02.getting-started/docs.md @@ -24,7 +24,7 @@ TDengine的安装非常简单,从下载到安装成功仅仅只要几秒钟。 ## 轻松启动 -安装成功后,用户可使用`systemctl`命令来启动TDengine的服务进程。 +安装成功后,用户可使用 `systemctl` 命令来启动 TDengine 的服务进程。 ```bash $ systemctl start taosd @@ -35,21 +35,22 @@ $ systemctl start taosd $ systemctl status taosd ``` -如果TDengine服务正常工作,那么您可以通过TDengine的命令行程序`taos`来访问并体验TDengine。 +如果 TDengine 服务正常工作,那么您可以通过 TDengine 的命令行程序 `taos` 来访问并体验 TDengine。 **注意:** -- systemctl命令需要 _root_ 权限来运行,如果您非 _root_ 用户,请在命令前添加 sudo -- 为更好的获得产品反馈,改善产品,TDengine会采集基本的使用信息,但您可以修改系统配置文件taos.cfg里的配置参数telemetryReporting, 将其设为0,就可将其关闭。 -- TDengine采用FQDN(一般就是hostname)作为节点的ID,为保证正常运行,需要给运行taosd的服务器配置好hostname,在客户端应用运行的机器配置好DNS服务或hosts文件,保证FQDN能够解析。 +- systemctl 命令需要 _root_ 权限来运行,如果您非 _root_ 用户,请在命令前添加 sudo 。 +- 为更好的获得产品反馈,改善产品,TDengine 会采集基本的使用信息,但您可以修改系统配置文件 taos.cfg 里的配置参数 telemetryReporting, 将其设为 0,就可将其关闭。 +- TDengine 采用 FQDN (一般就是 hostname )作为节点的 ID,为保证正常运行,需要给运行 taosd 的服务器配置好 hostname,在客户端应用运行的机器配置好 DNS 服务或 hosts 文件,保证 FQDN 能够解析。 +- `systemctl stop taosd` 指令在执行后并不会马上停止 TDengine 服务,而是会等待系统中必要的落盘工作正常完成。在数据量很大的情况下,这可能会消耗较长时间。 -* TDengine 支持在使用[`systemd`](https://en.wikipedia.org/wiki/Systemd)做进程服务管理的linux系统上安装,用`which systemctl`命令来检测系统中是否存在`systemd`包: +* TDengine 支持在使用 [`systemd`](https://en.wikipedia.org/wiki/Systemd) 做进程服务管理的 linux 系统上安装,用 `which systemctl` 命令来检测系统中是否存在 `systemd` 包: ```bash $ which systemctl ``` - 如果系统中不支持systemd,也可以用手动运行 /usr/local/taos/bin/taosd 方式启动 TDengine 服务。 + 如果系统中不支持 systemd,也可以用手动运行 /usr/local/taos/bin/taosd 方式启动 TDengine 服务。 ## TDengine命令行程序 diff --git a/documentation20/cn/11.administrator/docs.md b/documentation20/cn/11.administrator/docs.md index bfa0456c7d4e80d1fd9336d7c4b7b9ca829b278e..ee2f34ff11ef5e69195c01d0b605383c3257f48c 100644 --- a/documentation20/cn/11.administrator/docs.md +++ b/documentation20/cn/11.administrator/docs.md @@ -129,7 +129,7 @@ taosd -C - blocks:每个VNODE(TSDB)中有多少cache大小的内存块。因此一个VNODE的用的内存大小粗略为(cache * blocks)。单位为块,默认值:4。(可通过 alter database 修改) - replica:副本个数,取值范围:1-3。单位为个,默认值:1。(可通过 alter database 修改) - precision:时间戳精度标识,ms表示毫秒,us表示微秒。默认值:ms。 -- cacheLast:是否在内存中缓存子表 last_row,0:关闭;1:开启。默认值:0。(可通过 alter database 修改)(从 2.0.11 版本开始支持此参数) +- cacheLast:是否在内存中缓存子表的最近数据,0:关闭;1:缓存子表最近一行数据;2:缓存子表每一列的最近的非NULL值,3:同时打开缓存最近行和列功能,默认值:0。(可通过 alter database 修改)(从 2.0.11 版本开始支持此参数) 对于一个应用场景,可能有多种数据特征的数据并存,最佳的设计是将具有相同数据特征的表放在一个库里,这样一个应用有多个库,而每个库可以配置不同的存储参数,从而保证系统有最优的性能。TDengine允许应用在创建库时指定上述存储参数,如果指定,该参数就将覆盖对应的系统配置参数。举例,有下述SQL: diff --git a/documentation20/cn/12.taos-sql/docs.md b/documentation20/cn/12.taos-sql/docs.md index bcf80d8fa20e613ff8955afa13036687da9d8a59..b130e0544caae9a0e95f3a3f405498feb0a1075b 100644 --- a/documentation20/cn/12.taos-sql/docs.md +++ b/documentation20/cn/12.taos-sql/docs.md @@ -126,7 +126,7 @@ TDengine 缺省的时间戳是毫秒精度,但通过修改配置参数 enableM ```mysql ALTER DATABASE db_name CACHELAST 0; ``` - CACHELAST 参数控制是否在内存中缓存数据子表的 last_row。缺省值为 0,取值范围 [0, 1]。其中 0 表示不启用、1 表示启用。(从 2.0.11 版本开始支持,修改后需要重启服务器生效。) + CACHELAST 参数控制是否在内存中缓存数据子表的 last_row。缺省值为 0,取值范围 [0, 1]。其中 0 表示不启用、1 表示启用。(从 2.0.11.0 版本开始支持。从 2.1.1.0 版本开始,修改此参数后无需重启服务器即可生效。) **Tips**: 以上所有参数修改后都可以用show databases来确认是否修改成功。 @@ -399,7 +399,12 @@ TDengine 缺省的时间戳是毫秒精度,但通过修改配置参数 enableM INSERT INTO tb1_name (tb1_field1_name, ...) [USING stb1_name TAGS (tag_value1, ...)] VALUES (field1_value1, ...) (field1_value2, ...) ... tb2_name (tb2_field1_name, ...) [USING stb2_name TAGS (tag_value2, ...)] VALUES (field1_value1, ...) (field1_value2, ...) ...; ``` - 以自动建表的方式,同时向表tb1_name和tb2_name中按列分别插入多条记录。 + 以自动建表的方式,同时向表tb1_name和tb2_name中按列分别插入多条记录。 + 从 2.0.20.5 版本开始,子表的列名可以不跟在子表名称后面,而是可以放在 TAGS 和 VALUES 之间,例如像下面这样写: + ```mysql + INSERT INTO tb1_name [USING stb1_name TAGS (tag_value1, ...)] (tb1_field1_name, ...) VALUES (field1_value1, ...) (field1_value2, ...) ...; + ``` + 注意:虽然两种写法都可以,但并不能在一条 SQL 语句中混用,否则会报语法错误。 **历史记录写入**:可使用IMPORT或者INSERT命令,IMPORT的语法,功能与INSERT完全一样。 diff --git a/src/common/inc/tdataformat.h b/src/common/inc/tdataformat.h index 450452fa4ae25507617c4901f3c6fff242079ca1..8ee73291566afcbf604b607c0bc6a426ca372b87 100644 --- a/src/common/inc/tdataformat.h +++ b/src/common/inc/tdataformat.h @@ -234,6 +234,7 @@ typedef struct SDataCol { int len; // column data length VarDataOffsetT *dataOff; // For binary and nchar data, the offset in the data column void * pData; // Actual data pointer + TSKEY ts; // only used in last NULL column } SDataCol; static FORCE_INLINE void dataColReset(SDataCol *pDataCol) { pDataCol->len = 0; } diff --git a/src/inc/taosdef.h b/src/inc/taosdef.h index e596ee67ecf1ab736246056d4e510661eee677fe..f888d037b39f944fb15b7a7d24d11952a4e3ef1e 100644 --- a/src/inc/taosdef.h +++ b/src/inc/taosdef.h @@ -298,7 +298,7 @@ do { \ #define TSDB_DEFAULT_DB_UPDATE_OPTION 0 #define TSDB_MIN_DB_CACHE_LAST_ROW 0 -#define TSDB_MAX_DB_CACHE_LAST_ROW 1 +#define TSDB_MAX_DB_CACHE_LAST_ROW 3 #define TSDB_DEFAULT_CACHE_LAST_ROW 0 #define TSDB_MIN_FSYNC_PERIOD 0 diff --git a/src/inc/tsdb.h b/src/inc/tsdb.h index c58c64073db669758a6716f8579702801892c99e..43e1ac2a19447fdf771b31d3dae76aa4f83442d3 100644 --- a/src/inc/tsdb.h +++ b/src/inc/tsdb.h @@ -69,9 +69,13 @@ typedef struct { int8_t precision; int8_t compression; int8_t update; - int8_t cacheLastRow; + int8_t cacheLastRow; // 0:no cache, 1: cache last row, 2: cache last NULL column 3: 1&2 } STsdbCfg; +#define CACHE_NO_LAST(c) ((c)->cacheLastRow == 0) +#define CACHE_LAST_ROW(c) (((c)->cacheLastRow & 1) > 0) +#define CACHE_LAST_NULL_COLUMN(c) (((c)->cacheLastRow & 2) > 0) + // --------- TSDB REPOSITORY USAGE STATISTICS typedef struct { int64_t totalStorage; // total bytes occupie diff --git a/src/tsdb/inc/tsdbMeta.h b/src/tsdb/inc/tsdbMeta.h index 7484071ce3c3bdb9e5aad449f22fc36d1809271a..45bbd5a7c6911fed4ea7309a77d3ac144109a34b 100644 --- a/src/tsdb/inc/tsdbMeta.h +++ b/src/tsdb/inc/tsdbMeta.h @@ -36,6 +36,12 @@ typedef struct STable { char* sql; void* cqhandle; SRWLatch latch; // TODO: implementa latch functions + + SDataCol *lastCols; + int16_t maxColNum; + int16_t restoreColumnNum; + bool hasRestoreLastColumn; + int lastColSVersion; T_REF_DECLARE() } STable; @@ -78,6 +84,11 @@ void tsdbUnRefTable(STable* pTable); void tsdbUpdateTableSchema(STsdbRepo* pRepo, STable* pTable, STSchema* pSchema, bool insertAct); int tsdbRestoreTable(STsdbRepo* pRepo, void* cont, int contLen); void tsdbOrgMeta(STsdbRepo* pRepo); +int tsdbInitColIdCacheWithSchema(STable* pTable, STSchema* pSchema); +int16_t tsdbGetLastColumnsIndexByColId(STable* pTable, int16_t colId); +int tsdbUpdateLastColSchema(STable *pTable, STSchema *pNewSchema); +STSchema* tsdbGetTableLatestSchema(STable *pTable); +void tsdbFreeLastColumns(STable* pTable); static FORCE_INLINE int tsdbCompareSchemaVersion(const void *key1, const void *key2) { if (*(int16_t *)key1 < schemaVersion(*(STSchema **)key2)) { diff --git a/src/tsdb/inc/tsdbint.h b/src/tsdb/inc/tsdbint.h index d760dc08a26854cae3ddcce5f8ad6d23899f6f46..e148e87c18dde9536357ee36cdca92b6562f424e 100644 --- a/src/tsdb/inc/tsdbint.h +++ b/src/tsdb/inc/tsdbint.h @@ -77,6 +77,9 @@ struct STsdbRepo { STsdbCfg save_config; // save apply config bool config_changed; // config changed flag pthread_mutex_t save_mutex; // protect save config + + uint8_t hasCachedLastRow; + uint8_t hasCachedLastColumn; STsdbAppH appH; STsdbStat stat; @@ -102,6 +105,7 @@ int tsdbUnlockRepo(STsdbRepo* pRepo); STsdbMeta* tsdbGetMeta(STsdbRepo* pRepo); int tsdbCheckCommit(STsdbRepo* pRepo); int tsdbRestoreInfo(STsdbRepo* pRepo); +int tsdbCacheLastData(STsdbRepo *pRepo, STsdbCfg* oldCfg); void tsdbGetRootDir(int repoid, char dirName[]); void tsdbGetDataDir(int repoid, char dirName[]); diff --git a/src/tsdb/src/tsdbCommit.c b/src/tsdb/src/tsdbCommit.c index 9257f382e74c6bf1663ec7c8cb8bcdc721ca3031..c65272c17e74471f28fc2f0ec16fd3e5db23deaa 100644 --- a/src/tsdb/src/tsdbCommit.c +++ b/src/tsdb/src/tsdbCommit.c @@ -88,6 +88,9 @@ static void tsdbLoadAndMergeFromCache(SDataCols *pDataCols, int *iter, SCommitIt static int tsdbApplyRtn(STsdbRepo *pRepo); void *tsdbCommitData(STsdbRepo *pRepo) { + if (pRepo->imem == NULL) { + return NULL; + } tsdbStartCommit(pRepo); // Commit to update meta file diff --git a/src/tsdb/src/tsdbCommitQueue.c b/src/tsdb/src/tsdbCommitQueue.c index 2543ad2ad1b68636271e2b27368a63cefc2ec53f..bb844e8e83f532e2c933ae35063460ec59129ee3 100644 --- a/src/tsdb/src/tsdbCommitQueue.c +++ b/src/tsdb/src/tsdbCommitQueue.c @@ -115,11 +115,15 @@ int tsdbScheduleCommit(STsdbRepo *pRepo, TSDB_REQ_T req) { } static void tsdbApplyRepoConfig(STsdbRepo *pRepo) { + pthread_mutex_lock(&pRepo->save_mutex); + pRepo->config_changed = false; STsdbCfg * pSaveCfg = &pRepo->save_config; - + STsdbCfg oldCfg; int32_t oldTotalBlocks = pRepo->config.totalBlocks; + memcpy(&oldCfg, &(pRepo->config), sizeof(STsdbCfg)); + pRepo->config.compression = pRepo->save_config.compression; pRepo->config.keep = pRepo->save_config.keep; pRepo->config.keep1 = pRepo->save_config.keep1; @@ -127,10 +131,12 @@ static void tsdbApplyRepoConfig(STsdbRepo *pRepo) { pRepo->config.cacheLastRow = pRepo->save_config.cacheLastRow; pRepo->config.totalBlocks = pRepo->save_config.totalBlocks; - tsdbInfo("vgId:%d apply new config: compression(%d), keep(%d,%d,%d), totalBlocks(%d), cacheLastRow(%d),totalBlocks(%d)", + pthread_mutex_unlock(&pRepo->save_mutex); + + tsdbInfo("vgId:%d apply new config: compression(%d), keep(%d,%d,%d), totalBlocks(%d), cacheLastRow(%d->%d),totalBlocks(%d->%d)", REPO_ID(pRepo), pSaveCfg->compression, pSaveCfg->keep,pSaveCfg->keep1, pSaveCfg->keep2, - pSaveCfg->totalBlocks, pSaveCfg->cacheLastRow, pSaveCfg->totalBlocks); + pSaveCfg->totalBlocks, oldCfg.cacheLastRow, pSaveCfg->cacheLastRow, oldTotalBlocks, pSaveCfg->totalBlocks); int err = tsdbExpendPool(pRepo, oldTotalBlocks); if (!TAOS_SUCCEEDED(err)) { @@ -138,6 +144,12 @@ static void tsdbApplyRepoConfig(STsdbRepo *pRepo) { REPO_ID(pRepo), oldTotalBlocks, pSaveCfg->totalBlocks, tstrerror(err)); } + if (oldCfg.cacheLastRow != pRepo->config.cacheLastRow) { + if (tsdbLockRepo(pRepo) < 0) return; + tsdbCacheLastData(pRepo, &oldCfg); + tsdbUnlockRepo(pRepo); + } + } static void *tsdbLoopCommit(void *arg) { @@ -168,10 +180,9 @@ static void *tsdbLoopCommit(void *arg) { req = ((SReq *)pNode->data)->req; pRepo = ((SReq *)pNode->data)->pRepo; + // check if need to apply new config if (pRepo->config_changed) { - pthread_mutex_lock(&pRepo->save_mutex); tsdbApplyRepoConfig(pRepo); - pthread_mutex_unlock(&pRepo->save_mutex); } if (req == COMMIT_REQ) { diff --git a/src/tsdb/src/tsdbMain.c b/src/tsdb/src/tsdbMain.c index fd02a3c8b97d7506209d92661143a158d8d94951..afbedd5b2fd231606902db104916e4ff4f10ba67 100644 --- a/src/tsdb/src/tsdbMain.c +++ b/src/tsdb/src/tsdbMain.c @@ -26,6 +26,8 @@ static STsdbRepo *tsdbNewRepo(STsdbCfg *pCfg, STsdbAppH *pAppH); static void tsdbFreeRepo(STsdbRepo *pRepo); static void tsdbStartStream(STsdbRepo *pRepo); static void tsdbStopStream(STsdbRepo *pRepo); +static int tsdbRestoreLastColumns(STsdbRepo *pRepo, STable *pTable, SReadH* pReadh); +static int tsdbRestoreLastRow(STsdbRepo *pRepo, STable *pTable, SReadH* pReadh, SBlockIdx *pIdx); // Function declaration int32_t tsdbCreateRepo(int repoid) { @@ -267,6 +269,10 @@ int32_t tsdbConfigRepo(STsdbRepo *repo, STsdbCfg *pCfg) { repo->config_changed = true; pthread_mutex_unlock(&repo->save_mutex); + + // schedule a commit msg then the new config will be applied immediatly + tsdbAsyncCommit(repo); + return 0; #if 0 STsdbRepo *pRepo = (STsdbRepo *)repo; @@ -511,8 +517,10 @@ static int32_t tsdbCheckAndSetDefaultCfg(STsdbCfg *pCfg) { if (pCfg->update != 0) pCfg->update = 1; // update cacheLastRow - if (pCfg->cacheLastRow != 0) pCfg->cacheLastRow = 1; - + if (pCfg->cacheLastRow != 0) { + if (pCfg->cacheLastRow > 3) + pCfg->cacheLastRow = 1; + } return 0; } @@ -545,6 +553,8 @@ static STsdbRepo *tsdbNewRepo(STsdbCfg *pCfg, STsdbAppH *pAppH) { return NULL; } pRepo->config_changed = false; + atomic_store_8(&pRepo->hasCachedLastRow, 0); + atomic_store_8(&pRepo->hasCachedLastColumn, 0); code = tsem_init(&(pRepo->readyToCommit), 0, 1); if (code != 0) { @@ -614,13 +624,180 @@ static void tsdbStopStream(STsdbRepo *pRepo) { } } +static int tsdbRestoreLastColumns(STsdbRepo *pRepo, STable *pTable, SReadH* pReadh) { + //tsdbInfo("tsdbRestoreLastColumns of table %s", pTable->name->data); + + STSchema *pSchema = tsdbGetTableLatestSchema(pTable); + if (pSchema == NULL) { + tsdbError("tsdbGetTableLatestSchema of table %s fail", pTable->name->data); + return 0; + } + + SBlock* pBlock; + int numColumns; + int32_t blockIdx; + SDataStatis* pBlockStatis = NULL; + SDataRow row = NULL; + // restore last column data with last schema + + int err = 0; + + numColumns = schemaNCols(pSchema); + if (numColumns <= pTable->restoreColumnNum) { + pTable->hasRestoreLastColumn = true; + return 0; + } + if (pTable->lastColSVersion != schemaVersion(pSchema)) { + if (tsdbInitColIdCacheWithSchema(pTable, pSchema) < 0) { + return -1; + } + } + + row = taosTMalloc(dataRowMaxBytesFromSchema(pSchema)); + if (row == NULL) { + terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; + err = -1; + goto out; + } + tdInitDataRow(row, pSchema); + + // first load block index info + if (tsdbLoadBlockInfo(pReadh, NULL) < 0) { + err = -1; + goto out; + } + + pBlockStatis = calloc(numColumns, sizeof(SDataStatis)); + if (pBlockStatis == NULL) { + terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; + err = -1; + goto out; + } + memset(pBlockStatis, 0, numColumns * sizeof(SDataStatis)); + for(int32_t i = 0; i < numColumns; ++i) { + STColumn *pCol = schemaColAt(pSchema, i); + pBlockStatis[i].colId = pCol->colId; + } + + // load block from backward + SBlockIdx *pIdx = pReadh->pBlkIdx; + blockIdx = (int32_t)(pIdx->numOfBlocks - 1); + + while (numColumns > pTable->restoreColumnNum && blockIdx >= 0) { + bool loadStatisData = false; + pBlock = pReadh->pBlkInfo->blocks + blockIdx; + blockIdx -= 1; + + // load block data + if (tsdbLoadBlockData(pReadh, pBlock, NULL) < 0) { + err = -1; + goto out; + } + + // file block with sub-blocks has no statistics data + if (pBlock->numOfSubBlocks <= 1) { + tsdbLoadBlockStatis(pReadh, pBlock); + tsdbGetBlockStatis(pReadh, pBlockStatis, (int)numColumns); + loadStatisData = true; + } + + for (int16_t i = 0; i < numColumns && numColumns > pTable->restoreColumnNum; ++i) { + STColumn *pCol = schemaColAt(pSchema, i); + // ignore loaded columns + if (pTable->lastCols[i].bytes != 0) { + continue; + } + + // ignore block which has no not-null colId column + if (loadStatisData && pBlockStatis[i].numOfNull == pBlock->numOfRows) { + continue; + } + + // OK,let's load row from backward to get not-null column + for (int32_t rowId = pBlock->numOfRows - 1; rowId >= 0; rowId--) { + SDataCol *pDataCol = pReadh->pDCols[0]->cols + i; + tdAppendColVal(row, tdGetColDataOfRow(pDataCol, rowId), pCol->type, pCol->bytes, pCol->offset); + //SDataCol *pDataCol = readh.pDCols[0]->cols + j; + void* value = tdGetRowDataOfCol(row, (int8_t)pCol->type, TD_DATA_ROW_HEAD_SIZE + pCol->offset); + if (isNull(value, pCol->type)) { + continue; + } + + int16_t idx = tsdbGetLastColumnsIndexByColId(pTable, pCol->colId); + if (idx == -1) { + tsdbError("tsdbRestoreLastColumns restore vgId:%d,table:%s cache column %d fail", REPO_ID(pRepo), pTable->name->data, pCol->colId); + continue; + } + // save not-null column + SDataCol *pLastCol = &(pTable->lastCols[idx]); + pLastCol->pData = malloc(pCol->bytes); + pLastCol->bytes = pCol->bytes; + pLastCol->colId = pCol->colId; + memcpy(pLastCol->pData, value, pCol->bytes); + + // save row ts(in column 0) + pDataCol = pReadh->pDCols[0]->cols + 0; + pCol = schemaColAt(pSchema, 0); + tdAppendColVal(row, tdGetColDataOfRow(pDataCol, rowId), pCol->type, pCol->bytes, pCol->offset); + pLastCol->ts = dataRowKey(row); + + pTable->restoreColumnNum += 1; + + tsdbDebug("tsdbRestoreLastColumns restore vgId:%d,table:%s cache column %d, %" PRId64, REPO_ID(pRepo), pTable->name->data, pLastCol->colId, pLastCol->ts); + break; + } + } + } + +out: + taosTZfree(row); + tfree(pBlockStatis); + + if (err == 0 && numColumns <= pTable->restoreColumnNum) { + pTable->hasRestoreLastColumn = true; + } + + return err; +} + +static int tsdbRestoreLastRow(STsdbRepo *pRepo, STable *pTable, SReadH* pReadh, SBlockIdx *pIdx) { + ASSERT(pTable->lastRow == NULL); + if (tsdbLoadBlockInfo(pReadh, NULL) < 0) { + return -1; + } + + SBlock* pBlock = pReadh->pBlkInfo->blocks + pIdx->numOfBlocks - 1; + + if (tsdbLoadBlockData(pReadh, pBlock, NULL) < 0) { + return -1; + } + + // Get the data in row + + STSchema *pSchema = tsdbGetTableSchema(pTable); + pTable->lastRow = taosTMalloc(dataRowMaxBytesFromSchema(pSchema)); + if (pTable->lastRow == NULL) { + terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; + return -1; + } + + tdInitDataRow(pTable->lastRow, pSchema); + for (int icol = 0; icol < schemaNCols(pSchema); icol++) { + STColumn *pCol = schemaColAt(pSchema, icol); + SDataCol *pDataCol = pReadh->pDCols[0]->cols + icol; + tdAppendColVal(pTable->lastRow, tdGetColDataOfRow(pDataCol, pBlock->numOfRows - 1), pCol->type, pCol->bytes, + pCol->offset); + } + + return 0; +} + int tsdbRestoreInfo(STsdbRepo *pRepo) { SFSIter fsiter; SReadH readh; SDFileSet *pSet; STsdbMeta *pMeta = pRepo->tsdbMeta; STsdbCfg * pCfg = REPO_CFG(pRepo); - SBlock * pBlock; if (tsdbInitReadH(&readh, pRepo) < 0) { return -1; @@ -628,6 +805,14 @@ int tsdbRestoreInfo(STsdbRepo *pRepo) { tsdbFSIterInit(&fsiter, REPO_FS(pRepo), TSDB_FS_ITER_BACKWARD); + if (CACHE_LAST_NULL_COLUMN(pCfg)) { + for (int i = 1; i < pMeta->maxTables; i++) { + STable *pTable = pMeta->tables[i]; + if (pTable == NULL) continue; + pTable->restoreColumnNum = 0; + } + } + while ((pSet = tsdbFSIterNext(&fsiter)) != NULL) { if (tsdbSetAndOpenReadFSet(&readh, pSet) < 0) { tsdbDestroyReadH(&readh); @@ -643,6 +828,8 @@ int tsdbRestoreInfo(STsdbRepo *pRepo) { STable *pTable = pMeta->tables[i]; if (pTable == NULL) continue; + //tsdbInfo("tsdbRestoreInfo restore vgId:%d,table:%s", REPO_ID(pRepo), pTable->name->data); + if (tsdbSetReadTable(&readh, pTable) < 0) { tsdbDestroyReadH(&readh); return -1; @@ -653,42 +840,155 @@ int tsdbRestoreInfo(STsdbRepo *pRepo) { if (pIdx && lastKey < pIdx->maxKey) { pTable->lastKey = pIdx->maxKey; - if (pCfg->cacheLastRow) { - if (tsdbLoadBlockInfo(&readh, NULL) < 0) { - tsdbDestroyReadH(&readh); - return -1; - } - - pBlock = readh.pBlkInfo->blocks + pIdx->numOfBlocks - 1; - - if (tsdbLoadBlockData(&readh, pBlock, NULL) < 0) { - tsdbDestroyReadH(&readh); - return -1; - } - - // Get the data in row - ASSERT(pTable->lastRow == NULL); - STSchema *pSchema = tsdbGetTableSchema(pTable); - pTable->lastRow = taosTMalloc(dataRowMaxBytesFromSchema(pSchema)); - if (pTable->lastRow == NULL) { - terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; - tsdbDestroyReadH(&readh); - return -1; - } - - tdInitDataRow(pTable->lastRow, pSchema); - for (int icol = 0; icol < schemaNCols(pSchema); icol++) { - STColumn *pCol = schemaColAt(pSchema, icol); - SDataCol *pDataCol = readh.pDCols[0]->cols + icol; - tdAppendColVal(pTable->lastRow, tdGetColDataOfRow(pDataCol, pBlock->numOfRows - 1), pCol->type, pCol->bytes, - pCol->offset); - } + if (CACHE_LAST_ROW(pCfg) && tsdbRestoreLastRow(pRepo, pTable, &readh, pIdx) != 0) { + tsdbDestroyReadH(&readh); + return -1; } } + // restore NULL columns + if (pIdx && CACHE_LAST_NULL_COLUMN(pCfg) && !pTable->hasRestoreLastColumn) { + if (tsdbRestoreLastColumns(pRepo, pTable, &readh) != 0) { + tsdbDestroyReadH(&readh); + return -1; + } + } } } tsdbDestroyReadH(&readh); + if (CACHE_LAST_ROW(pCfg)) { + atomic_store_8(&pRepo->hasCachedLastRow, 1); + } + if (CACHE_LAST_NULL_COLUMN(pCfg)) { + atomic_store_8(&pRepo->hasCachedLastColumn, 1); + } + return 0; } + +int tsdbCacheLastData(STsdbRepo *pRepo, STsdbCfg* oldCfg) { + bool cacheLastRow = false, cacheLastCol = false; + SFSIter fsiter; + SReadH readh; + SDFileSet *pSet; + STsdbMeta *pMeta = pRepo->tsdbMeta; + int tableNum = 0; + int maxTableIdx = 0; + int cacheLastRowTableNum = 0; + int cacheLastColTableNum = 0; + + bool need_free_last_row = CACHE_LAST_ROW(oldCfg) && !CACHE_LAST_ROW(&(pRepo->config)); + bool need_free_last_col = CACHE_LAST_NULL_COLUMN(oldCfg) && !CACHE_LAST_NULL_COLUMN(&(pRepo->config)); + + if (CACHE_LAST_ROW(&(pRepo->config)) || CACHE_LAST_NULL_COLUMN(&(pRepo->config))) { + tsdbInfo("tsdbCacheLastData cache last data since cacheLast option changed"); + cacheLastRow = !CACHE_LAST_ROW(oldCfg) && CACHE_LAST_ROW(&(pRepo->config)); + cacheLastCol = !CACHE_LAST_NULL_COLUMN(oldCfg) && CACHE_LAST_NULL_COLUMN(&(pRepo->config)); + } + + // calc max table idx and table num + for (int i = 1; i < pMeta->maxTables; i++) { + STable *pTable = pMeta->tables[i]; + if (pTable == NULL) continue; + tableNum += 1; + maxTableIdx = i; + if (cacheLastCol) { + pTable->restoreColumnNum = 0; + } + } + + // if close last option,need to free data + if (need_free_last_row || need_free_last_col) { + if (need_free_last_row) { + atomic_store_8(&pRepo->hasCachedLastRow, 0); + } + if (need_free_last_col) { + atomic_store_8(&pRepo->hasCachedLastColumn, 0); + } + tsdbInfo("free cache last data since cacheLast option changed"); + for (int i = 1; i < maxTableIdx; i++) { + STable *pTable = pMeta->tables[i]; + if (pTable == NULL) continue; + if (need_free_last_row) { + taosTZfree(pTable->lastRow); + pTable->lastRow = NULL; + pTable->lastKey = TSKEY_INITIAL_VAL; + } + if (need_free_last_col) { + tsdbFreeLastColumns(pTable); + } + } + } + + if (!cacheLastRow && !cacheLastCol) { + return 0; + } + + cacheLastRowTableNum = cacheLastRow ? tableNum : 0; + cacheLastColTableNum = cacheLastCol ? tableNum : 0; + + if (tsdbInitReadH(&readh, pRepo) < 0) { + return -1; + } + + tsdbFSIterInit(&fsiter, REPO_FS(pRepo), TSDB_FS_ITER_BACKWARD); + + while ((pSet = tsdbFSIterNext(&fsiter)) != NULL && (cacheLastRowTableNum > 0 || cacheLastColTableNum > 0)) { + if (tsdbSetAndOpenReadFSet(&readh, pSet) < 0) { + tsdbDestroyReadH(&readh); + return -1; + } + + if (tsdbLoadBlockIdx(&readh) < 0) { + tsdbDestroyReadH(&readh); + return -1; + } + + for (int i = 1; i <= maxTableIdx; i++) { + STable *pTable = pMeta->tables[i]; + if (pTable == NULL) continue; + + //tsdbInfo("tsdbRestoreInfo restore vgId:%d,table:%s", REPO_ID(pRepo), pTable->name->data); + + if (tsdbSetReadTable(&readh, pTable) < 0) { + tsdbDestroyReadH(&readh); + return -1; + } + + SBlockIdx *pIdx = readh.pBlkIdx; + + if (pIdx && cacheLastRowTableNum > 0 && pTable->lastRow == NULL) { + pTable->lastKey = pIdx->maxKey; + + if (tsdbRestoreLastRow(pRepo, pTable, &readh, pIdx) != 0) { + tsdbDestroyReadH(&readh); + return -1; + } + cacheLastRowTableNum -= 1; + } + + // restore NULL columns + if (pIdx && cacheLastColTableNum > 0 && !pTable->hasRestoreLastColumn) { + if (tsdbRestoreLastColumns(pRepo, pTable, &readh) != 0) { + tsdbDestroyReadH(&readh); + return -1; + } + if (pTable->hasRestoreLastColumn) { + cacheLastColTableNum -= 1; + } + } + } + } + + tsdbDestroyReadH(&readh); + + if (cacheLastRow) { + atomic_store_8(&pRepo->hasCachedLastRow, 1); + } + if (cacheLastCol) { + atomic_store_8(&pRepo->hasCachedLastColumn, 1); + } + + return 0; +} \ No newline at end of file diff --git a/src/tsdb/src/tsdbMemTable.c b/src/tsdb/src/tsdbMemTable.c index 1d0bda3cf40256b18135bdcac9c21f15021b4aa8..bee5600af730cda0d51c10dc969b070f7cd0252e 100644 --- a/src/tsdb/src/tsdbMemTable.c +++ b/src/tsdb/src/tsdbMemTable.c @@ -274,7 +274,7 @@ void *tsdbAllocBytes(STsdbRepo *pRepo, int bytes) { int tsdbAsyncCommit(STsdbRepo *pRepo) { tsem_wait(&(pRepo->readyToCommit)); - ASSERT(pRepo->imem == NULL); + //ASSERT(pRepo->imem == NULL); if (pRepo->mem == NULL) { tsem_post(&(pRepo->readyToCommit)); return 0; @@ -964,6 +964,49 @@ static void tsdbFreeRows(STsdbRepo *pRepo, void **rows, int rowCounter) { } } +static void updateTableLatestColumn(STsdbRepo *pRepo, STable *pTable, SDataRow row) { + tsdbDebug("vgId:%d updateTableLatestColumn, %s row version:%d", REPO_ID(pRepo), pTable->name->data, dataRowVersion(row)); + + STSchema* pSchema = tsdbGetTableLatestSchema(pTable); + if (tsdbUpdateLastColSchema(pTable, pSchema) < 0) { + return; + } + + pSchema = tsdbGetTableSchemaByVersion(pTable, dataRowVersion(row)); + if (pSchema == NULL) { + return; + } + + SDataCol *pLatestCols = pTable->lastCols; + + for (int16_t j = 0; j < schemaNCols(pSchema); j++) { + STColumn *pTCol = schemaColAt(pSchema, j); + // ignore not exist colId + int16_t idx = tsdbGetLastColumnsIndexByColId(pTable, pTCol->colId); + if (idx == -1) { + continue; + } + + void* value = tdGetRowDataOfCol(row, (int8_t)pTCol->type, TD_DATA_ROW_HEAD_SIZE + pSchema->columns[j].offset); + if (isNull(value, pTCol->type)) { + continue; + } + + SDataCol *pDataCol = &(pLatestCols[idx]); + if (pDataCol->pData == NULL) { + pDataCol->pData = malloc(pSchema->columns[j].bytes); + pDataCol->bytes = pSchema->columns[j].bytes; + } else if (pDataCol->bytes < pSchema->columns[j].bytes) { + pDataCol->pData = realloc(pDataCol->pData, pSchema->columns[j].bytes); + pDataCol->bytes = pSchema->columns[j].bytes; + } + + memcpy(pDataCol->pData, value, pDataCol->bytes); + //tsdbInfo("updateTableLatestColumn vgId:%d cache column %d for %d,%s", REPO_ID(pRepo), j, pDataCol->bytes, (char*)pDataCol->pData); + pDataCol->ts = dataRowKey(row); + } +} + static int tsdbUpdateTableLatestInfo(STsdbRepo *pRepo, STable *pTable, SDataRow row) { STsdbCfg *pCfg = &pRepo->config; @@ -977,7 +1020,7 @@ static int tsdbUpdateTableLatestInfo(STsdbRepo *pRepo, STable *pTable, SDataRow } if (tsdbGetTableLastKeyImpl(pTable) < dataRowKey(row)) { - if (pCfg->cacheLastRow || pTable->lastRow != NULL) { + if (CACHE_LAST_ROW(pCfg) || pTable->lastRow != NULL) { SDataRow nrow = pTable->lastRow; if (taosTSizeof(nrow) < dataRowLen(row)) { SDataRow orow = nrow; @@ -1002,7 +1045,10 @@ static int tsdbUpdateTableLatestInfo(STsdbRepo *pRepo, STable *pTable, SDataRow } else { pTable->lastKey = dataRowKey(row); } - } + if (CACHE_LAST_NULL_COLUMN(pCfg)) { + updateTableLatestColumn(pRepo, pTable, row); + } + } return 0; } diff --git a/src/tsdb/src/tsdbMeta.c b/src/tsdb/src/tsdbMeta.c index e6cbc4da9e6a3d2000e395e8ae8b9a52a01d1f6c..324a7c79c5b7dbfa69bbdf240301c3c710f90b59 100644 --- a/src/tsdb/src/tsdbMeta.c +++ b/src/tsdb/src/tsdbMeta.c @@ -589,6 +589,131 @@ void tsdbUnRefTable(STable *pTable) { } } +void tsdbFreeLastColumns(STable* pTable) { + if (pTable->lastCols == NULL) { + return; + } + + for (int i = 0; i < pTable->maxColNum; ++i) { + if (pTable->lastCols[i].bytes == 0) { + continue; + } + tfree(pTable->lastCols[i].pData); + pTable->lastCols[i].bytes = 0; + pTable->lastCols[i].pData = NULL; + } + tfree(pTable->lastCols); + pTable->lastCols = NULL; + pTable->maxColNum = 0; + pTable->lastColSVersion = -1; + pTable->restoreColumnNum = 0; +} + +int16_t tsdbGetLastColumnsIndexByColId(STable* pTable, int16_t colId) { + if (pTable->lastCols == NULL) { + return -1; + } + for (int16_t i = 0; i < pTable->maxColNum; ++i) { + if (pTable->lastCols[i].colId == colId) { + return i; + } + } + + return -1; +} + +int tsdbInitColIdCacheWithSchema(STable* pTable, STSchema* pSchema) { + ASSERT(pTable->lastCols == NULL); + + int16_t numOfColumn = pSchema->numOfCols; + + pTable->lastCols = (SDataCol*)malloc(numOfColumn * sizeof(SDataCol)); + if (pTable->lastCols == NULL) { + return -1; + } + + for (int16_t i = 0; i < numOfColumn; ++i) { + STColumn *pCol = schemaColAt(pSchema, i); + SDataCol* pDataCol = &(pTable->lastCols[i]); + pDataCol->bytes = 0; + pDataCol->pData = NULL; + pDataCol->colId = pCol->colId; + } + + pTable->lastColSVersion = schemaVersion(pSchema); + pTable->maxColNum = numOfColumn; + pTable->restoreColumnNum = 0; + return 0; +} + +STSchema* tsdbGetTableLatestSchema(STable *pTable) { + return tsdbGetTableSchemaByVersion(pTable, -1); +} + +int tsdbUpdateLastColSchema(STable *pTable, STSchema *pNewSchema) { + if (pTable->lastColSVersion == schemaVersion(pNewSchema)) { + return 0; + } + + tsdbInfo("tsdbUpdateLastColSchema:%s,%d->%d", pTable->name->data, pTable->lastColSVersion, schemaVersion(pNewSchema)); + + int16_t numOfCols = pNewSchema->numOfCols; + SDataCol *lastCols = (SDataCol*)malloc(numOfCols * sizeof(SDataCol)); + if (lastCols == NULL) { + return -1; + } + + TSDB_WLOCK_TABLE(pTable); + + for (int16_t i = 0; i < numOfCols; ++i) { + STColumn *pCol = schemaColAt(pNewSchema, i); + int16_t idx = tsdbGetLastColumnsIndexByColId(pTable, pCol->colId); + + SDataCol* pDataCol = &(lastCols[i]); + if (idx != -1) { + // move col data to new last column array + SDataCol* pOldDataCol = &(pTable->lastCols[idx]); + memcpy(pDataCol, pOldDataCol, sizeof(SDataCol)); + } else { + // init new colid data + pDataCol->colId = pCol->colId; + pDataCol->bytes = 0; + pDataCol->pData = NULL; + } + } + + SDataCol *oldLastCols = pTable->lastCols; + int16_t oldLastColNum = pTable->maxColNum; + + pTable->lastColSVersion = schemaVersion(pNewSchema); + pTable->lastCols = lastCols; + pTable->maxColNum = numOfCols; + + if (oldLastCols == NULL) { + TSDB_WUNLOCK_TABLE(pTable); + return 0; + } + + // free old schema last column datas + for (int16_t i = 0; i < oldLastColNum; ++i) { + SDataCol* pDataCol = &(oldLastCols[i]); + if (pDataCol->bytes == 0) { + continue; + } + int16_t idx = tsdbGetLastColumnsIndexByColId(pTable, pDataCol->colId); + if (idx != -1) { + continue; + } + + // free not exist column data + tfree(pDataCol->pData); + } + TSDB_WUNLOCK_TABLE(pTable); + tfree(oldLastCols); + + return 0; +} + void tsdbUpdateTableSchema(STsdbRepo *pRepo, STable *pTable, STSchema *pSchema, bool insertAct) { ASSERT(TABLE_TYPE(pTable) != TSDB_STREAM_TABLE && TABLE_TYPE(pTable) != TSDB_SUPER_TABLE); STsdbMeta *pMeta = pRepo->tsdbMeta; @@ -672,6 +797,10 @@ static STable *tsdbNewTable() { pTable->lastKey = TSKEY_INITIAL_VAL; + pTable->lastCols = NULL; + pTable->restoreColumnNum = 0; + pTable->maxColNum = 0; + pTable->lastColSVersion = -1; return pTable; } @@ -785,8 +914,10 @@ static void tsdbFreeTable(STable *pTable) { kvRowFree(pTable->tagVal); tSkipListDestroy(pTable->pIndex); - taosTZfree(pTable->lastRow); + taosTZfree(pTable->lastRow); tfree(pTable->sql); + + tsdbFreeLastColumns(pTable); free(pTable); } }