提交 f3f2026e 编写于 作者: A Adam Ji

Merge branch 'main' into docs/adamji/TD-24884

...@@ -121,6 +121,12 @@ IF ("${CPUTYPE}" STREQUAL "") ...@@ -121,6 +121,12 @@ IF ("${CPUTYPE}" STREQUAL "")
SET(TD_LOONGARCH_64 TRUE) SET(TD_LOONGARCH_64 TRUE)
ADD_DEFINITIONS("-D_TD_LOONGARCH_") ADD_DEFINITIONS("-D_TD_LOONGARCH_")
ADD_DEFINITIONS("-D_TD_LOONGARCH_64") ADD_DEFINITIONS("-D_TD_LOONGARCH_64")
ELSEIF (CMAKE_SYSTEM_PROCESSOR MATCHES "mips64")
SET(PLATFORM_ARCH_STR "mips")
MESSAGE(STATUS "input cpuType: mips64")
SET(TD_MIPS_64 TRUE)
ADD_DEFINITIONS("-D_TD_MIPS_")
ADD_DEFINITIONS("-D_TD_MIPS_64")
ENDIF () ENDIF ()
ELSE () ELSE ()
# if generate ARM version: # if generate ARM version:
...@@ -176,6 +182,8 @@ set(TD_DEPS_DIR "x86") ...@@ -176,6 +182,8 @@ set(TD_DEPS_DIR "x86")
if (TD_LINUX) if (TD_LINUX)
IF (TD_ARM_64 OR TD_ARM_32) IF (TD_ARM_64 OR TD_ARM_32)
set(TD_DEPS_DIR "arm") set(TD_DEPS_DIR "arm")
ELSEIF (TD_MIPS_64)
set(TD_DEPS_DIR "mips")
ELSE() ELSE()
set(TD_DEPS_DIR "x86") set(TD_DEPS_DIR "x86")
ENDIF() ENDIF()
......
此差异已折叠。
---
sidebar_label: DBeaver
title: DBeaver
description: You can use DBeaver to access your data stored in TDengine and TDengine Cloud.
---
[DBeaver](https://dbeaver.io/) is a popular cross-platform database management tool that facilitates data management for developers, database administrators, data analysts, and other users. Starting from version 23.1.1, DBeaver natively supports TDengine and can be used to manage TDengine Cloud as well as TDengine clusters deployed on-premises.
## Prerequisites
To use DBeaver to manage TDengine, you need to prepare the following:
- Install DBeaver. DBeaver supports mainstream operating systems including Windows, macOS, and Linux. Please make sure you download and install the correct version (23.1.1+) and platform package. Please refer to the [official DBeaver documentation](https://github.com/dbeaver/dbeaver/wiki/Installation) for detailed installation steps.
- If you use an on-premises TDengine cluster, please make sure that TDengine and taosAdapter are deployed and running properly. For detailed information, please refer to the taosAdapter User Manual.
- If you use TDengine Cloud, please [register](https://cloud.tdengine.com/) for an account.
## Usage
### Use DBeaver to access on-premises TDengine cluster
1. Start the DBeaver application, click the button or menu item to choose **New Database Connection**, and then select **TDengine** in the **Timeseries** category.
![Connect TDengine with DBeaver](./dbeaver/dbeaver-connect-tdengine-en.webp)
2. Configure the TDengine connection by filling in the host address, port number, username, and password. If TDengine is deployed on the local machine, you are only required to fill in the username and password. The default username is root and the default password is taosdata. Click **Test Connection** to check whether the connection is workable. If you do not have the TDengine Java connector installed on the local machine, DBeaver will prompt you to download and install it.
![Configure the TDengine connection](./dbeaver/dbeaver-config-tdengine-en.webp))
3. If the connection is successful, it will be displayed as shown in the following figure. If the connection fails, please check whether the TDengine service and taosAdapter are running correctly and whether the host address, port number, username, and password are correct.
![Connection successful](./dbeaver/dbeaver-connect-tdengine-test-en.webp)
4. Use DBeaver to select databases and tables and browse your data stored in TDengine.
![Browse TDengine data with DBeaver](./dbeaver/dbeaver-browse-data-en.webp)
5. You can also manipulate TDengine data by executing SQL commands.
![Use SQL commands to manipulate TDengine data in DBeaver](./dbeaver/dbeaver-sql-execution-en.webp)
### Use DBeaver to access TDengine Cloud
1. Log in to the TDengine Cloud service, select **Programming** > **Java** in the management console, and then copy the string value of `TDENGINE_JDBC_URL` displayed in the **Config** section.
![Copy JDBC URL from TDengine Cloud](./dbeaver/tdengine-cloud-jdbc-dsn-en.webp)
2. Start the DBeaver application, click the button or menu item to choose **New Database Connection**, and then select **TDengine Cloud** in the **Timeseries** category.
![Connect TDengine Cloud with DBeaver](./dbeaver/dbeaver-connect-tdengine-cloud-en.webp)
3. Configure the TDengine Cloud connection by filling in the JDBC URL value. Click **Test Connection**. If you do not have the TDengine Java connector installed on the local machine, DBeaver will prompt you to download and install it. If the connection is successful, it will be displayed as shown in the following figure. If the connection fails, please check whether the TDengine Cloud service is running properly and whether the JDBC URL is correct.
![Configure the TDengine Cloud connection](./dbeaver/dbeaver-connect-tdengine-cloud-test-en.webp)
4. Use DBeaver to select databases and tables and browse your data stored in TDengine Cloud.
![Browse TDengine Cloud data with DBeaver](./dbeaver/dbeaver-browse-data-cloud-en.webp)
5. You can also manipulate TDengine Cloud data by executing SQL commands.
![Use SQL commands to manipulate TDengine Cloud data in DBeaver](./dbeaver/dbeaver-sql-execution-cloud-en.webp)
---
sidebar_label: DBeaver
title: DBeaver
description: 使用 DBeaver 存取 TDengine 数据的详细指南
---
DBeaver 是一款流行的跨平台数据库管理工具,方便开发者、数据库管理员、数据分析师等用户管理数据。DBeaver 从 23.1.1 版本开始内嵌支持 TDengine。既支持独立部署的 TDengine 集群也支持 TDengine Cloud。
## 前置条件
### 安装 DBeaver
使用 DBeaver 管理 TDengine 需要以下几方面的准备工作。
- 安装 DBeaver。DBeaver 支持主流操作系统包括 Windows、macOS 和 Linux。请注意[下载](https://dbeaver.io/download/)正确平台和版本(23.1.1+)的安装包。详细安装步骤请参考 [DBeaver 官方文档](https://github.com/dbeaver/dbeaver/wiki/Installation)
- 如果使用独立部署的 TDengine 集群,请确认 TDengine 正常运行,并且 taosAdapter 已经安装并正常运行,具体细节请参考 [taosAdapter 的使用手册](/reference/taosadapter)
- 如果使用 TDengine Cloud,请[注册](https://cloud.taosdata.com/)相应账号。
## 使用步骤
### 使用 DBeaver 访问内部部署的 TDengine
1. 启动 DBeaver 应用,点击按钮或菜单项选择“连接到数据库”,然后在时间序列分类栏中选择 TDengine。
![DBeaver 连接 TDengine](./dbeaver/dbeaver-connect-tdengine-zh.webp)
2. 配置 TDengine 连接,填入主机地址、端口号、用户名和密码。如果 TDengine 部署在本机,可以只填用户名和密码,默认用户名为 root,默认密码为 taosdata。点击“测试连接”可以对连接是否可用进行测试。如果本机没有安装 TDengine Java
连接器,DBeaver 会提示下载安装。
![配置 TDengine 连接](./dbeaver/dbeaver-config-tdengine-zh.webp)
3. 连接成功将显示如下图所示。如果显示连接失败,请检查 TDengine 服务和 taosAdapter 是否正确运行,主机地址、端口号、用户名和密码是否正确。
![连接成功](./dbeaver/dbeaver-connect-tdengine-test-zh.webp)
4. 使用 DBeaver 选择数据库和表可以浏览 TDengine 服务的数据。
![DBeaver 浏览 TDengine 数据](./dbeaver/dbeaver-browse-data-zh.webp)
5. 也可以通过执行 SQL 命令的方式对 TDengine 数据进行操作。
![DBeaver SQL 命令](./dbeaver/dbeaver-sql-execution-zh.webp)
### 使用 DBeaver 访问 TDengine Cloud
1. 登录 TDengine Cloud 服务,在管理界面中选择“编程”和“Java”,然后复制 TDENGINE_JDBC_URL 的字符串值。
![复制 TDengine Cloud DSN](./dbeaver/tdengine-cloud-jdbc-dsn-zh.webp)
2. 启动 DBeaver 应用,点击按钮或菜单项选择“连接到数据库”,然后在时间序列分类栏中选择 TDengine Cloud。
![DBeaver 连接 TDengine Cloud](./dbeaver/dbeaver-connect-tdengine-cloud-zh.webp)
3. 配置 TDengine Cloud 连接,填入 JDBC_URL 值。点击“测试连接”,如果本机没有安装 TDengine Java
连接器,DBeaver 会提示下载安装。连接成功将显示如下图所示。如果显示连接失败,请检查 TDengine Cloud 服务是否启动,JDBC_URL 是否正确。
![配置 TDengine Cloud 连接](./dbeaver/dbeaver-connect-tdengine-cloud-test-zh.webp)
4. 使用 DBeaver 选择数据库和表可以浏览 TDengine Cloud 服务的数据。
![DBeaver 浏览 TDengine Cloud 数据](./dbeaver/dbeaver-browse-cloud-data-zh.webp)
5. 也可以通过执行 SQL 命令的方式对 TDengine Cloud 数据进行操作。
![DBeaver SQL 命令 操作 TDengine Cloud](./dbeaver/dbeaver-sql-execution-cloud-zh.webp)
...@@ -202,7 +202,7 @@ static int32_t smlParseTagKv(SSmlHandle *info, char **sql, char *sqlEnd, SSmlLin ...@@ -202,7 +202,7 @@ static int32_t smlParseTagKv(SSmlHandle *info, char **sql, char *sqlEnd, SSmlLin
bool keyEscaped = false; bool keyEscaped = false;
size_t keyLenEscaped = 0; size_t keyLenEscaped = 0;
while (*sql < sqlEnd) { while (*sql < sqlEnd) {
if (unlikely(IS_COMMA(*sql))) { if (unlikely(IS_SPACE(*sql) || IS_COMMA(*sql))) {
smlBuildInvalidDataMsg(&info->msgBuf, "invalid data", *sql); smlBuildInvalidDataMsg(&info->msgBuf, "invalid data", *sql);
return TSDB_CODE_SML_INVALID_DATA; return TSDB_CODE_SML_INVALID_DATA;
} }
...@@ -410,7 +410,7 @@ static int32_t smlParseColKv(SSmlHandle *info, char **sql, char *sqlEnd, SSmlLin ...@@ -410,7 +410,7 @@ static int32_t smlParseColKv(SSmlHandle *info, char **sql, char *sqlEnd, SSmlLin
bool keyEscaped = false; bool keyEscaped = false;
size_t keyLenEscaped = 0; size_t keyLenEscaped = 0;
while (*sql < sqlEnd) { while (*sql < sqlEnd) {
if (unlikely(IS_COMMA(*sql))) { if (unlikely(IS_SPACE(*sql) || IS_COMMA(*sql))) {
smlBuildInvalidDataMsg(&info->msgBuf, "invalid data", *sql); smlBuildInvalidDataMsg(&info->msgBuf, "invalid data", *sql);
return TSDB_CODE_SML_INVALID_DATA; return TSDB_CODE_SML_INVALID_DATA;
} }
...@@ -436,20 +436,21 @@ static int32_t smlParseColKv(SSmlHandle *info, char **sql, char *sqlEnd, SSmlLin ...@@ -436,20 +436,21 @@ static int32_t smlParseColKv(SSmlHandle *info, char **sql, char *sqlEnd, SSmlLin
size_t valueLen = 0; size_t valueLen = 0;
bool valueEscaped = false; bool valueEscaped = false;
size_t valueLenEscaped = 0; size_t valueLenEscaped = 0;
bool isInQuote = false; int quoteNum = 0;
const char *escapeChar = NULL; const char *escapeChar = NULL;
while (*sql < sqlEnd) { while (*sql < sqlEnd) {
// parse value // parse value
if (unlikely(*(*sql) == QUOTE && (*(*sql - 1) != SLASH || (*sql - 1) == escapeChar))) { if (unlikely(*(*sql) == QUOTE && (*(*sql - 1) != SLASH || (*sql - 1) == escapeChar))) {
isInQuote = !isInQuote; quoteNum++;
(*sql)++; (*sql)++;
if(quoteNum > 2){
break;
}
continue; continue;
} }
if (!isInQuote) { if (quoteNum % 2 == 0 && (unlikely(IS_SPACE(*sql) || IS_COMMA(*sql)))) {
if (unlikely(IS_SPACE(*sql) || IS_COMMA(*sql))) {
break; break;
} }
}
if (IS_SLASH_LETTER_IN_FIELD_VALUE(*sql) && (*sql - 1) != escapeChar) { if (IS_SLASH_LETTER_IN_FIELD_VALUE(*sql) && (*sql - 1) != escapeChar) {
escapeChar = *sql; escapeChar = *sql;
valueEscaped = true; valueEscaped = true;
...@@ -460,8 +461,8 @@ static int32_t smlParseColKv(SSmlHandle *info, char **sql, char *sqlEnd, SSmlLin ...@@ -460,8 +461,8 @@ static int32_t smlParseColKv(SSmlHandle *info, char **sql, char *sqlEnd, SSmlLin
} }
valueLen = *sql - value; valueLen = *sql - value;
if (unlikely(isInQuote)) { if (unlikely(quoteNum != 0 && quoteNum != 2)) {
smlBuildInvalidDataMsg(&info->msgBuf, "only one quote", value); smlBuildInvalidDataMsg(&info->msgBuf, "unbalanced quotes", value);
return TSDB_CODE_SML_INVALID_DATA; return TSDB_CODE_SML_INVALID_DATA;
} }
if (unlikely(valueLen == 0)) { if (unlikely(valueLen == 0)) {
......
...@@ -224,6 +224,8 @@ TEST(testCase, smlParseCols_Error_Test) { ...@@ -224,6 +224,8 @@ TEST(testCase, smlParseCols_Error_Test) {
"st,tt=aa c 1=2 1626006833639000000,", "st,tt=aa c 1=2 1626006833639000000,",
//field value double quote,slash //field value double quote,slash
"st,tt=aa c=\"a\"a\" 1626006833639000000,", "st,tt=aa c=\"a\"a\" 1626006833639000000,",
"escape_test,tag1=\"tag1_value\",tag2=\"tag2_value\" co l0=\"col0_value\",col1=\"col1_value\" 1680918783010000000",
"escape_test,tag1=\"tag1_value\",tag2=\"tag2_value\" col0=\"co\"l\"0_value\",col1=\"col1_value\" 1680918783010000000"
}; };
SSmlHandle *info = smlBuildSmlInfo(NULL); SSmlHandle *info = smlBuildSmlInfo(NULL);
......
...@@ -215,7 +215,7 @@ static void tsdbCloseRocksCache(STsdb *pTsdb) { ...@@ -215,7 +215,7 @@ static void tsdbCloseRocksCache(STsdb *pTsdb) {
} }
static void rocksMayWrite(STsdb *pTsdb, bool force, bool read, bool lock) { static void rocksMayWrite(STsdb *pTsdb, bool force, bool read, bool lock) {
rocksdb_writebatch_t *wb = NULL; rocksdb_writebatch_t *wb = pTsdb->rCache.writebatch;
if (read) { if (read) {
if (lock) { if (lock) {
taosThreadMutexLock(&pTsdb->lruMutex); taosThreadMutexLock(&pTsdb->lruMutex);
...@@ -225,11 +225,12 @@ static void rocksMayWrite(STsdb *pTsdb, bool force, bool read, bool lock) { ...@@ -225,11 +225,12 @@ static void rocksMayWrite(STsdb *pTsdb, bool force, bool read, bool lock) {
if (lock) { if (lock) {
taosThreadMutexLock(&pTsdb->rCache.rMutex); taosThreadMutexLock(&pTsdb->rCache.rMutex);
} }
wb = pTsdb->rCache.writebatch;
} }
int count = rocksdb_writebatch_count(wb); int count = rocksdb_writebatch_count(wb);
if ((force && count > 0) || count >= ROCKS_BATCH_SIZE) { if ((force && count > 0) || count >= ROCKS_BATCH_SIZE) {
char *err = NULL; char *err = NULL;
rocksdb_write(pTsdb->rCache.db, pTsdb->rCache.writeoptions, wb, &err); rocksdb_write(pTsdb->rCache.db, pTsdb->rCache.writeoptions, wb, &err);
if (NULL != err) { if (NULL != err) {
tsdbError("vgId:%d, %s failed at line %d, count: %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, count, tsdbError("vgId:%d, %s failed at line %d, count: %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, count,
...@@ -240,10 +241,13 @@ static void rocksMayWrite(STsdb *pTsdb, bool force, bool read, bool lock) { ...@@ -240,10 +241,13 @@ static void rocksMayWrite(STsdb *pTsdb, bool force, bool read, bool lock) {
rocksdb_writebatch_clear(wb); rocksdb_writebatch_clear(wb);
} }
if (lock) {
if (read) { if (read) {
if (lock) taosThreadMutexUnlock(&pTsdb->lruMutex); taosThreadMutexUnlock(&pTsdb->lruMutex);
} else { } else {
if (lock) taosThreadMutexUnlock(&pTsdb->rCache.rMutex); taosThreadMutexUnlock(&pTsdb->rCache.rMutex);
}
} }
} }
...@@ -287,11 +291,7 @@ static void tsdbCacheSerialize(SLastCol *pLastCol, char **value, size_t *size) { ...@@ -287,11 +291,7 @@ static void tsdbCacheSerialize(SLastCol *pLastCol, char **value, size_t *size) {
*size = length; *size = length;
} }
int tsdbCacheFlushDirty(const void *key, size_t klen, void *value, void *ud) { static void tsdbCachePutBatch(SLastCol *pLastCol, const void *key, size_t klen, SCacheFlushState *state) {
SLastCol *pLastCol = (SLastCol *)value;
if (pLastCol->dirty) {
SCacheFlushState *state = (SCacheFlushState *)ud;
STsdb *pTsdb = state->pTsdb; STsdb *pTsdb = state->pTsdb;
SRocksCache *rCache = &pTsdb->rCache; SRocksCache *rCache = &pTsdb->rCache;
rocksdb_writebatch_t *wb = rCache->writebatch; rocksdb_writebatch_t *wb = rCache->writebatch;
...@@ -299,12 +299,16 @@ int tsdbCacheFlushDirty(const void *key, size_t klen, void *value, void *ud) { ...@@ -299,12 +299,16 @@ int tsdbCacheFlushDirty(const void *key, size_t klen, void *value, void *ud) {
size_t vlen = 0; size_t vlen = 0;
tsdbCacheSerialize(pLastCol, &rocks_value, &vlen); tsdbCacheSerialize(pLastCol, &rocks_value, &vlen);
taosThreadMutexLock(&rCache->rMutex);
rocksdb_writebatch_put(wb, (char *)key, klen, rocks_value, vlen); rocksdb_writebatch_put(wb, (char *)key, klen, rocks_value, vlen);
taosMemoryFree(rocks_value); taosMemoryFree(rocks_value);
if (++state->flush_count >= ROCKS_BATCH_SIZE) { if (++state->flush_count >= ROCKS_BATCH_SIZE) {
char *err = NULL; char *err = NULL;
rocksdb_write(rCache->db, rCache->writeoptions, wb, &err); rocksdb_write(rCache->db, rCache->writeoptions, wb, &err);
if (NULL != err) { if (NULL != err) {
tsdbError("vgId:%d, %s failed at line %d, count: %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, tsdbError("vgId:%d, %s failed at line %d, count: %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
...@@ -317,6 +321,15 @@ int tsdbCacheFlushDirty(const void *key, size_t klen, void *value, void *ud) { ...@@ -317,6 +321,15 @@ int tsdbCacheFlushDirty(const void *key, size_t klen, void *value, void *ud) {
state->flush_count = 0; state->flush_count = 0;
} }
taosThreadMutexUnlock(&rCache->rMutex);
}
int tsdbCacheFlushDirty(const void *key, size_t klen, void *value, void *ud) {
SLastCol *pLastCol = (SLastCol *)value;
if (pLastCol->dirty) {
tsdbCachePutBatch(pLastCol, key, klen, (SCacheFlushState *)ud);
pLastCol->dirty = 0; pLastCol->dirty = 0;
} }
...@@ -379,36 +392,10 @@ static void reallocVarData(SColVal *pColVal) { ...@@ -379,36 +392,10 @@ static void reallocVarData(SColVal *pColVal) {
} }
static void tsdbCacheDeleter(const void *key, size_t klen, void *value, void *ud) { static void tsdbCacheDeleter(const void *key, size_t klen, void *value, void *ud) {
(void)key;
(void)klen;
SLastCol *pLastCol = (SLastCol *)value; SLastCol *pLastCol = (SLastCol *)value;
if (pLastCol->dirty) { if (pLastCol->dirty) {
SCacheFlushState *state = (SCacheFlushState *)ud; tsdbCachePutBatch(pLastCol, key, klen, (SCacheFlushState *)ud);
STsdb *pTsdb = state->pTsdb;
SRocksCache *rCache = &pTsdb->rCache;
rocksdb_writebatch_t *wb = rCache->writebatch;
char *rocks_value = NULL;
size_t vlen = 0;
tsdbCacheSerialize(pLastCol, &rocks_value, &vlen);
rocksdb_writebatch_put(wb, (char *)key, klen, rocks_value, vlen);
taosMemoryFree(rocks_value);
if (++state->flush_count >= ROCKS_BATCH_SIZE) {
char *err = NULL;
rocksdb_write(rCache->db, rCache->writeoptions, wb, &err);
if (NULL != err) {
tsdbError("vgId:%d, %s failed at line %d, count: %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
state->flush_count, err);
rocksdb_free(err);
}
rocksdb_writebatch_clear(wb);
state->flush_count = 0;
}
} }
if (IS_VAR_DATA_TYPE(pLastCol->colVal.type) /* && pLastCol->colVal.value.nData > 0*/) { if (IS_VAR_DATA_TYPE(pLastCol->colVal.type) /* && pLastCol->colVal.value.nData > 0*/) {
...@@ -451,7 +438,6 @@ int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSDBROW *pRow ...@@ -451,7 +438,6 @@ int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSDBROW *pRow
// 3, build keys & multi get from rocks // 3, build keys & multi get from rocks
int num_keys = TARRAY_SIZE(aColVal); int num_keys = TARRAY_SIZE(aColVal);
TSKEY keyTs = TSDBROW_TS(pRow); TSKEY keyTs = TSDBROW_TS(pRow);
rocksdb_writebatch_t *wb = pTsdb->rCache.writebatch;
SArray *remainCols = NULL; SArray *remainCols = NULL;
SLRUCache *pCache = pTsdb->lruCache; SLRUCache *pCache = pTsdb->lruCache;
...@@ -489,14 +475,6 @@ int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSDBROW *pRow ...@@ -489,14 +475,6 @@ int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSDBROW *pRow
if (!pLastCol->dirty) { if (!pLastCol->dirty) {
pLastCol->dirty = 1; pLastCol->dirty = 1;
} }
/*
char *value = NULL;
size_t vlen = 0;
tsdbCacheSerialize(pLastCol, &value, &vlen);
// tsdbCacheSerialize(&(SLastCol){.ts = keyTs, .colVal = *pColVal}, &value, &vlen);
rocksdb_writebatch_put(wb, (char *)key, klen, value, vlen);
taosMemoryFree(value);
*/
} }
taosLRUCacheRelease(pCache, h, false); taosLRUCacheRelease(pCache, h, false);
...@@ -536,13 +514,6 @@ int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSDBROW *pRow ...@@ -536,13 +514,6 @@ int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSDBROW *pRow
if (!pLastCol->dirty) { if (!pLastCol->dirty) {
pLastCol->dirty = 1; pLastCol->dirty = 1;
} }
/*
char *value = NULL;
size_t vlen = 0;
tsdbCacheSerialize(pLastCol, &value, &vlen);
rocksdb_writebatch_put(wb, (char *)key, klen, value, vlen);
taosMemoryFree(value);
*/
} }
taosLRUCacheRelease(pCache, h, false); taosLRUCacheRelease(pCache, h, false);
...@@ -580,6 +551,7 @@ int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSDBROW *pRow ...@@ -580,6 +551,7 @@ int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSDBROW *pRow
taosMemoryFree(keys_list_sizes); taosMemoryFree(keys_list_sizes);
taosMemoryFree(values_list_sizes); taosMemoryFree(values_list_sizes);
rocksdb_writebatch_t *wb = pTsdb->rCache.writebatch;
for (int i = 0; i < num_keys; ++i) { for (int i = 0; i < num_keys; ++i) {
SIdxKey *idxKey = &((SIdxKey *)TARRAY_DATA(remainCols))[i]; SIdxKey *idxKey = &((SIdxKey *)TARRAY_DATA(remainCols))[i];
SColVal *pColVal = (SColVal *)TARRAY_DATA(aColVal) + idxKey->idx; SColVal *pColVal = (SColVal *)TARRAY_DATA(aColVal) + idxKey->idx;
...@@ -593,8 +565,12 @@ int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSDBROW *pRow ...@@ -593,8 +565,12 @@ int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSDBROW *pRow
size_t vlen = 0; size_t vlen = 0;
tsdbCacheSerialize(&(SLastCol){.ts = keyTs, .colVal = *pColVal}, &value, &vlen); tsdbCacheSerialize(&(SLastCol){.ts = keyTs, .colVal = *pColVal}, &value, &vlen);
// SLastKey key = (SLastKey){.ltype = 0, .uid = uid, .cid = pColVal->cid}; // SLastKey key = (SLastKey){.ltype = 0, .uid = uid, .cid = pColVal->cid};
taosThreadMutexLock(&pTsdb->rCache.rMutex);
rocksdb_writebatch_put(wb, (char *)&idxKey->key, ROCKS_KEY_LEN, value, vlen); rocksdb_writebatch_put(wb, (char *)&idxKey->key, ROCKS_KEY_LEN, value, vlen);
taosThreadMutexUnlock(&pTsdb->rCache.rMutex);
pLastCol = (SLastCol *)value; pLastCol = (SLastCol *)value;
SLastCol *pTmpLastCol = taosMemoryCalloc(1, sizeof(SLastCol)); SLastCol *pTmpLastCol = taosMemoryCalloc(1, sizeof(SLastCol));
*pTmpLastCol = *pLastCol; *pTmpLastCol = *pLastCol;
...@@ -621,8 +597,12 @@ int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSDBROW *pRow ...@@ -621,8 +597,12 @@ int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSDBROW *pRow
size_t vlen = 0; size_t vlen = 0;
tsdbCacheSerialize(&(SLastCol){.ts = keyTs, .colVal = *pColVal}, &value, &vlen); tsdbCacheSerialize(&(SLastCol){.ts = keyTs, .colVal = *pColVal}, &value, &vlen);
// SLastKey key = (SLastKey){.ltype = 1, .uid = uid, .cid = pColVal->cid}; // SLastKey key = (SLastKey){.ltype = 1, .uid = uid, .cid = pColVal->cid};
taosThreadMutexLock(&pTsdb->rCache.rMutex);
rocksdb_writebatch_put(wb, (char *)&idxKey->key, ROCKS_KEY_LEN, value, vlen); rocksdb_writebatch_put(wb, (char *)&idxKey->key, ROCKS_KEY_LEN, value, vlen);
taosThreadMutexUnlock(&pTsdb->rCache.rMutex);
pLastCol = (SLastCol *)value; pLastCol = (SLastCol *)value;
SLastCol *pTmpLastCol = taosMemoryCalloc(1, sizeof(SLastCol)); SLastCol *pTmpLastCol = taosMemoryCalloc(1, sizeof(SLastCol));
*pTmpLastCol = *pLastCol; *pTmpLastCol = *pLastCol;
...@@ -647,12 +627,14 @@ int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSDBROW *pRow ...@@ -647,12 +627,14 @@ int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSDBROW *pRow
rocksdb_free(values_list[i]); rocksdb_free(values_list[i]);
} }
rocksMayWrite(pTsdb, true, false, true);
taosMemoryFree(values_list); taosMemoryFree(values_list);
taosArrayDestroy(remainCols); taosArrayDestroy(remainCols);
} }
rocksMayWrite(pTsdb, true, false, false);
taosThreadMutexUnlock(&pTsdb->lruMutex); taosThreadMutexUnlock(&pTsdb->lruMutex);
_exit: _exit:
...@@ -1005,9 +987,7 @@ int32_t tsdbCacheGetBatch(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCache ...@@ -1005,9 +987,7 @@ int32_t tsdbCacheGetBatch(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCache
reallocVarData(&lastCol.colVal); reallocVarData(&lastCol.colVal);
taosArrayPush(pLastArray, &lastCol); taosArrayPush(pLastArray, &lastCol);
if (h) {
taosLRUCacheRelease(pCache, h, false); taosLRUCacheRelease(pCache, h, false);
}
} else { } else {
SLastCol noneCol = {.ts = TSKEY_MIN, .colVal = COL_VAL_NONE(cid, pr->pSchema->columns[pr->pSlotIds[i]].type)}; SLastCol noneCol = {.ts = TSKEY_MIN, .colVal = COL_VAL_NONE(cid, pr->pSchema->columns[pr->pSlotIds[i]].type)};
...@@ -1032,9 +1012,7 @@ int32_t tsdbCacheGetBatch(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCache ...@@ -1032,9 +1012,7 @@ int32_t tsdbCacheGetBatch(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCache
reallocVarData(&lastCol.colVal); reallocVarData(&lastCol.colVal);
taosArraySet(pLastArray, idxKey->idx, &lastCol); taosArraySet(pLastArray, idxKey->idx, &lastCol);
if (h) {
taosLRUCacheRelease(pCache, h, false); taosLRUCacheRelease(pCache, h, false);
}
taosArrayRemove(remainCols, i); taosArrayRemove(remainCols, i);
} else { } else {
...@@ -1137,6 +1115,8 @@ int32_t tsdbCacheDel(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKE ...@@ -1137,6 +1115,8 @@ int32_t tsdbCacheDel(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKE
rocksMayWrite(pTsdb, true, false, false); rocksMayWrite(pTsdb, true, false, false);
rocksdb_multi_get(pTsdb->rCache.db, pTsdb->rCache.readoptions, num_keys * 2, (const char *const *)keys_list, rocksdb_multi_get(pTsdb->rCache.db, pTsdb->rCache.readoptions, num_keys * 2, (const char *const *)keys_list,
keys_list_sizes, values_list, values_list_sizes, errs); keys_list_sizes, values_list, values_list_sizes, errs);
taosThreadMutexUnlock(&pTsdb->rCache.rMutex);
for (int i = 0; i < num_keys * 2; ++i) { for (int i = 0; i < num_keys * 2; ++i) {
if (errs[i]) { if (errs[i]) {
rocksdb_free(errs[i]); rocksdb_free(errs[i]);
...@@ -1147,19 +1127,42 @@ int32_t tsdbCacheDel(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKE ...@@ -1147,19 +1127,42 @@ int32_t tsdbCacheDel(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKE
rocksdb_writebatch_t *wb = pTsdb->rCache.writebatch; rocksdb_writebatch_t *wb = pTsdb->rCache.writebatch;
for (int i = 0; i < num_keys; ++i) { for (int i = 0; i < num_keys; ++i) {
SLastCol *pLastCol = tsdbCacheDeserialize(values_list[i]); SLastCol *pLastCol = tsdbCacheDeserialize(values_list[i]);
taosThreadMutexLock(&pTsdb->rCache.rMutex);
if (NULL != pLastCol && (pLastCol->ts <= eKey && pLastCol->ts >= sKey)) { if (NULL != pLastCol && (pLastCol->ts <= eKey && pLastCol->ts >= sKey)) {
rocksdb_writebatch_delete(wb, keys_list[i], klen); rocksdb_writebatch_delete(wb, keys_list[i], klen);
} }
taosLRUCacheErase(pTsdb->lruCache, keys_list[i], klen);
pLastCol = tsdbCacheDeserialize(values_list[i + num_keys]); pLastCol = tsdbCacheDeserialize(values_list[i + num_keys]);
if (NULL != pLastCol && (pLastCol->ts <= eKey && pLastCol->ts >= sKey)) { if (NULL != pLastCol && (pLastCol->ts <= eKey && pLastCol->ts >= sKey)) {
rocksdb_writebatch_delete(wb, keys_list[num_keys + i], klen); rocksdb_writebatch_delete(wb, keys_list[num_keys + i], klen);
} }
taosLRUCacheErase(pTsdb->lruCache, keys_list[num_keys + i], klen); taosThreadMutexUnlock(&pTsdb->rCache.rMutex);
rocksdb_free(values_list[i]); rocksdb_free(values_list[i]);
rocksdb_free(values_list[i + num_keys]); rocksdb_free(values_list[i + num_keys]);
taosThreadMutexLock(&pTsdb->lruMutex);
LRUHandle *h = taosLRUCacheLookup(pTsdb->lruCache, keys_list[i], klen);
if (h) {
SLastCol *pLastCol = (SLastCol *)taosLRUCacheValue(pTsdb->lruCache, h);
if (pLastCol->dirty) {
pLastCol->dirty = 0;
}
taosLRUCacheRelease(pTsdb->lruCache, h, true);
}
taosLRUCacheErase(pTsdb->lruCache, keys_list[i], klen);
h = taosLRUCacheLookup(pTsdb->lruCache, keys_list[num_keys + i], klen);
if (h) {
SLastCol *pLastCol = (SLastCol *)taosLRUCacheValue(pTsdb->lruCache, h);
if (pLastCol->dirty) {
pLastCol->dirty = 0;
}
taosLRUCacheRelease(pTsdb->lruCache, h, true);
}
taosLRUCacheErase(pTsdb->lruCache, keys_list[num_keys + i], klen);
taosThreadMutexUnlock(&pTsdb->lruMutex);
} }
for (int i = 0; i < num_keys; ++i) { for (int i = 0; i < num_keys; ++i) {
taosMemoryFree(keys_list[i]); taosMemoryFree(keys_list[i]);
...@@ -1169,8 +1172,7 @@ int32_t tsdbCacheDel(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKE ...@@ -1169,8 +1172,7 @@ int32_t tsdbCacheDel(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKE
taosMemoryFree(values_list); taosMemoryFree(values_list);
taosMemoryFree(values_list_sizes); taosMemoryFree(values_list_sizes);
rocksMayWrite(pTsdb, true, false, false); rocksMayWrite(pTsdb, true, false, true);
taosThreadMutexUnlock(&pTsdb->rCache.rMutex);
_exit: _exit:
taosMemoryFree(pTSchema); taosMemoryFree(pTSchema);
...@@ -1183,7 +1185,7 @@ int32_t tsdbOpenCache(STsdb *pTsdb) { ...@@ -1183,7 +1185,7 @@ int32_t tsdbOpenCache(STsdb *pTsdb) {
SLRUCache *pCache = NULL; SLRUCache *pCache = NULL;
size_t cfgCapacity = pTsdb->pVnode->config.cacheLastSize * 1024 * 1024; size_t cfgCapacity = pTsdb->pVnode->config.cacheLastSize * 1024 * 1024;
pCache = taosLRUCacheInit(cfgCapacity, 1, .5); pCache = taosLRUCacheInit(cfgCapacity, 0, .5);
if (pCache == NULL) { if (pCache == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
goto _err; goto _err;
......
...@@ -1590,6 +1590,7 @@ void syncNodeDoConfigChange(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncInde ...@@ -1590,6 +1590,7 @@ void syncNodeDoConfigChange(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncInde
// persist cfg // persist cfg
syncWriteCfgFile(pSyncNode); syncWriteCfgFile(pSyncNode);
#if 0
// change isStandBy to normal (election timeout) // change isStandBy to normal (election timeout)
if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) { if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) {
syncNodeBecomeLeader(pSyncNode, ""); syncNodeBecomeLeader(pSyncNode, "");
...@@ -1601,6 +1602,7 @@ void syncNodeDoConfigChange(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncInde ...@@ -1601,6 +1602,7 @@ void syncNodeDoConfigChange(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncInde
} else { } else {
syncNodeBecomeFollower(pSyncNode, ""); syncNodeBecomeFollower(pSyncNode, "");
} }
#endif
} else { } else {
// persist cfg // persist cfg
syncWriteCfgFile(pSyncNode); syncWriteCfgFile(pSyncNode);
......
...@@ -1536,9 +1536,9 @@ int main(int argc, char *argv[]) { ...@@ -1536,9 +1536,9 @@ int main(int argc, char *argv[]) {
ASSERT(!ret); ASSERT(!ret);
ret = sml_ts3116_Test(); ret = sml_ts3116_Test();
ASSERT(!ret); ASSERT(!ret);
// ret = sml_ts2385_Test(); // this test case need config sml table name using ./sml_test config_file ret = sml_ts2385_Test(); // this test case need config sml table name using ./sml_test config_file
// ASSERT(!ret); ASSERT(!ret);
ret = sml_ts3303_Test(); // this test case need config sml table name using ./sml_test config_file ret = sml_ts3303_Test();
ASSERT(!ret); ASSERT(!ret);
// for(int i = 0; i < sizeof(str)/sizeof(str[0]); i++){ // for(int i = 0; i < sizeof(str)/sizeof(str[0]); i++){
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册