未验证 提交 641f4e10 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #2893 from taosdata/develop

Merge from develop into master
......@@ -11,8 +11,8 @@ TDengine采用关系型数据模型,需要建库、建表。因此对于一个
```cmd
CREATE DATABASE power KEEP 365 DAYS 10 REPLICA 3 BLOCKS 4;
```
上述语句将创建一个名为power的库,这个库的数据将保留365天(超过365天将被自动删除),每10天一个数据文件,副本数为3, 内存块数为4。详细的语法及参数请见<a href="https://www.taosdata.com/cn/documentation20/taos-sql/">TAOS SQL</a>
上述语句将创建一个名为power的库,这个库的数据将保留365天(超过365天将被自动删除),每10天一个数据文件,副本数为3, 内存块数为4。详细的语法及参数请见<a href="https://www.taosdata.com/cn/documentation20/taos-sql/">TAOS SQL</a>
**注意:**
- 任何一张表或超级表是属于一个库的,在创建表之前,必须先创建库。
......
......@@ -282,60 +282,320 @@ TDengine提供时间驱动的实时流式计算API。可以每隔一指定的时
## Java Connector
### JDBC接口
TDengine 为了方便 Java 应用使用,提供了遵循 JDBC 标准(3.0)API 规范的 `taos-jdbcdriver` 实现。目前可以通过 [Sonatype Repository][1] 搜索并下载。
由于 TDengine 是使用 c 语言开发的,使用 taos-jdbcdriver 驱动包时需要依赖系统对应的本地函数库。
* libtaos.so
在 linux 系统中成功安装 TDengine 后,依赖的本地函数库 libtaos.so 文件会被自动拷贝至 /usr/lib/libtaos.so,该目录包含在 Linux 自动扫描路径上,无需单独指定。
* taos.dll
在 windows 系统中安装完客户端之后,驱动包依赖的 taos.dll 文件会自动拷贝到系统默认搜索路径 C:/Windows/System32 下,同样无需要单独指定。
> 注意:在 windows 环境开发时需要安装 TDengine 对应的 [windows 客户端][14],Linux 服务器安装完 TDengine 之后默认已安装 client,也可以单独安装 [Linux 客户端][15] 连接远程 TDengine Server。
TDengine 的 JDBC 驱动实现尽可能的与关系型数据库驱动保持一致,但时序空间数据库与关系对象型数据库服务的对象和技术特征的差异导致 taos-jdbcdriver 并未完全实现 JDBC 标准规范。在使用时需要注意以下几点:
* TDengine 不提供针对单条数据记录的删除和修改的操作,驱动中也没有支持相关方法。
* 由于不支持删除和修改,所以也不支持事务操作。
* 目前不支持表间的 union 操作。
* 目前不支持嵌套查询(nested query),对每个 Connection 的实例,至多只能有一个打开的 ResultSet 实例;如果在 ResultSet还没关闭的情况下执行了新的查询,TSDBJDBCDriver 则会自动关闭上一个 ResultSet。
## TAOS-JDBCDriver 版本以及支持的 TDengine 版本和 JDK 版本
| taos-jdbcdriver 版本 | TDengine 版本 | JDK 版本 |
| --- | --- | --- |
| 2.0.0 | 2.0.0.x 及以上 | 1.8.x |
| 1.0.3 | 1.6.1.x 及以上 | 1.8.x |
| 1.0.2 | 1.6.1.x 及以上 | 1.8.x |
| 1.0.1 | 1.6.1.x 及以上 | 1.8.x |
## TDengine DataType 和 Java DataType
TDengine 目前支持时间戳、数字、字符、布尔类型,与 Java 对应类型转换如下:
| TDengine DataType | Java DataType |
| --- | --- |
| TIMESTAMP | java.sql.Timestamp |
| INT | java.lang.Integer |
| BIGINT | java.lang.Long |
| FLOAT | java.lang.Float |
| DOUBLE | java.lang.Double |
| SMALLINT, TINYINT |java.lang.Short |
| BOOL | java.lang.Boolean |
| BINARY, NCHAR | java.lang.String |
## 如何获取 TAOS-JDBCDriver
### maven 仓库
目前 taos-jdbcdriver 已经发布到 [Sonatype Repository][1] 仓库,且各大仓库都已同步。
* [sonatype][8]
* [mvnrepository][9]
* [maven.aliyun][10]
maven 项目中使用如下 pom.xml 配置即可:
```xml
<dependencies>
<dependency>
<groupId>com.taosdata.jdbc</groupId>
<artifactId>taos-jdbcdriver</artifactId>
<version>2.0.0</version>
<type>jar</type>
<scope>system</scope>
<systemPath>{localdir}/connector/taos-jdbcdriver-2.0.0-dist.jar</systemPath>
</dependency>
</dependencies>
```
如果用户使用Java开发企业级应用,可选用TDengine提供的JDBC Driver来调用服务。TDengine提供的JDBC Driver是标准JDBC规范的子集,遵循JDBC 标准(3.0)API规范,支持现有的各种Java开发框架。目前TDengine的JDBC driver并未发布到在线依赖仓库比如maven的中心仓库。因此用户开发时,需要手动把驱动包`taos-jdbcdriver-x.x.x-dist.jar`安装到开发环境的依赖仓库中。
### 源码编译打包
TDengine 的驱动程序包的在不同操作系统上依赖不同的本地函数库(均由C语言编写)。Linux系统上,依赖一个名为`libtaos.so` 的本地库,.so即"Shared Object"缩写。成功安装TDengine后,`libtaos.so` 文件会被自动拷贝至`/usr/local/lib/taos`目录下,该目录也包含在Linux上自动扫描路径上。Windows系统上,JDBC驱动程序依赖于一个名为`taos.dll` 的本地库,.dll是动态链接库"Dynamic Link Library"的缩写。Windows上成功安装客户端后,JDBC驱动程序包默认位于`C:/TDengine/driver/JDBC/`目录下;其依赖的动态链接库`taos.dll`文件位于`C:/TDengine/driver/C`目录下,`taos.dll` 会被自动拷贝至系统默认搜索路径`C:/Windows/System32`
下载 [TDengine][3] 源码之后,进入 taos-jdbcdriver 源码目录 `src/connector/jdbc` 执行 `mvn clean package` 即可生成相应 jar 包
TDengine的JDBC Driver遵循标准JDBC规范,开发人员可以参考Oracle官方的JDBC相关文档来找到具体的接口和方法的定义与用法。TDengine的JDBC驱动在连接配置和支持的方法上与传统数据库驱动稍有不同。
TDengine的JDBC URL规范格式为:
## 使用说明
`jdbc:TSDB://{host_ip}:{port}/{database_name}?[user={user}|&password={password}|&charset={charset}|&cfgdir={config_dir}|&locale={locale}|&timezone={timezone}]`
### 获取连接
其中,`{}`中的内容必须,`[]`中为可选。配置参数说明如下:
如下所示配置即可获取 TDengine Connection:
```java
Class.forName("com.taosdata.jdbc.TSDBDriver");
String jdbcUrl = "jdbc:TAOS://127.0.0.1:6030/log?user=root&password=taosdata";
Connection conn = DriverManager.getConnection(jdbcUrl);
```
> 端口 6030 为默认连接端口,JDBC URL 中的 log 为系统本身的监控数据库。
- user:登陆TDengine所用用户名;默认值root
- password:用户登陆密码;默认值taosdata
- charset:客户端使用的字符集;默认值为系统字符集
- cfgdir:客户端配置文件目录路径;Linux OS上默认值`/etc/taos` ,Windows OS上默认值 `C:/TDengine/cfg`
- locale:客户端语言环境;默认值系统当前locale
- timezone:客户端使用的时区;默认值为系统当前时区
TDengine 的 JDBC URL 规范格式为:
`jdbc:TSDB://{host_ip}:{port}/[database_name]?[user={user}|&password={password}|&charset={charset}|&cfgdir={config_dir}|&locale={locale}|&timezone={timezone}]`
以上所有参数均可在调用java.sql.DriverManager类创建连接时指定,示例如下:
其中,`{}` 中的内容必须,`[]` 中为可选。配置参数说明如下:
```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.util.Properties;
import com.taosdata.jdbc.TSDBDriver;
* user:登录 TDengine 用户名,默认值 root。
* password:用户登录密码,默认值 taosdata。
* charset:客户端使用的字符集,默认值为系统字符集。
* cfgdir:客户端配置文件目录路径,Linux OS 上默认值 /etc/taos ,Windows OS 上默认值 C:/TDengine/cfg。
* locale:客户端语言环境,默认值系统当前 locale。
* timezone:客户端使用的时区,默认值为系统当前时区。
以上参数可以在 3 处配置,`优先级由高到低`分别如下:
1. JDBC URL 参数
如上所述,可以在 JDBC URL 的参数中指定。
2. java.sql.DriverManager.getConnection(String jdbcUrl, Properties connProps)
```java
public Connection getConn() throws Exception{
Class.forName("com.taosdata.jdbc.TSDBDriver");
String jdbcUrl = "jdbc:TAOS://127.0.0.1:0/db?user=root&password=taosdata";
Class.forName("com.taosdata.jdbc.TSDBDriver");
String jdbcUrl = "jdbc:TAOS://127.0.0.1:0/log?user=root&password=taosdata";
Properties connProps = new Properties();
connProps.setProperty(TSDBDriver.PROPERTY_KEY_USER, "root");
connProps.setProperty(TSDBDriver.PROPERTY_KEY_PASSWORD, "taosdata");
connProps.setProperty(TSDBDriver.PROPERTY_KEY_CONFIG_DIR, "/etc/taos");
connProps.setProperty(TSDBDriver.PROPERTY_KEY_CHARSET, "UTF-8");
connProps.setProperty(TSDBDriver.PROPERTY_KEY_LOCALE, "en_US.UTF-8");
connProps.setProperty(TSDBDriver.PROPERTY_KEY_TIMEZONE, "UTC-8");
connProps.setProperty(TSDBDriver.PROPERTY_KEY_TIME_ZONE, "UTC-8");
Connection conn = DriverManager.getConnection(jdbcUrl, connProps);
return conn;
}
```
这些配置参数中除了cfgdir外,均可在客户端配置文件taos.cfg中进行配置。调用java.sql.DriverManager时声明的配置参数优先级最高,JDBC URL的优先级次之,配置文件的优先级最低。例如charset同时在配置文件taos.cfg中配置,也在JDBC URL中配置,则使用JDBC URL中的配置值。
3. 客户端配置文件 taos.cfg
linux 系统默认配置文件为 /var/lib/taos/taos.cfg,windows 系统默认配置文件路径为 C:\TDengine\cfg\taos.cfg。
```properties
# client default username
# defaultUser root
# client default password
# defaultPass taosdata
# default system charset
# charset UTF-8
# system locale
# locale en_US.UTF-8
```
> 更多详细配置请参考[客户端配置][13]
### 创建数据库和表
```java
Statement stmt = conn.createStatement();
// create database
stmt.executeUpdate("create database if not exists db");
// use database
stmt.executeUpdate("use db");
此外,尽管TDengine的JDBC驱动实现尽可能的与关系型数据库驱动保持一致,但时序空间数据库与关系对象型数据库服务的对象和技术特征的差异导致TDengine的Java API并不能与标准完全相同。对于有大量关系型数据库开发经验而初次接触TDengine的开发者来说,有以下一些值的注意的地方:
// create table
stmt.executeUpdate("create table if not exists tb (ts timestamp, temperature int, humidity float)");
```
> 注意:如果不使用 `use db` 指定数据库,则后续对表的操作都需要增加数据库名称作为前缀,如 db.tb。
### 插入数据
```java
// insert data
int affectedRows = stmt.executeUpdate("insert into tb values(now, 23, 10.3) (now + 1s, 20, 9.3)");
System.out.println("insert " + affectedRows + " rows.");
```
> now 为系统内部函数,默认为服务器当前时间。
> `now + 1s` 代表服务器当前时间往后加 1 秒,数字后面代表时间单位:a(毫秒), s(秒), m(分), h(小时), d(天),w(周), n(月), y(年)。
### 查询数据
```java
// query data
ResultSet resultSet = stmt.executeQuery("select * from tb");
Timestamp ts = null;
int temperature = 0;
float humidity = 0;
while(resultSet.next()){
ts = resultSet.getTimestamp(1);
temperature = resultSet.getInt(2);
humidity = resultSet.getFloat("humidity");
System.out.printf("%s, %d, %s\n", ts, temperature, humidity);
}
```
> 查询和操作关系型数据库一致,使用下标获取返回字段内容时从 1 开始,建议使用字段名称获取。
* TDengine不提供针对单条数据记录的删除和修改的操作,驱动中也没有支持相关方法
* 目前TDengine不支持表间的join或union操作,因此也缺乏对该部分API的支持
* TDengine支持批量写入,但是支持停留在SQL语句级别,而不是API级别,也就是说用户需要通过写特殊的SQL语句来实现批量
* 目前TDengine不支持嵌套查询(nested query),对每个Connection的实例,至多只能有一个打开的ResultSet实例;如果在ResultSet还没关闭的情况下执行了新的查询,TSDBJDBCDriver则会自动关闭上一个ResultSet
对于TDengine操作的报错信息,用户可使用JDBCDriver包里提供的枚举类TSDBError.java来获取error message和error code的列表。对于更多的具体操作的相关代码,请参考TDengine提供的使用示范项目`JDBCDemo`
### 关闭资源
```java
resultSet.close();
stmt.close();
conn.close();
```
> `注意务必要将 connection 进行关闭`,否则会出现连接泄露。
## 与连接池使用
**HikariCP**
* 引入相应 HikariCP maven 依赖:
```xml
<dependency>
<groupId>com.zaxxer</groupId>
<artifactId>HikariCP</artifactId>
<version>3.4.1</version>
</dependency>
```
* 使用示例如下:
```java
public static void main(String[] args) throws SQLException {
HikariConfig config = new HikariConfig();
config.setJdbcUrl("jdbc:TAOS://127.0.0.1:6030/log");
config.setUsername("root");
config.setPassword("taosdata");
config.setMinimumIdle(3); //minimum number of idle connection
config.setMaximumPoolSize(10); //maximum number of connection in the pool
config.setConnectionTimeout(10000); //maximum wait milliseconds for get connection from pool
config.setIdleTimeout(60000); // max idle time for recycle idle connection
config.setConnectionTestQuery("describe log.dn"); //validation query
config.setValidationTimeout(3000); //validation query timeout
HikariDataSource ds = new HikariDataSource(config); //create datasource
Connection connection = ds.getConnection(); // get connection
Statement statement = connection.createStatement(); // get statement
//query or insert
// ...
connection.close(); // put back to conneciton pool
}
```
> 通过 HikariDataSource.getConnection() 获取连接后,使用完成后需要调用 close() 方法,实际上它并不会关闭连接,只是放回连接池中。
> 更多 HikariCP 使用问题请查看[官方说明][5]
**Druid**
* 引入相应 Druid maven 依赖:
```xml
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid</artifactId>
<version>1.1.20</version>
</dependency>
```
* 使用示例如下:
```java
public static void main(String[] args) throws Exception {
Properties properties = new Properties();
properties.put("driverClassName","com.taosdata.jdbc.TSDBDriver");
properties.put("url","jdbc:TAOS://127.0.0.1:6030/log");
properties.put("username","root");
properties.put("password","taosdata");
properties.put("maxActive","10"); //maximum number of connection in the pool
properties.put("initialSize","3");//initial number of connection
properties.put("maxWait","10000");//maximum wait milliseconds for get connection from pool
properties.put("minIdle","3");//minimum number of connection in the pool
properties.put("timeBetweenEvictionRunsMillis","3000");// the interval milliseconds to test connection
properties.put("minEvictableIdleTimeMillis","60000");//the minimum milliseconds to keep idle
properties.put("maxEvictableIdleTimeMillis","90000");//the maximum milliseconds to keep idle
properties.put("validationQuery","describe log.dn"); //validation query
properties.put("testWhileIdle","true"); // test connection while idle
properties.put("testOnBorrow","false"); // don't need while testWhileIdle is true
properties.put("testOnReturn","false"); // don't need while testWhileIdle is true
//create druid datasource
DataSource ds = DruidDataSourceFactory.createDataSource(properties);
Connection connection = ds.getConnection(); // get connection
Statement statement = connection.createStatement(); // get statement
//query or insert
// ...
connection.close(); // put back to conneciton pool
}
```
> 更多 druid 使用问题请查看[官方说明][6]
**注意事项**
* TDengine `v1.6.4.1` 版本开始提供了一个专门用于心跳检测的函数 `select server_status()`,所以在使用连接池时推荐使用 `select server_status()` 进行 Validation Query。
如下所示,`select server_status()` 执行成功会返回 `1`
```shell
taos> select server_status();
server_status()|
================
1 |
Query OK, 1 row(s) in set (0.000141s)
```
## 与框架使用
* Spring JdbcTemplate 中使用 taos-jdbcdriver,可参考 [SpringJdbcTemplate][11]
* Springboot + Mybatis 中使用,可参考 [springbootdemo][12]
## 常见问题
* java.lang.UnsatisfiedLinkError: no taos in java.library.path
**原因**:程序没有找到依赖的本地函数库 taos。
**解决方法**:windows 下可以将 C:\TDengine\driver\taos.dll 拷贝到 C:\Windows\System32\ 目录下,linux 下将建立如下软链 ` ln -s /usr/local/taos/driver/libtaos.so.x.x.x.x /usr/lib/libtaos.so` 即可。
* java.lang.UnsatisfiedLinkError: taos.dll Can't load AMD 64 bit on a IA 32-bit platform
**原因**:目前 TDengine 只支持 64 位 JDK。
**解决方法**:重新安装 64 位 JDK。
* 其它问题请参考 [Issues][7]
## Python Connector
......
......@@ -89,6 +89,7 @@ remote_write:
### 查询prometheus写入数据
prometheus产生的数据格式如下:
```
{
Timestamp: 1576466279341,
Value: 37.000000,
apiserver_request_latencies_bucket {
......
......@@ -2379,10 +2379,11 @@ static void bottom_func_second_merge(SQLFunctionCtx *pCtx) {
// the intermediate result is binary, we only use the output data type
for (int32_t i = 0; i < pInput->num; ++i) {
do_bottom_function_add(pOutput, pCtx->param[0].i64Key, &pInput->res[i]->v.i64Key, pInput->res[i]->timestamp,
pCtx->outputType, &pCtx->tagInfo, pInput->res[i]->pTags, pCtx->currentStage);
int16_t type = (pCtx->outputType == TSDB_DATA_TYPE_FLOAT) ? TSDB_DATA_TYPE_DOUBLE : pCtx->outputType;
do_bottom_function_add(pOutput, pCtx->param[0].i64Key, &pInput->res[i]->v.i64Key, pInput->res[i]->timestamp, type,
&pCtx->tagInfo, pInput->res[i]->pTags, pCtx->currentStage);
}
SET_VAL(pCtx, pInput->num, pOutput->num);
if (pOutput->num > 0) {
......
......@@ -44,6 +44,10 @@ extern int tsdbDebugFlag;
#define TSDB_FILE_DELIMITER 0xF00AFA0F
#define TSDB_FILE_INIT_MAGIC 0xFFFFFFFF
// NOTE: Any file format change must increase this version number by 1
// Also, implement the convert function
#define TSDB_FILE_VERSION ((uint32_t)0)
// Definitions
// ------------------ tsdbMeta.c
typedef struct STable {
......@@ -443,7 +447,7 @@ void tsdbCloseFile(SFile* pFile);
int tsdbCreateFile(SFile* pFile, STsdbRepo* pRepo, int fid, int type);
SFileGroup* tsdbSearchFGroup(STsdbFileH* pFileH, int fid, int flags);
void tsdbFitRetention(STsdbRepo* pRepo);
int tsdbUpdateFileHeader(SFile* pFile, uint32_t version);
int tsdbUpdateFileHeader(SFile* pFile);
int tsdbEncodeSFileInfo(void** buf, const STsdbFileInfo* pInfo);
void* tsdbDecodeSFileInfo(void* buf, STsdbFileInfo* pInfo);
void tsdbRemoveFileGroup(STsdbRepo* pRepo, SFileGroup* pFGroup);
......
......@@ -247,11 +247,14 @@ int tsdbOpenFile(SFile *pFile, int oflag) {
return -1;
}
tsdbTrace("open file %s, fd %d", pFile->fname, pFile->fd);
return 0;
}
void tsdbCloseFile(SFile *pFile) {
if (TSDB_IS_FILE_OPENED(pFile)) {
tsdbTrace("close file %s, fd %d", pFile->fname, pFile->fd);
close(pFile->fd);
pFile->fd = -1;
}
......@@ -276,7 +279,7 @@ int tsdbCreateFile(SFile *pFile, STsdbRepo *pRepo, int fid, int type) {
pFile->info.size = TSDB_FILE_HEAD_SIZE;
pFile->info.magic = TSDB_FILE_INIT_MAGIC;
if (tsdbUpdateFileHeader(pFile, 0) < 0) {
if (tsdbUpdateFileHeader(pFile) < 0) {
tsdbCloseFile(pFile);
return -1;
}
......@@ -313,11 +316,11 @@ void tsdbFitRetention(STsdbRepo *pRepo) {
pthread_rwlock_unlock(&(pFileH->fhlock));
}
int tsdbUpdateFileHeader(SFile *pFile, uint32_t version) {
int tsdbUpdateFileHeader(SFile *pFile) {
char buf[TSDB_FILE_HEAD_SIZE] = "\0";
void *pBuf = (void *)buf;
taosEncodeFixedU32((void *)(&pBuf), version);
taosEncodeFixedU32((void *)(&pBuf), TSDB_FILE_VERSION);
tsdbEncodeSFileInfo((void *)(&pBuf), &(pFile->info));
taosCalcChecksumAppend(0, (uint8_t *)buf, TSDB_FILE_HEAD_SIZE);
......@@ -409,6 +412,11 @@ static int tsdbInitFile(SFile *pFile, STsdbRepo *pRepo, int fid, int type) {
pBuf = taosDecodeFixedU32(pBuf, &version);
pBuf = tsdbDecodeSFileInfo(pBuf, &(pFile->info));
if (version != TSDB_FILE_VERSION) {
tsdbError("vgId:%d file %s version %u is not the same as program version %u which may cause problem",
REPO_ID(pRepo), pFile->fname, version, TSDB_FILE_VERSION);
}
tsdbCloseFile(pFile);
return 0;
......
......@@ -132,7 +132,7 @@ int tsdbSetAndOpenHelperFile(SRWHelper *pHelper, SFileGroup *pGroup) {
if (tsdbOpenFile(pFile, O_WRONLY | O_CREAT) < 0) return -1;
pFile->info.size = TSDB_FILE_HEAD_SIZE;
pFile->info.magic = TSDB_FILE_INIT_MAGIC;
if (tsdbUpdateFileHeader(pFile, 0) < 0) return -1;
if (tsdbUpdateFileHeader(pFile) < 0) return -1;
#endif
// Create and open .h
......@@ -140,7 +140,7 @@ int tsdbSetAndOpenHelperFile(SRWHelper *pHelper, SFileGroup *pGroup) {
if (tsdbOpenFile(pFile, O_WRONLY | O_CREAT) < 0) return -1;
pFile->info.size = TSDB_FILE_HEAD_SIZE;
pFile->info.magic = TSDB_FILE_INIT_MAGIC;
if (tsdbUpdateFileHeader(pFile, 0) < 0) return -1;
if (tsdbUpdateFileHeader(pFile) < 0) return -1;
// Create and open .l file if should
if (tsdbShouldCreateNewLast(pHelper)) {
......@@ -149,7 +149,7 @@ int tsdbSetAndOpenHelperFile(SRWHelper *pHelper, SFileGroup *pGroup) {
pFile->info.size = TSDB_FILE_HEAD_SIZE;
pFile->info.magic = TSDB_FILE_INIT_MAGIC;
pFile->info.len = 0;
if (tsdbUpdateFileHeader(pFile, 0) < 0) return -1;
if (tsdbUpdateFileHeader(pFile) < 0) return -1;
}
} else {
if (tsdbOpenFile(helperDataF(pHelper), O_RDONLY) < 0) return -1;
......@@ -166,44 +166,36 @@ int tsdbCloseHelperFile(SRWHelper *pHelper, bool hasError) {
#ifdef TSDB_IDX
pFile = helperIdxF(pHelper);
if (pFile->fd > 0) {
close(pFile->fd);
pFile->fd = -1;
}
tsdbCloseFile(pFile);
#endif
pFile = helperHeadF(pHelper);
if (pFile->fd > 0) {
close(pFile->fd);
pFile->fd = -1;
}
tsdbCloseFile(pFile);
pFile = helperDataF(pHelper);
if (pFile->fd > 0) {
if (helperType(pHelper) == TSDB_WRITE_HELPER) {
if (!hasError) {
tsdbUpdateFileHeader(pFile, 0);
tsdbUpdateFileHeader(pFile);
fsync(pFile->fd);
} else {
// TODO: shrink back to origin
}
}
close(pFile->fd);
pFile->fd = -1;
tsdbCloseFile(pFile);
}
pFile = helperLastF(pHelper);
if (pFile->fd > 0) {
if (helperType(pHelper) == TSDB_WRITE_HELPER && !TSDB_NLAST_FILE_OPENED(pHelper)) {
if (!hasError) {
tsdbUpdateFileHeader(pFile, 0);
tsdbUpdateFileHeader(pFile);
fsync(pFile->fd);
} else {
// TODO: shrink back to origin
}
}
close(pFile->fd);
pFile->fd = -1;
tsdbCloseFile(pFile);
}
if (helperType(pHelper) == TSDB_WRITE_HELPER) {
......@@ -211,11 +203,10 @@ int tsdbCloseHelperFile(SRWHelper *pHelper, bool hasError) {
pFile = helperNewIdxF(pHelper);
if (pFile->fd > 0) {
if (!hasError) {
tsdbUpdateFileHeader(pFile, 0);
tsdbUpdateFileHeader(pFile);
fsync(pFile->fd);
}
close(pFile->fd);
pFile->fd = -1;
tsdbCloseFile(pFile);
if (hasError) (void)remove(pFile->fname);
}
#endif
......@@ -223,22 +214,20 @@ int tsdbCloseHelperFile(SRWHelper *pHelper, bool hasError) {
pFile = helperNewHeadF(pHelper);
if (pFile->fd > 0) {
if (!hasError) {
tsdbUpdateFileHeader(pFile, 0);
tsdbUpdateFileHeader(pFile);
fsync(pFile->fd);
}
close(pFile->fd);
pFile->fd = -1;
tsdbCloseFile(pFile);
if (hasError) (void)remove(pFile->fname);
}
pFile = helperNewLastF(pHelper);
if (pFile->fd > 0) {
if (!hasError) {
tsdbUpdateFileHeader(pFile, 0);
tsdbUpdateFileHeader(pFile);
fsync(pFile->fd);
}
close(pFile->fd);
pFile->fd = -1;
tsdbCloseFile(pFile);
if (hasError) (void)remove(pFile->fname);
}
}
......
......@@ -21,6 +21,8 @@ extern "C" {
#include <stdint.h>
#define KVSTORE_FILE_VERSION ((uint32_t)0)
typedef int (*iterFunc)(void *, void *cont, int contLen);
typedef void (*afterFunc)(void *);
......
......@@ -44,7 +44,7 @@ static char * tdGetKVStoreSnapshotFname(char *fdata);
static char * tdGetKVStoreNewFname(char *fdata);
static void tdFreeKVStore(SKVStore *pStore);
static int tdUpdateKVStoreHeader(int fd, char *fname, SStoreInfo *pInfo);
static int tdLoadKVStoreHeader(int fd, char *fname, SStoreInfo *pInfo);
static int tdLoadKVStoreHeader(int fd, char *fname, SStoreInfo *pInfo, uint32_t *version);
static int tdEncodeKVRecord(void **buf, SKVRecord *pRecord);
static void * tdDecodeKVRecord(void *buf, SKVRecord *pRecord);
static int tdRestoreKVStore(SKVStore *pStore);
......@@ -91,6 +91,7 @@ int tdDestroyKVStore(char *fname) {
SKVStore *tdOpenKVStore(char *fname, iterFunc iFunc, afterFunc aFunc, void *appH) {
SStoreInfo info = {0};
uint32_t version = 0;
SKVStore *pStore = tdNewKVStore(fname, iFunc, aFunc, appH);
if (pStore == NULL) return NULL;
......@@ -111,9 +112,14 @@ SKVStore *tdOpenKVStore(char *fname, iterFunc iFunc, afterFunc aFunc, void *appH
}
} else {
uDebug("file %s exists, try to recover the KV store", pStore->fsnap);
if (tdLoadKVStoreHeader(pStore->sfd, pStore->fsnap, &info) < 0) {
if (tdLoadKVStoreHeader(pStore->sfd, pStore->fsnap, &info, &version) < 0) {
if (terrno != TSDB_CODE_COM_FILE_CORRUPTED) goto _err;
} else {
if (version != KVSTORE_FILE_VERSION) {
uError("file %s version %u is not the same as program version %u, this may cause problem", pStore->fsnap,
version, KVSTORE_FILE_VERSION);
}
if (ftruncate(pStore->fd, info.size) < 0) {
uError("failed to truncate %s to %" PRId64 " size since %s", pStore->fname, info.size, strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
......@@ -132,7 +138,11 @@ SKVStore *tdOpenKVStore(char *fname, iterFunc iFunc, afterFunc aFunc, void *appH
(void)remove(pStore->fsnap);
}
if (tdLoadKVStoreHeader(pStore->fd, pStore->fname, &info) < 0) goto _err;
if (tdLoadKVStoreHeader(pStore->fd, pStore->fname, &info, &version) < 0) goto _err;
if (version != KVSTORE_FILE_VERSION) {
uError("file %s version %u is not the same as program version %u, this may cause problem", pStore->fname, version,
KVSTORE_FILE_VERSION);
}
pStore->info.size = TD_KVSTORE_HEADER_SIZE;
pStore->info.magic = info.magic;
......@@ -320,7 +330,7 @@ int tdKVStoreEndCommit(SKVStore *pStore) {
return 0;
}
static int tdLoadKVStoreHeader(int fd, char *fname, SStoreInfo *pInfo) {
static int tdLoadKVStoreHeader(int fd, char *fname, SStoreInfo *pInfo, uint32_t *version) {
char buf[TD_KVSTORE_HEADER_SIZE] = "\0";
if (lseek(fd, 0, SEEK_SET) < 0) {
......@@ -341,7 +351,9 @@ static int tdLoadKVStoreHeader(int fd, char *fname, SStoreInfo *pInfo) {
return -1;
}
tdDecodeStoreInfo(buf, pInfo);
void *pBuf = (void *)buf;
pBuf = tdDecodeStoreInfo(pBuf, pInfo);
pBuf = taosDecodeFixedU32(pBuf, version);
return 0;
}
......@@ -357,6 +369,7 @@ static int tdUpdateKVStoreHeader(int fd, char *fname, SStoreInfo *pInfo) {
void *pBuf = buf;
tdEncodeStoreInfo(&pBuf, pInfo);
taosEncodeFixedU32(&pBuf, KVSTORE_FILE_VERSION);
ASSERT(POINTER_DISTANCE(pBuf, buf) + sizeof(TSCKSUM) <= TD_KVSTORE_HEADER_SIZE);
taosCalcChecksumAppend(0, (uint8_t *)buf, TD_KVSTORE_HEADER_SIZE);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册