提交 98469700 编写于 作者: H Hongze Cheng

Merge branch 'develop' into feature/TD-2354

......@@ -40,14 +40,15 @@ def pre_test(){
sh '''
cd ${WKC}
rm -rf *
git checkout develop
git pull
git fetch
git checkout ${CHANGE_BRANCH}
git merge develop
cd ${WK}
git reset --hard
git checkout develop
git pull
cd ${WKC}
rm -rf *
mv ${WORKSPACE}/* .
cd ${WK}
export TZ=Asia/Harbin
date
......@@ -85,7 +86,7 @@ pipeline {
pre_test()
sh '''
cd ${WKC}/tests
./test-all.sh pytest
./test-all.sh pytestfq
date'''
}
}
......@@ -95,7 +96,7 @@ pipeline {
pre_test()
sh '''
cd ${WKC}/tests
./test-all.sh b1
./test-all.sh b1fq
date'''
}
}
......@@ -119,7 +120,7 @@ pipeline {
sh '''
date
cd ${WKC}/tests
./test-all.sh b2
./test-all.sh b2fq
date
'''
}
......@@ -140,7 +141,7 @@ pipeline {
sh '''
date
cd ${WKC}/tests
./test-all.sh b3
./test-all.sh b3fq
date'''
}
}
......
......@@ -5,7 +5,7 @@ go 1.14
require (
github.com/jmoiron/sqlx v1.2.0
github.com/mattn/go-sqlite3 v2.0.3+incompatible
github.com/taosdata/driver-go v0.0.0-20200727182616-1a3b1941c206
github.com/taosdata/driver-go v0.0.0-20201113094317-050667e5b4d0
go.uber.org/zap v1.14.1
google.golang.org/appengine v1.6.5 // indirect
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c
......
......@@ -77,7 +77,6 @@ TDengine缺省的时间戳是毫秒精度,但通过修改配置参数enableMic
SHOW VARIABLES;
```
- **使用数据库**
```mysql
......@@ -85,7 +84,6 @@ TDengine缺省的时间戳是毫秒精度,但通过修改配置参数enableMic
```
使用/切换数据库
- **删除数据库**
```mysql
DROP DATABASE [IF EXISTS] db_name;
......@@ -120,7 +118,6 @@ TDengine缺省的时间戳是毫秒精度,但通过修改配置参数enableMic
**Tips**: 以上所有参数修改后都可以用show databases来确认是否修改成功。
- **显示系统所有数据库**
```mysql
SHOW DATABASES;
......@@ -153,7 +150,6 @@ TDengine缺省的时间戳是毫秒精度,但通过修改配置参数enableMic
显示当前数据库下的所有数据表信息。说明:可在like中使用通配符进行名称的匹配。 通配符匹配:1)’%’ (百分号)匹配0到任意个字符;2)’_’下划线匹配一个字符。
- **在线修改显示字符宽度**
```mysql
......@@ -234,7 +230,7 @@ TDengine缺省的时间戳是毫秒精度,但通过修改配置参数enableMic
```mysql
ALTER TABLE stb_name ADD TAG new_tag_name tag_type;
```
为STable增加一个新的标签,并指定新标签的类型。标签总数不能超过128个,总长度不超过16k个字符.
为STable增加一个新的标签,并指定新标签的类型。标签总数不能超过128个,总长度不超过16k个字符
- **删除标签**
......@@ -265,28 +261,24 @@ TDengine缺省的时间戳是毫秒精度,但通过修改配置参数enableMic
```
向表tb_name中插入一条记录
- **插入一条记录,数据对应到指定的列**
```mysql
INSERT INTO tb_name (field1_name, ...) VALUES(field1_value, ...)
```
向表tb_name中插入一条记录,数据对应到指定的列。SQL语句中没有出现的列,数据库将自动填充为NULL。主键(时间戳)不能为NULL。
- **插入多条记录**
```mysql
INSERT INTO tb_name VALUES (field1_value1, ...) (field1_value2, ...)...;
```
向表tb_name中插入多条记录
- **按指定的列插入多条记录**
```mysql
INSERT INTO tb_name (field1_name, ...) VALUES(field1_value1, ...) (field1_value2, ...)
```
向表tb_name中按指定的列插入多条记录
- **向多个表插入多条记录**
```mysql
INSERT INTO tb1_name VALUES (field1_value1, ...)(field1_value2, ...)...
......@@ -294,7 +286,6 @@ TDengine缺省的时间戳是毫秒精度,但通过修改配置参数enableMic
```
同时向表tb1_name和tb2_name中分别插入多条记录
- **同时向多个表按列插入多条记录**
```mysql
INSERT INTO tb1_name (tb1_field1_name, ...) VALUES (field1_value1, ...) (field1_value2, ...)
......@@ -382,7 +373,6 @@ taos> SELECT * FROM meters;
Query OK, 9 row(s) in set (0.002022s)
```
通配符支持表名前缀,以下两个SQL语句均为返回全部的列:
```mysql
SELECT * FROM d1001;
......@@ -613,7 +603,6 @@ TDengine支持针对数据的聚合查询。提供支持的聚合和选择函数
Query OK, 1 row(s) in set (0.001075s)
```
- **AVG**
```mysql
SELECT AVG(field_name) FROM tb_name [WHERE clause];
......@@ -757,7 +746,6 @@ TDengine支持针对数据的聚合查询。提供支持的聚合和选择函数
Query OK, 1 row(s) in set (0.000987s)
```
- **FIRST**
```mysql
SELECT FIRST(field_name) FROM { tb_name | stb_name } [WHERE clause];
......@@ -937,7 +925,6 @@ TDengine支持针对数据的聚合查询。提供支持的聚合和选择函数
Query OK, 2 row(s) in set (0.001162s)
```
- **SPREAD**
```mysql
SELECT SPREAD(field_name) FROM { tb_name | stb_name } [WHERE clause];
......@@ -962,7 +949,6 @@ TDengine支持针对数据的聚合查询。提供支持的聚合和选择函数
Query OK, 1 row(s) in set (0.000836s)
```
- **四则运算**
```mysql
......@@ -1010,7 +996,6 @@ SELECT function_list FROM stb_name
4. PREV填充:使用前一个非NULL值填充数据。例如:fill(prev)。
说明:
1. 使用FILL语句的时候可能生成大量的填充输出,务必指定查询的时间区间。针对每次查询,系统可返回不超过1千万条具有插值的结果。
2. 在时间维度聚合中,返回的结果中时间序列严格单调递增。
......@@ -1040,8 +1025,6 @@ SELECT AVG(current),MAX(current),LEASTSQUARES(current, start_val, step_val), PER
- SQL语句最大长度65480个字符,但可通过系统配置参数maxSQLLength修改,最长可配置为1M
- 库的数目,超级表的数目、表的数目,系统不做限制,仅受系统资源限制
## TAOS SQL其他约定
**group by的限制**
......@@ -1055,5 +1038,3 @@ TAOS SQL支持表之间按主键时间戳来join两张表的列,暂不支持
**is not null与不为空的表达式适用范围**
is not null支持所有类型的列。不为空的表达式为 <>"",仅对非数值类型的列适用。
\ No newline at end of file
# TDengine 2.0 错误码以及对应的十进制码
| 状态码 | 模 | 错误码(十六进制) | 错误描述 | 错误码(十进制) |
|-----------------------| :---: | :---------: | :------------------------ | ---------------- |
|TSDB_CODE_RPC_ACTION_IN_PROGRESS| 0 | 0x0001| "Action in progress"| -2147483647|
......
......@@ -159,7 +159,7 @@ ALTER DNODE <dnode_id> <config>
## 客户端配置
TDengine系统的前台交互客户端应用程序为taos,以及应用驱动,它与taosd共享同一个配置文件taos.cfg。运行taos时,使用参数-c指定配置文件目录,如taos -c /home/cfg,表示使用/home/cfg/目录下的taos.cfg配置文件中的参数,缺省目录是/etc/taos。更多taos的使用方法请见[Shell命令行程序](https://www.taosdata.com/cn/documentation/administrator/#_TDengine_Shell命令行程序)。本节主要说明 taos 客户端应用在配置文件 taos.cfg 文件中使用到的参数。
TDengine系统的前台交互客户端应用程序为taos,以及应用驱动,它与taosd共享同一个配置文件taos.cfg。运行taos时,使用参数-c指定配置文件目录,如taos -c /home/cfg,表示使用/home/cfg/目录下的taos.cfg配置文件中的参数,缺省目录是/etc/taos。更多taos的使用方法请见<a href="https://www.taosdata.com/cn/documentation/administrator/#_TDengine_Shell命令行程序">Shell命令行程序</a>。本节主要说明 taos 客户端应用在配置文件 taos.cfg 文件中使用到的参数。
**2.0.10.0 之后版本支持命令行以下参数显示当前客户端参数的配置**
......@@ -247,7 +247,6 @@ taos -C 或 taos --dump-config
Shell中binary 和 nchar字段的显示宽度上限,超过此限制的部分将被隐藏。默认值:30。可在 shell 中通过命令 set max_binary_display_width nn 动态修改此选项。
## 用户管理
系统管理员可以在CLI界面里添加、删除用户,也可以修改密码。CLI里SQL语法如下:
......@@ -428,8 +427,6 @@ TDengine的所有可执行文件默认存放在 _/usr/local/taos/bin_ 目录下
您可以通过修改系统配置文件taos.cfg来配置不同的数据目录和日志目录。
## TDengine参数限制与保留关键字
- 数据库名:不能包含“.”以及特殊字符,不能超过32个字符
......@@ -448,8 +445,6 @@ TDengine的所有可执行文件默认存放在 _/usr/local/taos/bin_ 目录下
- 库的个数:仅受节点个数限制
- 单个库上虚拟节点个数:不能超过64个
目前TDengine有将近200个内部保留关键字,这些关键字无论大小写均不可以用作库名、表名、STable名、数据列名及标签列名等。这些关键字列表如下:
| 关键字列表 | | | | |
......@@ -490,4 +485,3 @@ TDengine的所有可执行文件默认存放在 _/usr/local/taos/bin_ 目录下
| CONCAT | GLOB | METRICS | SEMI | WAVG |
| CONFIGS | GRANTS | MIN | SET | WHERE |
| CONFLICT | GROUP | | | |
\ No newline at end of file
#数据模型和整体架构
# 数据模型和整体架构
## 数据模型
### 物联网典型场景
......@@ -150,7 +150,7 @@ TDengine 分布式架构的逻辑结构图如下:
<center> 图 1 TDengine架构示意图 </center>
一个完整的 TDengine 系统是运行在一到多个物理节点上的,逻辑上,它包含数据节点(dnode)、TDengine应用驱动(taosc)以及应用(app)。系统中存在一到多个数据节点,这些数据节点组成一个集群(cluster)。应用通过taosc的API与TDengine集群进行互动。下面对每个逻辑单元进行简要介绍。
**物理节点(pnode):** pnode是一独立运行、拥有自己的计算、存储和网络能力的计算机,可以是安装有OS的物理机、虚拟机或Docker容器。物理节点由其配置的 FQDN(Fully Qualified Domain Name)来标识。TDengine完全依赖FQDN来进行网络通讯,如果不了解FQDN,请看博文[一篇文章说清楚TDengine的FQDN](https://www.taosdata.com/blog/2020/09/11/1824.html)
**物理节点(pnode):** pnode是一独立运行、拥有自己的计算、存储和网络能力的计算机,可以是安装有OS的物理机、虚拟机或Docker容器。物理节点由其配置的 FQDN(Fully Qualified Domain Name)来标识。TDengine完全依赖FQDN来进行网络通讯,如果不了解FQDN,请看博文<a href="https://www.taosdata.com/blog/2020/09/11/1824.html">《一篇文章说清楚TDengine的FQDN》</a>
**数据节点(dnode):** dnode 是 TDengine 服务器侧执行代码 taosd 在物理节点上的一个运行实例,一个工作的系统必须有至少一个数据节点。dnode包含零到多个逻辑的虚拟节点(VNODE),零或者至多一个逻辑的管理节点(mnode)。dnode在系统中的唯一标识由实例的End Point (EP )决定。EP是dnode所在物理节点的FQDN (Fully Qualified Domain Name)和系统所配置的网络端口号(Port)的组合。通过配置不同的端口,一个物理节点(一台物理机、虚拟机或容器)可以运行多个实例,或有多个数据节点。
......
# Java Connector
Java连接器支持的系统有: Linux 64/Windows x64/Windows x86。
Java连接器支持的系统有:
| **CPU类型** | x64(64bit) | | | ARM64 | ARM32 |
| ------------ | ------------ | -------- | -------- | -------- | -------- |
| **OS类型** | Linux | Win64 | Win32 | Linux | Linux |
| **支持与否** | **支持** | **支持** | **支持** | **支持** | **支持** |
TDengine 为了方便 Java 应用使用,提供了遵循 JDBC 标准(3.0)API 规范的 `taos-jdbcdriver` 实现。目前可以通过 [Sonatype Repository][1] 搜索并下载。
......@@ -21,7 +25,6 @@ TDengine 的 JDBC 驱动实现尽可能的与关系型数据库驱动保持一
* 目前不支持表间的 union 操作。
* 目前不支持嵌套查询(nested query),对每个 Connection 的实例,至多只能有一个打开的 ResultSet 实例;如果在 ResultSet还没关闭的情况下执行了新的查询,TSDBJDBCDriver 则会自动关闭上一个 ResultSet。
## TAOS-JDBCDriver 版本以及支持的 TDengine 版本和 JDK 版本
| taos-jdbcdriver 版本 | TDengine 版本 | JDK 版本 |
......@@ -70,7 +73,6 @@ maven 项目中使用如下 pom.xml 配置即可:
下载 [TDengine][3] 源码之后,进入 taos-jdbcdriver 源码目录 `src/connector/jdbc` 执行 `mvn clean package` 即可生成相应 jar 包。
## 使用说明
### 获取连接
......@@ -212,7 +214,6 @@ while(resultSet.next()){
```
> 查询和操作关系型数据库一致,使用下标获取返回字段内容时从 1 开始,建议使用字段名称获取。
### 订阅
#### 创建
......@@ -227,7 +228,7 @@ TSDBSubscribe sub = ((TSDBConnection)conn).subscribe("topic", "select * from met
* sql:订阅的查询语句,此语句只能是 `select` 语句,只应查询原始数据,只能按时间正序查询数据
* restart:如果订阅已经存在,是重新开始,还是继续之前的订阅
如上面的例子将使用 SQL 语句 `select * from meters` 创建一个名为 `topic' 的订阅,如果这个订阅已经存在,将继续之前的查询进度,而不是从头开始消费所有的数据。
如上面的例子将使用 SQL 语句 `select * from meters` 创建一个名为 `topic` 的订阅,如果这个订阅已经存在,将继续之前的查询进度,而不是从头开始消费所有的数据。
#### 消费数据
......@@ -255,7 +256,6 @@ sub.close(true);
`close` 方法关闭一个订阅。如果其参数为 `true` 表示保留订阅进度信息,后续可以创建同名订阅继续消费数据;如为 `false` 则不保留订阅进度。
### 关闭资源
```java
......
......@@ -32,7 +32,7 @@
3. 在服务器,执行 `systemctl status taosd` 检查*taosd*运行状态。如果没有运行,启动*taosd*
4. 确认客户端连接时指定了正确的服务器FQDN (Fully Qualified Domain Name(可在服务器上执行Linux命令hostname -f获得)),FQDN配置参考:[一篇文章说清楚TDengine的FQDN](https://www.taosdata.com/blog/2020/09/11/1824.html)
4. 确认客户端连接时指定了正确的服务器FQDN (Fully Qualified Domain Name(可在服务器上执行Linux命令hostname -f获得)),FQDN配置参考:<a href="https://www.taosdata.com/blog/2020/09/11/1824.html">一篇文章说清楚TDengine的FQDN</a>
5. ping服务器FQDN,如果没有反应,请检查你的网络,DNS设置,或客户端所在计算机的系统hosts文件
......@@ -51,19 +51,16 @@
* Windows 系统请使用 PowerShell 命令 Net-TestConnection -ComputerName {fqdn} -Port {port} 检测服务段端口是否访问
10. 也可以使用taos程序内嵌的网络连通检测功能,来验证服务器和客户端之间指定的端口连接是否通畅(包括TCP和UDP):[TDengine 内嵌网络检测工具使用指南](https://www.taosdata.com/blog/2020/09/08/1816.html)
10. 也可以使用taos程序内嵌的网络连通检测功能,来验证服务器和客户端之间指定的端口连接是否通畅(包括TCP和UDP):<a href="https://www.taosdata.com/blog/2020/09/08/1816.html">TDengine 内嵌网络检测工具使用指南</a>
## 6. 遇到错误“Unexpected generic error in RPC”或者"TDengine Error: Unable to resolve FQDN", 我怎么办?
产生这个错误,是由于客户端或数据节点无法解析FQDN(Fully Qualified Domain Name)导致。对于TAOS Shell或客户端应用,请做如下检查:
1. 请检查连接的服务器的FQDN是否正确,FQDN配置参考:[一篇文章说清楚TDengine的FQDN](https://www.taosdata.com/blog/2020/09/11/1824.html)
1. 请检查连接的服务器的FQDN是否正确,FQDN配置参考:<a href="https://www.taosdata.com/blog/2020/09/11/1824.html">一篇文章说清楚TDengine的FQDN</a>
2. 如果网络配置有DNS server, 请检查是否正常工作
3. 如果网络没有配置DNS server, 请检查客户端所在机器的hosts文件,查看该FQDN是否配置,并是否有正确的IP地址。
4. 如果网络配置OK,从客户端所在机器,你需要能Ping该连接的FQDN,否则客户端是无法连接服务器的
## 7. 虽然语法正确,为什么我还是得到 "Invalid SQL" 错误
如果你确认语法正确,2.0之前版本,请检查SQL语句长度是否超过64K。如果超过,也会返回这个错误。
......@@ -86,7 +83,7 @@ TDengine还没有一组专用的validation queries。然而建议你使用系统
## 11. 最有效的写入数据的方法是什么?windows系统下插入的nchar类数据中的汉字被解析成了乱码如何解决?
windows下插入nchar类的数据中如果有中文,请先确认系统的地区设置成了中国(在Control Panel里可以设置),这时cmd中的`taos`客户端应该已经可以正常工作了;如果是在IDE里开发Java应用,比如Eclipse, Intellij,请确认IDE里的文件编码为GBK(这是Java默认的编码类型),然后在生成Connection时,初始化客户端的配置,具体语句如下:
Windows下插入nchar类的数据中如果有中文,请先确认系统的地区设置成了中国(在Control Panel里可以设置),这时cmd中的`taos`客户端应该已经可以正常工作了;如果是在IDE里开发Java应用,比如Eclipse, Intellij,请确认IDE里的文件编码为GBK(这是Java默认的编码类型),然后在生成Connection时,初始化客户端的配置,具体语句如下:
```JAVA
Class.forName("com.taosdata.jdbc.TSDBDriver");
Properties properties = new Properties();
......@@ -94,7 +91,7 @@ properties.setProperty(TSDBDriver.LOCALE_KEY, "UTF-8");
Connection = DriverManager.getConnection(url, properties);
```
## 12.TDengine GO windows驱动的如何编译?
请看为此问题撰写的<a href='blog/2020/01/06/tdengine-go-windows驱动的编译/'>技术博客
请看为此问题撰写的<a href='blog/2020/01/06/tdengine-go-windows驱动的编译/'>技术博客</a>
## 13.JDBC报错: the excuted SQL is not a DML or a DDL?
请更新至最新的JDBC驱动
......@@ -109,14 +106,10 @@ Connection = DriverManager.getConnection(url, properties);
常见原因是服务器和客户端时间没有校准,可以通过和时间服务器同步的方式(Linux 下使用 ntpdate 命令,Windows 在系统时间设置中选择自动同步)校准。
## 15. 表名显示不全
由于 taos shell 在终端中显示宽度有限,有可能比较长的表名显示不全,如果按照显示的不全的表名进行相关操作会发生 Table does not exist 错误。解决方法可以是通过修改 taos.cfg 文件中的设置项 maxBinaryDisplayWidth, 或者直接输入命令 set max_binary_display_width 100。或者在命令结尾使用 \G 参数来调整结果的显示方式。
## 16. 如何进行数据迁移?
TDengine是根据hostname唯一标志一台机器的,在数据文件从机器A移动机器B时,注意如下两件事:
......@@ -125,11 +118,9 @@ TDengine是根据hostname唯一标志一台机器的,在数据文件从机器A
- 2.0.7.0 及以后的版本,到/var/lib/taos/dnode下,修复dnodeEps.json的dnodeId对应的FQDN,重启。确保机器内所有机器的此文件是完全相同的。
- 1.x 和 2.x 版本的存储结构不兼容,需要使用迁移工具或者自己开发应用导出导入数据。
## 17. 怎么报告问题?
如果 FAQ 中的信息不能够帮到您,需要 TDengine 技术团队的技术支持与协助,请将以下两个目录中内容打包:
如果 FAQ 中的信息不能够帮到您,需要 TDengine 技术团队的技术支持与协助,请将以下两个目录中内容打包
1. /var/log/taos
2. /etc/taos
......
......@@ -28,10 +28,10 @@ INSERT INTO d1001 VALUES (1538548685000, 10.3, 219, 0.31) (1538548695000, 12.6,
- 写入的数据的时间戳必须大于当前时间减去配置参数keep的时间。如果keep配置为3650天,那么无法写入比3650天还老的数据。写入数据的时间戳也不能大于当前时间加配置参数days。如果days配置为2,那么无法写入比当前时间还晚2天的数据。
## Prometheus直接写入
[Prometheus](https://www.prometheus.io/)作为Cloud Native Computing Fundation毕业的项目,在性能监控以及K8S性能监控领域有着非常广泛的应用。TDengine提供一个小工具[Bailongma](https://github.com/taosdata/Bailongma),只需在Prometheus做简单配置,无需任何代码,就可将Prometheus采集的数据直接写入TDengine,并按规则在TDengine自动创建库和相关表项。博文[用Docker容器快速搭建一个Devops监控Demo](https://www.taosdata.com/blog/2020/02/03/1189.html)即是采用bailongma将Prometheus和Telegraf的数据写入TDengine中的示例,可以参考。
<a href="https://www.prometheus.io/">Prometheus</a>作为Cloud Native Computing Fundation毕业的项目,在性能监控以及K8S性能监控领域有着非常广泛的应用。TDengine提供一个小工具<a href="https://github.com/taosdata/Bailongma">Bailongma</a>,只需在Prometheus做简单配置,无需任何代码,就可将Prometheus采集的数据直接写入TDengine,并按规则在TDengine自动创建库和相关表项。博文<a href="https://www.taosdata.com/blog/2020/02/03/1189.html">用Docker容器快速搭建一个Devops监控Demo</a>即是采用bailongma将Prometheus和Telegraf的数据写入TDengine中的示例,可以参考。
### 从源代码编译blm_prometheus
用户需要从github下载[Bailongma](https://github.com/taosdata/Bailongma)的源码,使用Golang语言编译器编译生成可执行文件。在开始编译前,需要准备好以下条件:
用户需要从github下载<a href="https://github.com/taosdata/Bailongma">Bailongma</a>的源码,使用Golang语言编译器编译生成可执行文件。在开始编译前,需要准备好以下条件:
- Linux操作系统的服务器
- 安装好Golang, 1.10版本以上
- 对应的TDengine版本。因为用到了TDengine的客户端动态链接库,因此需要安装好和服务端相同版本的TDengine程序;比如服务端版本是TDengine 2.0.0, 则在bailongma所在的linux服务器(可以与TDengine在同一台服务器,或者不同服务器)
......@@ -45,10 +45,10 @@ go build
一切正常的情况下,就会在对应的目录下生成一个blm_prometheus的可执行程序。
### 安装Prometheus
通过Prometheus的官网下载安装。[下载地址](https://prometheus.io/download/)
通过Prometheus的官网下载安装。<a href="https://prometheus.io/download/">下载地址</a>
### 配置Prometheus
参考Prometheus的[配置文档](https://prometheus.io/docs/prometheus/latest/configuration/configuration/),在Prometheus的配置文件中的<remote_write>部分,增加以下配置
参考Prometheus的<a href="https://prometheus.io/docs/prometheus/latest/configuration/configuration/">配置文档</a>在Prometheus的配置文件中的<remote_write>部分,增加以下配置
- url: bailongma API服务提供的URL, 参考下面的blm_prometheus启动示例章节
......@@ -113,10 +113,10 @@ select * from apiserver_request_latencies_bucket;
```
## Telegraf直接写入
[Telegraf](https://www.influxdata.com/time-series-platform/telegraf/)是一流行的IT运维数据采集开源工具,TDengine提供一个小工具[Bailongma](https://github.com/taosdata/Bailongma),只需在Telegraf做简单配置,无需任何代码,就可将Telegraf采集的数据直接写入TDengine,并按规则在TDengine自动创建库和相关表项。博文[用Docker容器快速搭建一个Devops监控Demo](https://www.taosdata.com/blog/2020/02/03/1189.html)即是采用bailongma将Prometheus和Telegraf的数据写入TDengine中的示例,可以参考。
<a href="https://www.influxdata.com/time-series-platform/telegraf/"Telegraf</a>是一流行的IT运维数据采集开源工具,TDengine提供一个小工具<a href="https://github.com/taosdata/Bailongma">Bailongma</a>,只需在Telegraf做简单配置,无需任何代码,就可将Telegraf采集的数据直接写入TDengine,并按规则在TDengine自动创建库和相关表项。博文<a href="https://www.taosdata.com/blog/2020/02/03/1189.html">用Docker容器快速搭建一个Devops监控Demo</a>即是采用bailongma将Prometheus和Telegraf的数据写入TDengine中的示例,可以参考。
### 从源代码编译blm_telegraf
用户需要从github下载[Bailongma](https://github.com/taosdata/Bailongma)的源码,使用Golang语言编译器编译生成可执行文件。在开始编译前,需要准备好以下条件:
用户需要从github下载<a href="https://github.com/taosdata/Bailongma">Bailongma</a>的源码,使用Golang语言编译器编译生成可执行文件。在开始编译前,需要准备好以下条件:
- Linux操作系统的服务器
- 安装好Golang, 1.10版本以上
......@@ -148,7 +148,7 @@ go build
- hostname: 区分不同采集设备的机器名称,需确保其唯一性
- metric_batch_size: 100,允许Telegraf每批次写入记录最大数量,增大其数量可以降低Telegraf的请求发送频率。
关于如何使用Telegraf采集数据以及更多有关使用Telegraf的信息,请参考Telegraf官方的[文档](https://docs.influxdata.com/telegraf/v1.11/)
关于如何使用Telegraf采集数据以及更多有关使用Telegraf的信息,请参考Telegraf官方的<a href="https://docs.influxdata.com/telegraf/v1.11/">文档</a>
### 启动blm_telegraf程序
blm_telegraf程序有以下选项,在启动blm_telegraf程序时可以通过设定这些选项来设定blm_telegraf的配置。
......@@ -218,15 +218,12 @@ use telegraf;
select * from cpu;
```
MQTT是一流行的物联网数据传输协议,TDengine 可以很方便的接入 MQTT Broker 接受的数据并写入到 TDengine。
## EMQ Broker 直接写入
[EMQ](https://github.com/emqx/emqx)是一开源的MQTT Broker软件,无需任何代码,只需要在EMQ Dashboard里使用“规则”做简单配置,即可将MQTT的数据直接写入TDengine。EMQ X 支持通过 发送到 Web 服务 的方式保存数据到 TDengine,也在企业版上提供原生的 TDEngine 驱动实现直接保存。详细使用方法请参考 [EMQ 官方文档](https://docs.emqx.io/broker/latest/cn/rule/rule-example.html#%E4%BF%9D%E5%AD%98%E6%95%B0%E6%8D%AE%E5%88%B0-tdengine)
<a href="https://github.com/emqx/emqx">EMQ</a>是一开源的MQTT Broker软件,无需任何代码,只需要在EMQ Dashboard里使用“规则”做简单配置,即可将MQTT的数据直接写入TDengine。EMQ X 支持通过 发送到 Web 服务 的方式保存数据到 TDengine,也在企业版上提供原生的 TDEngine 驱动实现直接保存。详细使用方法请参考<a href="https://docs.emqx.io/broker/latest/cn/rule/rule-example.html#%E4%BF%9D%E5%AD%98%E6%95%B0%E6%8D%AE%E5%88%B0-tdengine">EMQ 官方文档</a>
## HiveMQ Broker 直接写入
[HiveMQ](https://www.hivemq.com/) 是一个提供免费个人版和企业版的 MQTT 代理,主要用于企业和新兴的机器到机器M2M通讯和内部传输,满足可伸缩性、易管理和安全特性。HiveMQ 提供了开源的插件开发包。可以通过 HiveMQ extension - TDengine 保存数据到 TDengine。详细使用方法请参考 [HiveMQ extension - TDengine 说明文档](https://github.com/huskar-t/hivemq-tdengine-extension/blob/b62a26ecc164a310104df57691691b237e091c89/README.md)
<a href="https://www.hivemq.com/">HiveMQ</a> 是一个提供免费个人版和企业版的 MQTT 代理,主要用于企业和新兴的机器到机器M2M通讯和内部传输,满足可伸缩性、易管理和安全特性。HiveMQ 提供了开源的插件开发包。可以通过 HiveMQ extension - TDengine 保存数据到 TDengine。详细使用方法请参考 <a href="https://github.com/huskar-t/hivemq-tdengine-extension/blob/b62a26ecc164a310104df57691691b237e091c89/README.md">HiveMQ extension - TDengine 说明文档</a>
......@@ -30,7 +30,7 @@ TDengine里存在vnode, mnode, vnode用来存储时序数据,mnode用来存储
- master: 具有最新的数据,容许客户端往里写入数据,一个虚拟节点组,至多一个master.
- slave:与master是同步的,但不容许客户端往里写入数据,根据配置,可以容许客户端对其进行查询。
- unsynced: 节点处于非同步状态,比如虚拟节点刚启动、或与其他虚拟节点的连接出现故障等。处于该状态时,该虚拟节点既不能提供写入,也不能提供查询服务
- unsynced: 节点处于非同步状态,比如虚拟节点刚启动、或与其他虚拟节点的连接出现故障等。处于该状态时,该虚拟节点既不能提供写入,也不能提供查询服务
- offline: 由于宕机或网络原因,无法访问到某虚拟节点时,其他虚拟节点将该虚拟节点标为离线。但请注意,该虚拟节点本身的状态可能是unsynced或其他,但不会是离线。
**Quorum:**
......@@ -83,10 +83,10 @@ TDengine采取的是Master-Slave模式进行同步,与流行的RAFT一致性
如果一个虚拟节点(vnode A)检测到与同一虚拟节点组内另外一虚拟节点(vnode B)的连接中断,vnode A将立即把vnode B的role设置为offline。无论是接收到另外一虚拟节点发来的status消息,还是检测与另外一虚拟节点的连接中断,该虚拟节点都将进入状态处理流程。状态处理流程的规则如下:
1. 如果检测到在线的节点数没有超过一半,则将自己的状态设置为unsynced.
2. 如果在线的虚拟节点数超过一半,会检查master节点是否存在,如果存在,则会决定是否将自己状态改为slave或启动数据恢复流程
2. 如果在线的虚拟节点数超过一半,会检查master节点是否存在,如果存在,则会决定是否将自己状态改为slave或启动数据恢复流程
3. 如果master不存在,则会检查自己保存的各虚拟节点的状态信息与从另一节点接收到的是否一致,如果一致,说明节点组里状态已经稳定一致,则会触发选举流程。如果不一致,说明状态还没趋于一致,即使master不存在,也不进行选主。由于要求状态信息一致才进行选举,每个虚拟节点根据同样的信息,会选出同一个虚拟节点做master,无需投票表决。
4. 自己的状态是根据规则自己决定并修改的,并不需要其他节点同意,包括成为master。一个节点无权修改其他节点的状态。
5. 如果一个虚拟节点检测到自己或其他虚拟节点的role发生改变,该节点会广播它自己保存的各个虚拟节点的状态信息(role和version).
5. 如果一个虚拟节点检测到自己或其他虚拟节点的role发生改变,该节点会广播它自己保存的各个虚拟节点的状态信息(role和version)
具体的流程图如下:
......@@ -124,7 +124,7 @@ TDengine采取的是Master-Slave模式进行同步,与流行的RAFT一致性
如果一虚拟节点(vnode B) 处于unsynced状态,master存在(vnode A),而且其版本号比master的低,它将立即启动数据恢复流程。在理解恢复流程时,需要澄清几个关于文件的概念和处理规则。
1. 每个文件(无论是archived data的file还是wal)都有一个index, 这需要应用来维护(vnode里,该index就是fileId*3 + 0/1/2, 对应data, head与last三个文件)。如果index为0,表示系统里最老的数据文件。对于mnode里的文件,数量是固定的,对应于acct, user, db, table等文件。
1. 每个文件(无论是archived data的file还是wal)都有一个index, 这需要应用来维护(vnode里,该index就是fileId*3 + 0/1/2, 对应data, head与last三个文件)。如果index为0,表示系统里最老的数据文件。对于mode里的文件,数量是固定的,对应于acct, user, db, table等文件。
2. 任何一个数据文件(file)有名字、大小,还有一个magic number。只有文件名、大小与magic number一致时,两个文件才判断是一样的,无需同步。Magic number可以是checksum, 也可以是简单的文件大小。怎么计算magic,换句话说,如何检测数据文件是否有效,完全由应用决定。
3. 文件名的处理有点复杂,因为每台服务器的路径可能不一致。比如node A的TDengine的数据文件存放在 /etc/taos目录下,而node B的数据存放在 /home/jhtao目录下。因此同步模块需要应用在启动一个同步实例时提供一个path,这样两台服务器的绝对路径可以不一样,但仍然可以做对比,做同步。
4. 当sync模块调用回调函数getFileInfo获得数据文件信息时,有如下的规则
......@@ -212,10 +212,10 @@ Arbitrator的程序tarbitrator.c在复制模块的同一目录, 编译整个系
相同之处:
- 三大流程一致:Raft里有Leader election, replication, safety,完全对应TDengine的选举、数据转发、数据恢复三个流程
- 节点状态定义一致:Raft里每个节点有Leader, Follower, Candidate三个状态,TDengine里是Master, Slave, Unsynced, Offline。多了一个offlince, 但本质上是一样的,因为offline是外界看一个节点的状态,但该节点本身是处于master, slave 或unsynced的
- 三大流程一致:Raft里有Leader election, replication, safety,完全对应TDengine的选举、数据转发、数据恢复三个流程
- 节点状态定义一致:Raft里每个节点有Leader, Follower, Candidate三个状态,TDengine里是Master, Slave, Unsynced, Offline。多了一个offlince, 但本质上是一样的,因为offline是外界看一个节点的状态,但该节点本身是处于master, slave 或unsynced的
- 数据转发流程完全一样,Master(leader)需要等待回复确认。
- 数据恢复流程几乎一样,Raft没有涉及历史数据同步问题,只考虑了WAL数据同步
- 数据恢复流程几乎一样,Raft没有涉及历史数据同步问题,只考虑了WAL数据同步
不同之处:
......@@ -226,7 +226,7 @@ Arbitrator的程序tarbitrator.c在复制模块的同一目录, 编译整个系
## Meta Data的数据复制
TDengine里存在时序数据,也存在Meta Data。Meta Data对数据的可靠性要求更高,那么TDengine设计能否满足要求呢?下面做个仔细分析
TDengine里存在时序数据,也存在Meta Data。Meta Data对数据的可靠性要求更高,那么TDengine设计能否满足要求呢?下面做个仔细分析
TDengine里Meta Data包括以下:
......
......@@ -39,10 +39,10 @@
# number of management nodes in the system
# numOfMnodes 3
# enable/disable backuping vnode directory when removing dnode
# enable/disable backuping vnode directory when removing vnode
# vnodeBak 1
# if report installation / use information
# enable/disable installation / usage report
# telemetryReporting 1
# enable/disable load balancing
......@@ -81,7 +81,7 @@
# minimum time window, milli-second
# minIntervalTime 10
# maximum delay before launching a stream compution, milli-second
# maximum delay before launching a stream computation, milli-second
# maxStreamCompDelay 20000
# maximum delay before launching a stream computation for the first time, milli-second
......@@ -156,7 +156,7 @@
# max number of connections allowed in dnode
# maxShellConns 5000
# max numerber of connections allowed in client
# max number of connections allowed in client
# maxConnections 5000
# stop writing logs when the disk size of the log folder is less than this value
......@@ -187,7 +187,7 @@
# restfulRowLimit 10240
# The following parameter is used to limit the maximum number of lines in log files.
# max number of rows per log filters
# max number of lines per log filters
# numOfLogLines 10000000
# enable/disable async log
......@@ -199,7 +199,9 @@
# The following parameters are used for debug purpose only.
# debugFlag 8 bits mask: FILE-SCREEN-UNUSED-HeartBeat-DUMP-TRACE_WARN-ERROR
# 131: output warning and error, 135: output debug, warning and error, 143 : output trace, debug, warning and error to log.
# 131: output warning and error
# 135: output debug, warning and error
# 143: output trace, debug, warning and error to log
# 199: output debug, warning and error to both screen and file
# 207: output trace, debug, warning and error to both screen and file
......@@ -231,10 +233,10 @@
# cDebugFlag 131
# debug flag for JNI
# jniDebugflag 131
# jniDebugFlag 131
# debug flag for storage
# uDebugflag 131
# uDebugFlag 131
# debug flag for http server
# httpDebugFlag 131
......@@ -243,12 +245,12 @@
# monDebugFlag 131
# debug flag for query
# qDebugflag 131
# qDebugFlag 131
# debug flag for vnode
# vDebugflag 131
# vDebugFlag 131
# debug flag for http server
# debug flag for TSDB
# tsdbDebugFlag 131
# debug flag for continue query
......
......@@ -266,8 +266,14 @@ function install_config() {
${csudo} chmod 644 ${cfg_install_dir}/*
fi
# Save standard input to 6 and open / dev / TTY on standard input
exec 6<&0 0</dev/tty
local_fqdn_check
# restore the backup standard input, and turn off 6
exec 0<&6 6<&-
${csudo} mv ${cfg_dir}/taos.cfg ${cfg_dir}/taos.cfg.org
${csudo} ln -s ${cfg_install_dir}/taos.cfg ${cfg_dir}
#FQDN_FORMAT="(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)"
......@@ -422,7 +428,7 @@ function install_service() {
}
function install_TDengine() {
echo -e "${GREEN}Start to install TDEngine...${NC}"
echo -e "${GREEN}Start to install TDengine...${NC}"
#install log and data dir , then ln to /usr/local/taos
${csudo} mkdir -p ${log_dir} && ${csudo} chmod 777 ${log_dir}
......
......@@ -119,4 +119,4 @@ if ((${service_mod}==2)); then
kill_taosd
fi
echo -e "${GREEN}TDEngine is removed successfully!${NC}"
echo -e "${GREEN}TDengine is removed successfully!${NC}"
......@@ -2,19 +2,39 @@
#
# This file is used to set config for core when taosd crash
# Color setting
RED='\033[0;31m'
GREEN='\033[1;32m'
GREEN_DARK='\033[0;32m'
GREEN_UNDERLINE='\033[4;32m'
NC='\033[0m'
set -e
# set -x
corePath=$1
csudo=""
if command -v sudo > /dev/null; then
csudo="sudo"
fi
#ulimit -c unlimited
if [[ ! -n ${corePath} ]]; then
echo -e -n "${GREEN}Please enter a file directory to save the coredump file${NC}:"
read corePath
while true; do
if [[ ! -z "$corePath" ]]; then
break
else
read -p "Please enter a file directory to save the coredump file:" corePath
fi
done
fi
ulimit -c unlimited
${csudo} sed -i '/ulimit -c unlimited/d' /etc/profile ||:
${csudo} sed -i '$a\ulimit -c unlimited' /etc/profile ||:
source /etc/profile
${csudo} mkdir -p /coredump ||:
${csudo} sysctl -w kernel.core_pattern='/coredump/core-%e-%p' ||:
${csudo} echo '/coredump/core-%e-%p' | ${csudo} tee /proc/sys/kernel/core_pattern ||:
${csudo} mkdir -p ${corePath} ||:
${csudo} sysctl -w kernel.core_pattern=${corePath}/core-%e-%p ||:
${csudo} echo "${corePath}/core-%e-%p" | ${csudo} tee /proc/sys/kernel/core_pattern ||:
......@@ -44,7 +44,7 @@ static void bnUnLock() {
static bool bnCheckFree(SDnodeObj *pDnode) {
if (pDnode->status == TAOS_DN_STATUS_DROPPING || pDnode->status == TAOS_DN_STATUS_OFFLINE) {
mError("dnode:%d, status:%s not available", pDnode->dnodeId, mnodeGetDnodeStatusStr(pDnode->status));
mError("dnode:%d, status:%s not available", pDnode->dnodeId, dnodeStatus[pDnode->status]);
return false;
}
......@@ -92,13 +92,12 @@ static void bnDiscardVnode(SVgObj *pVgroup, SVnodeGid *pVnodeGid) {
}
static void bnSwapVnodeGid(SVnodeGid *pVnodeGid1, SVnodeGid *pVnodeGid2) {
// SVnodeGid tmp = *pVnodeGid1;
// *pVnodeGid1 = *pVnodeGid2;
// *pVnodeGid2 = tmp;
SVnodeGid tmp = *pVnodeGid1;
*pVnodeGid1 = *pVnodeGid2;
*pVnodeGid2 = tmp;
}
int32_t bnAllocVnodes(SVgObj *pVgroup) {
static int32_t randIndex = 0;
int32_t dnode = 0;
int32_t vnodes = 0;
......@@ -120,8 +119,7 @@ int32_t bnAllocVnodes(SVgObj *pVgroup) {
break;
} else {
mDebug("dnode:%d, is not selected, status:%s vnodes:%d disk:%fGB role:%d", pDnode->dnodeId,
mnodeGetDnodeStatusStr(pDnode->status), pDnode->openVnodes, pDnode->diskAvailable,
pDnode->alternativeRole);
dnodeStatus[pDnode->status], pDnode->openVnodes, pDnode->diskAvailable, pDnode->alternativeRole);
}
}
}
......@@ -137,7 +135,7 @@ int32_t bnAllocVnodes(SVgObj *pVgroup) {
while (1) {
pIter = mnodeGetNextDnode(pIter, &pDnode);
if (pDnode == NULL) break;
mDebug("dnode:%d, status:%s vnodes:%d disk:%fGB role:%d", pDnode->dnodeId, mnodeGetDnodeStatusStr(pDnode->status),
mDebug("dnode:%d, status:%s vnodes:%d disk:%fGB role:%d", pDnode->dnodeId, dnodeStatus[pDnode->status],
pDnode->openVnodes, pDnode->diskAvailable, pDnode->alternativeRole);
mnodeDecDnodeRef(pDnode);
}
......@@ -149,36 +147,6 @@ int32_t bnAllocVnodes(SVgObj *pVgroup) {
}
}
/*
* make the choice more random.
* replica 1: no choice
* replica 2: there are 2 combinations
* replica 3 or larger: there are 6 combinations
*/
if (pVgroup->numOfVnodes == 1) {
} else if (pVgroup->numOfVnodes == 2) {
if (randIndex++ % 2 == 0) {
bnSwapVnodeGid(pVgroup->vnodeGid, pVgroup->vnodeGid + 1);
}
} else {
int32_t randVal = randIndex++ % 6;
if (randVal == 1) { // 1, 0, 2
bnSwapVnodeGid(pVgroup->vnodeGid + 0, pVgroup->vnodeGid + 1);
} else if (randVal == 2) { // 1, 2, 0
bnSwapVnodeGid(pVgroup->vnodeGid + 0, pVgroup->vnodeGid + 1);
bnSwapVnodeGid(pVgroup->vnodeGid + 1, pVgroup->vnodeGid + 2);
} else if (randVal == 3) { // 2, 1, 0
bnSwapVnodeGid(pVgroup->vnodeGid + 0, pVgroup->vnodeGid + 2);
} else if (randVal == 4) { // 2, 0, 1
bnSwapVnodeGid(pVgroup->vnodeGid + 0, pVgroup->vnodeGid + 2);
bnSwapVnodeGid(pVgroup->vnodeGid + 1, pVgroup->vnodeGid + 2);
}
if (randVal == 5) { // 0, 2, 1
bnSwapVnodeGid(pVgroup->vnodeGid + 1, pVgroup->vnodeGid + 2);
} else {
} // 0, 1, 2
}
bnReleaseDnodes();
bnUnLock();
return TSDB_CODE_SUCCESS;
......@@ -214,44 +182,8 @@ static bool bnCheckVgroupReady(SVgObj *pVgroup, SVnodeGid *pRmVnode) {
static int32_t bnRemoveVnode(SVgObj *pVgroup) {
if (pVgroup->numOfVnodes <= 1) return -1;
SVnodeGid *pRmVnode = NULL;
SVnodeGid *pSelVnode = NULL;
int32_t maxScore = 0;
for (int32_t i = 0; i < pVgroup->numOfVnodes; ++i) {
SVnodeGid *pVnode = &(pVgroup->vnodeGid[i]);
SDnodeObj *pDnode = mnodeGetDnode(pVnode->dnodeId);
if (pDnode == NULL) {
mError("vgId:%d, dnode:%d not exist, remove it", pVgroup->vgId, pVnode->dnodeId);
pRmVnode = pVnode;
break;
}
if (pDnode->status == TAOS_DN_STATUS_DROPPING) {
mDebug("vgId:%d, dnode:%d in dropping state", pVgroup->vgId, pVnode->dnodeId);
pRmVnode = pVnode;
} else if (pVnode->dnodeId == pVgroup->lbDnodeId) {
mDebug("vgId:%d, dnode:%d in updating state", pVgroup->vgId, pVnode->dnodeId);
pRmVnode = pVnode;
} else {
if (pSelVnode == NULL) {
pSelVnode = pVnode;
maxScore = pDnode->score;
} else {
if (maxScore < pDnode->score) {
pSelVnode = pVnode;
maxScore = pDnode->score;
}
}
}
mnodeDecDnodeRef(pDnode);
}
if (pRmVnode != NULL) {
pSelVnode = pRmVnode;
}
SVnodeGid *pSelVnode = &pVgroup->vnodeGid[pVgroup->numOfVnodes - 1];
mDebug("vgId:%d, vnode in dnode:%d will be dropped", pVgroup->vgId, pSelVnode->dnodeId);
if (!bnCheckVgroupReady(pVgroup, pSelVnode)) {
mDebug("vgId:%d, is not ready", pVgroup->vgId);
......@@ -275,36 +207,42 @@ static bool bnCheckDnodeInVgroup(SDnodeObj *pDnode, SVgObj *pVgroup) {
return false;
}
/**
* desc: add vnode to vgroup, find a new one if dest dnode is null
**/
static int32_t bnAddVnode(SVgObj *pVgroup, SDnodeObj *pSrcDnode, SDnodeObj *pDestDnode) {
if (pDestDnode == NULL) {
static SDnodeObj *bnGetAvailDnode(SVgObj *pVgroup) {
for (int32_t i = 0; i < tsBnDnodes.size; ++i) {
SDnodeObj *pDnode = tsBnDnodes.list[i];
if (pDnode == pSrcDnode) continue;
if (bnCheckDnodeInVgroup(pDnode, pVgroup)) continue;
if (!bnCheckFree(pDnode)) continue;
pDestDnode = pDnode;
mDebug("vgId:%d, add vnode to dnode:%d", pVgroup->vgId, pDnode->dnodeId);
break;
}
return pDnode;
}
if (pDestDnode == NULL) {
return NULL;
}
static int32_t bnAddVnode(SVgObj *pVgroup, SDnodeObj *pSrcDnode, SDnodeObj *pDestDnode) {
if (pDestDnode == NULL || pSrcDnode == pDestDnode) {
return TSDB_CODE_MND_DNODE_NOT_EXIST;
}
SVnodeGid *pVnodeGid = pVgroup->vnodeGid + pVgroup->numOfVnodes;
pVnodeGid->dnodeId = pDestDnode->dnodeId;
pVnodeGid->pDnode = pDestDnode;
pVgroup->numOfVnodes++;
SVnodeGid vnodeGids[TSDB_MAX_REPLICA];
memcpy(&vnodeGids, &pVgroup->vnodeGid, sizeof(SVnodeGid) * TSDB_MAX_REPLICA);
if (pSrcDnode != NULL) {
int32_t numOfVnodes = pVgroup->numOfVnodes;
vnodeGids[numOfVnodes].dnodeId = pDestDnode->dnodeId;
vnodeGids[numOfVnodes].pDnode = pDestDnode;
numOfVnodes++;
for (int32_t v = 0; v < numOfVnodes; ++v) {
if (pSrcDnode != NULL && pSrcDnode->dnodeId == vnodeGids[v].dnodeId) {
bnSwapVnodeGid(&vnodeGids[v], &vnodeGids[numOfVnodes - 1]);
pVgroup->lbDnodeId = pSrcDnode->dnodeId;
break;
}
}
memcpy(&pVgroup->vnodeGid, &vnodeGids, sizeof(SVnodeGid) * TSDB_MAX_REPLICA);
pVgroup->numOfVnodes = numOfVnodes;
atomic_add_fetch_32(&pDestDnode->openVnodes, 1);
mnodeUpdateVgroup(pVgroup);
......@@ -315,16 +253,16 @@ static int32_t bnAddVnode(SVgObj *pVgroup, SDnodeObj *pSrcDnode, SDnodeObj *pDes
static bool bnMonitorBalance() {
if (tsBnDnodes.size < 2) return false;
mDebug("monitor dnodes for balance, avail:%d", tsBnDnodes.size);
for (int32_t src = tsBnDnodes.size - 1; src >= 0; --src) {
SDnodeObj *pDnode = tsBnDnodes.list[src];
mDebug("%d-dnode:%d, state:%s, score:%.1f, numOfCores:%d, openVnodes:%d", tsBnDnodes.size - src - 1,
pDnode->dnodeId, mnodeGetDnodeStatusStr(pDnode->status), pDnode->score, pDnode->numOfCores,
pDnode->openVnodes);
mDebug("%d-dnode:%d, state:%s, score:%.1f, cores:%d, vnodes:%d", tsBnDnodes.size - src - 1, pDnode->dnodeId,
dnodeStatus[pDnode->status], pDnode->score, pDnode->numOfCores, pDnode->openVnodes);
}
float scoresDiff = tsBnDnodes.list[tsBnDnodes.size - 1]->score - tsBnDnodes.list[0]->score;
if (scoresDiff < 0.01) {
mDebug("all dnodes:%d is already balanced, scoresDiff:%f", tsBnDnodes.size, scoresDiff);
mDebug("all dnodes:%d is already balanced, scoreDiff:%.1f", tsBnDnodes.size, scoresDiff);
return false;
}
......@@ -392,7 +330,7 @@ void bnReset() {
tsAccessSquence = 0;
}
static int32_t bnMonitorVgroups() {
static bool bnMonitorVgroups() {
void * pIter = NULL;
SVgObj *pVgroup = NULL;
bool hasUpdatingVgroup = false;
......@@ -412,7 +350,13 @@ static int32_t bnMonitorVgroups() {
} else if (vgReplica < dbReplica) {
mInfo("vgId:%d, replica:%d numOfVnodes:%d, try add one vnode", pVgroup->vgId, dbReplica, vgReplica);
hasUpdatingVgroup = true;
code = bnAddVnode(pVgroup, NULL, NULL);
SDnodeObj *pAvailDnode = bnGetAvailDnode(pVgroup);
if (pAvailDnode == NULL) {
code = TSDB_CODE_MND_DNODE_NOT_EXIST;
} else {
code = bnAddVnode(pVgroup, NULL, pAvailDnode);
}
}
mnodeDecVgroupRef(pVgroup);
......@@ -545,6 +489,7 @@ void bnCheckStatus() {
mInfo("dnode:%d, set to offline state, access seq:%d last seq:%d laststat:%d", pDnode->dnodeId, tsAccessSquence,
pDnode->lastAccess, pDnode->status);
bnSetVgroupOffline(pDnode);
bnStartTimer(3000);
}
}
mnodeDecDnodeRef(pDnode);
......
......@@ -299,7 +299,7 @@ static int32_t bnRetrieveScores(SShowObj *pShow, char *data, int32_t rows, void
cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
STR_TO_VARSTR(pWrite, mnodeGetDnodeStatusStr(pDnode->status));
STR_TO_VARSTR(pWrite, dnodeStatus[pDnode->status]);
cols++;
numOfRows++;
......
......@@ -31,7 +31,10 @@ static void *bnThreadFunc(void *arg) {
}
pthread_cond_wait(&tsBnThread.cond, &tsBnThread.mutex);
mDebug("balance thread wakes up to work");
bool updateSoon = bnStart();
mDebug("balance thread finished this poll, updateSoon:%d", updateSoon);
bnStartTimer(updateSoon ? 1000 : -1);
pthread_mutex_unlock(&(tsBnThread.mutex));
}
......@@ -101,8 +104,8 @@ static void bnProcessTimer(void *handle, void *tmrId) {
tsBnThread.timer = NULL;
tsAccessSquence++;
bnCheckStatus();
bnStartTimer(-1);
bnCheckStatus();
if (handle == NULL) {
if (tsAccessSquence % tsBalanceInterval == 0) {
......@@ -121,6 +124,7 @@ void bnStartTimer(int64_t mseconds) {
bool updateSoon = (mseconds != -1);
if (updateSoon) {
mTrace("balance function will be called after %" PRId64 " ms", mseconds);
taosTmrReset(bnProcessTimer, mseconds, (void *)mseconds, tsMnodeTmr, &tsBnThread.timer);
} else {
taosTmrReset(bnProcessTimer, tsStatusInterval * 1000, NULL, tsMnodeTmr, &tsBnThread.timer);
......
......@@ -307,7 +307,6 @@ typedef struct STscObj {
SRpcCorEpSet *tscCorMgmtEpSet;
void* pDnodeConn;
pthread_mutex_t mutex;
T_REF_DECLARE()
} STscObj;
typedef struct SSubqueryState {
......@@ -483,7 +482,6 @@ extern int tscObjRef;
extern void * tscTmr;
extern void * tscQhandle;
extern int tscKeepConn[];
extern int tsInsertHeadSize;
extern int tscNumOfThreads;
extern int tscRefId;
......
......@@ -388,10 +388,10 @@ void tscQueueAsyncRes(SSqlObj *pSql) {
return;
}
assert(pSql->res.code != TSDB_CODE_SUCCESS);
tscError("%p add into queued async res, code:%s", pSql, tstrerror(pSql->res.code));
SSqlRes *pRes = &pSql->res;
if (pSql->fp == NULL || pSql->fetchFp == NULL){
return;
}
......
......@@ -2597,14 +2597,23 @@ static void percentile_next_step(SQLFunctionCtx *pCtx) {
}
//////////////////////////////////////////////////////////////////////////////////
static void buildHistogramInfo(SAPercentileInfo* pInfo) {
pInfo->pHisto = (SHistogramInfo*) ((char*) pInfo + sizeof(SAPercentileInfo));
pInfo->pHisto->elems = (SHistBin*) ((char*)pInfo->pHisto + sizeof(SHistogramInfo));
}
static SAPercentileInfo *getAPerctInfo(SQLFunctionCtx *pCtx) {
SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx);
SAPercentileInfo* pInfo = NULL;
if (pCtx->stableQuery && pCtx->currentStage != SECONDARY_STAGE_MERGE) {
return (SAPercentileInfo*) pCtx->aOutputBuf;
pInfo = (SAPercentileInfo*) pCtx->aOutputBuf;
} else {
return GET_ROWCELL_INTERBUF(pResInfo);
pInfo = GET_ROWCELL_INTERBUF(pResInfo);
}
buildHistogramInfo(pInfo);
return pInfo;
}
static bool apercentile_function_setup(SQLFunctionCtx *pCtx) {
......@@ -2616,6 +2625,7 @@ static bool apercentile_function_setup(SQLFunctionCtx *pCtx) {
char *tmp = (char *)pInfo + sizeof(SAPercentileInfo);
pInfo->pHisto = tHistogramCreateFrom(tmp, MAX_HISTOGRAM_BIN);
printf("%p, %p\n", pInfo->pHisto, pInfo->pHisto->elems);
return true;
}
......@@ -2625,6 +2635,8 @@ static void apercentile_function(SQLFunctionCtx *pCtx) {
SResultRowCellInfo * pResInfo = GET_RES_INFO(pCtx);
SAPercentileInfo *pInfo = getAPerctInfo(pCtx);
assert(pInfo->pHisto->elems != NULL);
for (int32_t i = 0; i < pCtx->size; ++i) {
char *data = GET_INPUT_CHAR_INDEX(pCtx, i);
if (pCtx->hasNull && isNull(data, pCtx->inputType)) {
......
......@@ -911,6 +911,13 @@ static void genFinalResWithoutFill(SSqlRes* pRes, SLocalReducer *pLocalReducer,
}
}
if (pRes->numOfRowsGroup >= pQueryInfo->limit.limit && pQueryInfo->limit.limit > 0) {
pRes->numOfRows = 0;
pBeforeFillData->num = 0;
pLocalReducer->discard = true;
return;
}
pRes->numOfRowsGroup += pRes->numOfRows;
// impose the limitation of output rows on the final result
......
......@@ -996,59 +996,59 @@ static bool validateTagParams(SArray* pTagsList, SArray* pFieldList, SSqlCmd* pC
return false;
}
int32_t nLen = 0;
/* timestamp in tag is not allowed */
for (int32_t i = 0; i < numOfTags; ++i) {
TAOS_FIELD* p = taosArrayGet(pTagsList, i);
if (p->bytes == 0) {
invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg7);
if (p->type == TSDB_DATA_TYPE_TIMESTAMP) {
invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg4);
return false;
}
nLen += p->bytes;
if (p->type < TSDB_DATA_TYPE_BOOL || p->type > TSDB_DATA_TYPE_NCHAR) {
invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg5);
return false;
}
// max tag row length must be less than TSDB_MAX_TAGS_LEN
if (nLen > TSDB_MAX_TAGS_LEN) {
invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg2);
if ((p->type == TSDB_DATA_TYPE_BINARY && p->bytes <= 0) ||
(p->type == TSDB_DATA_TYPE_NCHAR && p->bytes <= 0)) {
invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg7);
return false;
}
// field name must be unique
for (int32_t i = 0; i < numOfTags; ++i) {
TAOS_FIELD* p = taosArrayGet(pTagsList, i);
if (validateColumnName(p->name) != TSDB_CODE_SUCCESS) {
invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg6);
return false;
}
if (has(pFieldList, 0, p->name) == true) {
if (has(pTagsList, i + 1, p->name) == true) {
invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg3);
return false;
}
}
/* timestamp in tag is not allowed */
int32_t nLen = 0;
for (int32_t i = 0; i < numOfTags; ++i) {
TAOS_FIELD* p = taosArrayGet(pTagsList, i);
if (p->type == TSDB_DATA_TYPE_TIMESTAMP) {
invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg4);
if (p->bytes == 0) {
invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg7);
return false;
}
if (p->type < TSDB_DATA_TYPE_BOOL || p->type > TSDB_DATA_TYPE_NCHAR) {
invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg5);
return false;
nLen += p->bytes;
}
if ((p->type == TSDB_DATA_TYPE_BINARY && p->bytes <= 0) ||
(p->type == TSDB_DATA_TYPE_NCHAR && p->bytes <= 0)) {
invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg7);
// max tag row length must be less than TSDB_MAX_TAGS_LEN
if (nLen > TSDB_MAX_TAGS_LEN) {
invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg2);
return false;
}
if (validateColumnName(p->name) != TSDB_CODE_SUCCESS) {
invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg6);
return false;
}
// field name must be unique
for (int32_t i = 0; i < numOfTags; ++i) {
TAOS_FIELD* p = taosArrayGet(pTagsList, i);
if (has(pTagsList, i + 1, p->name) == true) {
if (has(pFieldList, 0, p->name) == true) {
invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg3);
return false;
}
......
......@@ -115,8 +115,6 @@ static SSqlObj *taosConnectImpl(const char *ip, const char *user, const char *pa
pObj->signature = pObj;
pObj->pDnodeConn = pDnodeConn;
T_REF_INIT_VAL(pObj, 1);
tstrncpy(pObj->user, user, sizeof(pObj->user));
secretEncryptLen = MIN(secretEncryptLen, sizeof(pObj->pass));
......@@ -172,11 +170,9 @@ static SSqlObj *taosConnectImpl(const char *ip, const char *user, const char *pa
if (taos != NULL) {
*taos = pObj;
}
pObj->rid = taosAddRef(tscRefId, pObj);
registerSqlObj(pSql);
tsInsertHeadSize = sizeof(SMsgDesc) + sizeof(SSubmitMsg);
pObj->rid = taosAddRef(tscRefId, pObj);
return pSql;
}
......@@ -288,34 +284,21 @@ void taos_close(TAOS *taos) {
return;
}
// make sure that the close connection can only be executed once.
pObj->signature = NULL;
taosTmrStopA(&(pObj->pTimer));
if (pObj->hbrid > 0) {
if (RID_VALID(pObj->hbrid)) {
SSqlObj* pHb = (SSqlObj*)taosAcquireRef(tscObjRef, pObj->hbrid);
if (pHb != NULL) {
if (pHb->rpcRid > 0) { // wait for rsp from dnode
if (RID_VALID(pHb->rpcRid)) { // wait for rsp from dnode
rpcCancelRequest(pHb->rpcRid);
pHb->rpcRid = -1;
}
tscDebug("%p HB is freed", pHb);
taos_free_result(pHb);
taosReleaseRef(tscObjRef, pHb->self);
taos_free_result(pHb);
}
}
int32_t ref = T_REF_DEC(pObj);
assert(ref >= 0);
if (ref > 0) {
tscDebug("%p %d remain sqlObjs, not free tscObj and dnodeConn:%p", pObj, ref, pObj->pDnodeConn);
return;
}
tscDebug("%p all sqlObj are freed, free tscObj and close dnodeConn:%p", pObj, pObj->pDnodeConn);
taosRemoveRef(tscRefId, pObj->rid);
}
......
......@@ -36,7 +36,6 @@ int tscObjRef = -1;
void * tscTmr;
void * tscQhandle;
void * tscCheckDiskUsageTmr;
int tsInsertHeadSize;
int tscRefId = -1;
int tscNumOfThreads;
......
......@@ -460,15 +460,7 @@ void tscFreeRegisteredSqlObj(void *pSql) {
assert(p->self != 0);
tscFreeSqlObj(p);
int32_t ref = T_REF_DEC(pTscObj);
assert(ref >= 0);
tscDebug("%p free sqlObj completed, tscObj:%p ref:%d", p, pTscObj, ref);
if (ref == 0) {
tscDebug("%p all sqlObj freed, free tscObj:%p", p, pTscObj);
taosRemoveRef(tscRefId, pTscObj->rid);
}
taosReleaseRef(tscRefId, pTscObj->rid);
}
void tscFreeTableMetaHelper(void *pTableMeta) {
......@@ -810,6 +802,7 @@ static void extractTableMeta(SSqlCmd* pCmd) {
}
int32_t tscMergeTableDataBlocks(SSqlObj* pSql) {
const int INSERT_HEAD_SIZE = sizeof(SMsgDesc) + sizeof(SSubmitMsg);
SSqlCmd* pCmd = &pSql->cmd;
void* pVnodeDataBlockHashList = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, false);
......@@ -824,7 +817,7 @@ int32_t tscMergeTableDataBlocks(SSqlObj* pSql) {
STableDataBlocks* dataBuf = NULL;
int32_t ret = tscGetDataBlockFromList(pVnodeDataBlockHashList, pOneTableBlock->vgId, TSDB_PAYLOAD_SIZE,
tsInsertHeadSize, 0, pOneTableBlock->tableId, pOneTableBlock->pTableMeta, &dataBuf, pVnodeDataBlockList);
INSERT_HEAD_SIZE, 0, pOneTableBlock->tableId, pOneTableBlock->pTableMeta, &dataBuf, pVnodeDataBlockList);
if (ret != TSDB_CODE_SUCCESS) {
tscError("%p failed to prepare the data block buffer for merging table data, code:%d", pSql, ret);
taosHashCleanup(pVnodeDataBlockHashList);
......@@ -1917,9 +1910,7 @@ void tscResetForNextRetrieve(SSqlRes* pRes) {
}
void registerSqlObj(SSqlObj* pSql) {
int32_t ref = T_REF_INC(pSql->pTscObj);
tscDebug("%p add to tscObj:%p, ref:%d", pSql, pSql->pTscObj, ref);
taosAcquireRef(tscRefId, pSql->pTscObj->rid);
pSql->self = taosAddRef(tscObjRef, pSql);
}
......
......@@ -102,7 +102,8 @@ extern int32_t tsAlternativeRole;
extern int32_t tsBalanceInterval;
extern int32_t tsOfflineThreshold;
extern int32_t tsMnodeEqualVnodeNum;
extern int32_t tsFlowCtrl;
extern int32_t tsEnableFlowCtrl;
extern int32_t tsEnableSlaveQuery;
// restful
extern int32_t tsEnableHttpModule;
......
......@@ -139,7 +139,8 @@ int32_t tsAlternativeRole = 0;
int32_t tsBalanceInterval = 300; // seconds
int32_t tsOfflineThreshold = 86400*100; // seconds 10days
int32_t tsMnodeEqualVnodeNum = 4;
int32_t tsFlowCtrl = 1;
int32_t tsEnableFlowCtrl = 1;
int32_t tsEnableSlaveQuery = 1;
// restful
int32_t tsEnableHttpModule = 1;
......@@ -222,7 +223,7 @@ int32_t uDebugFlag = 131;
int32_t debugFlag = 0;
int32_t sDebugFlag = 135;
int32_t wDebugFlag = 135;
int32_t tsdbDebugFlag = 131;
uint32_t tsdbDebugFlag = 131;
int32_t cqDebugFlag = 131;
int32_t (*monStartSystemFp)() = NULL;
......@@ -543,7 +544,7 @@ static void doInitGlobalConfig(void) {
cfg.ptr = &tsOfflineThreshold;
cfg.valType = TAOS_CFG_VTYPE_INT32;
cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_SHOW;
cfg.minValue = 5;
cfg.minValue = 3;
cfg.maxValue = 7200000;
cfg.ptrLength = 0;
cfg.unitType = TAOS_CFG_UTYPE_SECOND;
......@@ -1005,7 +1006,17 @@ static void doInitGlobalConfig(void) {
// module configs
cfg.option = "flowctrl";
cfg.ptr = &tsFlowCtrl;
cfg.ptr = &tsEnableFlowCtrl;
cfg.valType = TAOS_CFG_VTYPE_INT32;
cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_SHOW;
cfg.minValue = 0;
cfg.maxValue = 1;
cfg.ptrLength = 0;
cfg.unitType = TAOS_CFG_UTYPE_NONE;
taosInitConfigOption(cfg);
cfg.option = "slaveQuery";
cfg.ptr = &tsEnableSlaveQuery;
cfg.valType = TAOS_CFG_VTYPE_INT32;
cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_SHOW;
cfg.minValue = 0;
......@@ -1444,6 +1455,12 @@ int32_t taosCheckGlobalCfg() {
tsNumOfCores = 1;
}
if (tsMaxTablePerVnode < tsMinTablePerVnode) {
uError("maxTablesPerVnode(%d) < minTablesPerVnode(%d), reset to minTablesPerVnode(%d)",
tsMaxTablePerVnode, tsMinTablePerVnode, tsMinTablePerVnode);
tsMaxTablePerVnode = tsMinTablePerVnode;
}
// todo refactor
tsVersion = 0;
for (int i = 0; i < 10; i++) {
......
Subproject commit 32e2c97a4cf7bedaa99f5d6dd8cb036e7f4470df
Subproject commit ec77d9049a719dabfd1a7c1122a209e201861944
......@@ -349,11 +349,12 @@ CTaosInterface.prototype.useResult = function useResult(result) {
return fields;
}
CTaosInterface.prototype.fetchBlock = function fetchBlock(result, fields) {
let pblock = ref.ref(ref.ref(ref.NULL)); // equal to our raw data
let num_of_rows = this.libtaos.taos_fetch_block(result, pblock)
if (num_of_rows == 0) {
//let pblock = ref.ref(ref.ref(ref.NULL)); // equal to our raw data
let pblock = this.libtaos.taos_fetch_row(result);
if (pblock == null) {
return {block:null, num_of_rows:0};
}
var fieldL = this.libtaos.taos_fetch_lengths(result);
let isMicro = (this.libtaos.taos_result_precision(result) == FieldTypes.C_TIMESTAMP_MICRO);
......@@ -361,7 +362,6 @@ CTaosInterface.prototype.fetchBlock = function fetchBlock(result, fields) {
var fieldlens = [];
if (ref.isNull(fieldL) == false) {
for (let i = 0; i < fields.length; i ++) {
let plen = ref.reinterpret(fieldL, 4, i*4);
let len = plen.readInt32LE(0);
......
......@@ -113,6 +113,7 @@ static void dnodeCleanupTmr() {
int32_t dnodeInitSystem() {
dnodeSetRunStatus(TSDB_RUN_STATUS_INITIALIZE);
tscEmbedded = 1;
taosIgnSIGPIPE();
taosBlockSIGPIPE();
taosResolveCRC();
taosInitGlobalCfg();
......@@ -120,7 +121,6 @@ int32_t dnodeInitSystem() {
taosSetCoreDump();
taosInitNotes();
dnodeInitTmr();
signal(SIGPIPE, SIG_IGN);
if (dnodeCreateDir(tsLogDir) < 0) {
printf("failed to create dir: %s, reason: %s\n", tsLogDir, strerror(errno));
......
......@@ -195,7 +195,7 @@ static void addRuntimeInfo(SBufferWriter* bw) {
static void sendTelemetryReport() {
char buf[128];
uint32_t ip = taosGetIpFromFqdn(TELEMETRY_SERVER);
uint32_t ip = taosGetIpv4FromFqdn(TELEMETRY_SERVER);
if (ip == 0xffffffff) {
dTrace("failed to get IP address of " TELEMETRY_SERVER ", reason:%s", strerror(errno));
return;
......
......@@ -129,7 +129,8 @@ static void *dnodeProcessMgmtQueue(void *wparam) {
static SCreateVnodeMsg* dnodeParseVnodeMsg(SRpcMsg *rpcMsg) {
SCreateVnodeMsg *pCreate = rpcMsg->pCont;
pCreate->cfg.vgId = htonl(pCreate->cfg.vgId);
pCreate->cfg.cfgVersion = htonl(pCreate->cfg.cfgVersion);
pCreate->cfg.dbCfgVersion = htonl(pCreate->cfg.dbCfgVersion);
pCreate->cfg.vgCfgVersion = htonl(pCreate->cfg.vgCfgVersion);
pCreate->cfg.maxTables = htonl(pCreate->cfg.maxTables);
pCreate->cfg.cacheBlockSize = htonl(pCreate->cfg.cacheBlockSize);
pCreate->cfg.totalBlocks = htonl(pCreate->cfg.totalBlocks);
......
......@@ -54,6 +54,7 @@ void dnodeCleanupVRead() {
void dnodeDispatchToVReadQueue(SRpcMsg *pMsg) {
int32_t queuedMsgNum = 0;
int32_t leftLen = pMsg->contLen;
int32_t code = TSDB_CODE_VND_INVALID_VGROUP_ID;
char * pCont = pMsg->pCont;
while (leftLen > 0) {
......@@ -64,7 +65,7 @@ void dnodeDispatchToVReadQueue(SRpcMsg *pMsg) {
assert(pHead->contLen > 0);
void *pVnode = vnodeAcquire(pHead->vgId);
if (pVnode != NULL) {
int32_t code = vnodeWriteToRQueue(pVnode, pCont, pHead->contLen, TAOS_QTYPE_RPC, pMsg);
code = vnodeWriteToRQueue(pVnode, pCont, pHead->contLen, TAOS_QTYPE_RPC, pMsg);
if (code == TSDB_CODE_SUCCESS) queuedMsgNum++;
vnodeRelease(pVnode);
}
......@@ -74,7 +75,7 @@ void dnodeDispatchToVReadQueue(SRpcMsg *pMsg) {
}
if (queuedMsgNum == 0) {
SRpcMsg rpcRsp = {.handle = pMsg->handle, .code = TSDB_CODE_VND_INVALID_VGROUP_ID};
SRpcMsg rpcRsp = {.handle = pMsg->handle, .code = code};
rpcSendResponse(&rpcRsp);
}
......
......@@ -188,6 +188,7 @@ static void *dnodeProcessVWriteQueue(void *wparam) {
int32_t numOfMsgs;
int32_t qtype;
taosBlockSIGPIPE();
dDebug("dnode vwrite worker:%d is running", pWorker->workerId);
while (1) {
......
......@@ -518,14 +518,15 @@ typedef struct SRetrieveTableRsp {
typedef struct {
int32_t vgId;
int32_t cfgVersion;
int32_t dbCfgVersion;
int64_t totalStorage;
int64_t compStorage;
int64_t pointsWritten;
uint8_t status;
uint8_t role;
uint8_t replica;
uint8_t reserved[5];
uint8_t reserved;
int32_t vgCfgVersion;
} SVnodeLoad;
typedef struct {
......@@ -642,7 +643,7 @@ typedef struct {
typedef struct {
uint32_t vgId;
int32_t cfgVersion;
int32_t dbCfgVersion;
int32_t maxTables;
int32_t cacheBlockSize;
int32_t totalBlocks;
......@@ -662,7 +663,8 @@ typedef struct {
int8_t quorum;
int8_t update;
int8_t cacheLastRow;
int8_t reserved[14];
int32_t vgCfgVersion;
int8_t reserved[10];
} SVnodeCfg;
typedef struct {
......
......@@ -28,7 +28,7 @@ extern "C" {
default: \
(_v) = (_finalType)GET_INT32_VAL(_data); \
break; \
};
}
#ifdef __cplusplus
}
......
......@@ -332,6 +332,9 @@ static error_t parse_opt(int key, char *arg, struct argp_state *state) {
break;
case 'N':
arguments->data_batch = atoi(arg);
if (arguments->data_batch >= INT16_MAX) {
arguments->data_batch = INT16_MAX - 1;
}
break;
case 'L':
{
......
......@@ -144,7 +144,8 @@ typedef struct SVgObj {
int8_t status;
int8_t reserved0[4];
SVnodeGid vnodeGid[TSDB_MAX_REPLICA];
int8_t reserved1[12];
int32_t vgCfgVersion;
int8_t reserved1[8];
int8_t updateEnd[4];
int32_t refCount;
int32_t numOfTables;
......@@ -182,7 +183,7 @@ typedef struct SDbObj {
int8_t reserved0[4];
char acct[TSDB_USER_LEN];
int64_t createdTime;
int32_t cfgVersion;
int32_t dbCfgVersion;
SDbCfg cfg;
int8_t status;
int8_t reserved1[11];
......
......@@ -55,12 +55,12 @@ typedef enum EDnodeOfflineReason {
TAOS_DN_OFF_OTHERS
} EDnodeOfflineReason;
extern char* dnodeStatus[];
extern char* dnodeRoles[];
int32_t mnodeInitDnodes();
void mnodeCleanupDnodes();
char* mnodeGetDnodeStatusStr(int32_t dnodeStatus);
void mgmtMonitorDnodeModule();
int32_t mnodeGetDnodesNum();
int32_t mnodeGetOnlinDnodesCpuCoreNum();
int32_t mnodeGetOnlineDnodesNum();
......
......@@ -1038,7 +1038,7 @@ static int32_t mnodeAlterDb(SDbObj *pDb, SAlterDbMsg *pAlter, void *pMsg) {
if (memcmp(&newCfg, &pDb->cfg, sizeof(SDbCfg)) != 0) {
pDb->cfg = newCfg;
pDb->cfgVersion++;
pDb->dbCfgVersion++;
SSdbRow row = {
.type = SDB_OPER_GLOBAL,
.pTable = tsDbSdb,
......
......@@ -63,7 +63,6 @@ static int32_t mnodeGetVnodeMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pC
static int32_t mnodeRetrieveVnodes(SShowObj *pShow, char *data, int32_t rows, void *pConn);
static int32_t mnodeGetDnodeMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn);
static int32_t mnodeRetrieveDnodes(SShowObj *pShow, char *data, int32_t rows, void *pConn);
static char* mnodeGetDnodeAlternativeRoleStr(int32_t alternativeRole);
static void mnodeUpdateDnodeEps();
static char* offlineReason[] = {
......@@ -557,7 +556,8 @@ static int32_t mnodeProcessDnodeStatusMsg(SMnodeMsg *pMsg) {
for (int32_t j = 0; j < openVnodes; ++j) {
SVnodeLoad *pVload = &pStatus->load[j];
pVload->vgId = htonl(pVload->vgId);
pVload->cfgVersion = htonl(pVload->cfgVersion);
pVload->dbCfgVersion = htonl(pVload->dbCfgVersion);
pVload->vgCfgVersion = htonl(pVload->vgCfgVersion);
SVgObj *pVgroup = mnodeGetVgroup(pVload->vgId);
if (pVgroup == NULL) {
......@@ -833,12 +833,12 @@ static int32_t mnodeRetrieveDnodes(SShowObj *pShow, char *data, int32_t rows, vo
cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
char* status = mnodeGetDnodeStatusStr(pDnode->status);
char* status = dnodeStatus[pDnode->status];
STR_TO_VARSTR(pWrite, status);
cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
char* role = mnodeGetDnodeAlternativeRoleStr(pDnode->alternativeRole);
char* role = dnodeRoles[pDnode->alternativeRole];
STR_TO_VARSTR(pWrite, role);
cols++;
......@@ -1154,21 +1154,17 @@ static int32_t mnodeRetrieveVnodes(SShowObj *pShow, char *data, int32_t rows, vo
return numOfRows;
}
char* mnodeGetDnodeStatusStr(int32_t dnodeStatus) {
switch (dnodeStatus) {
case TAOS_DN_STATUS_OFFLINE: return "offline";
case TAOS_DN_STATUS_DROPPING: return "dropping";
case TAOS_DN_STATUS_BALANCING: return "balancing";
case TAOS_DN_STATUS_READY: return "ready";
default: return "undefined";
}
}
char* dnodeStatus[] = {
"offline",
"dropping",
"balancing",
"ready",
"undefined"
};
static char* mnodeGetDnodeAlternativeRoleStr(int32_t alternativeRole) {
switch (alternativeRole) {
case TAOS_DN_ALTERNATIVE_ROLE_ANY: return "any";
case TAOS_DN_ALTERNATIVE_ROLE_MNODE: return "mnode";
case TAOS_DN_ALTERNATIVE_ROLE_VNODE: return "vnode";
default:return "any";
}
}
char* dnodeRoles[] = {
"any",
"mnode",
"vnode",
"any"
};
......@@ -377,6 +377,24 @@ static int32_t mnodeCreateMnodeCb(SMnodeMsg *pMsg, int32_t code) {
return code;
}
static bool mnodeAllOnline() {
void *pIter = NULL;
bool allOnline = true;
while (1) {
SMnodeObj *pMnode = NULL;
pIter = mnodeGetNextMnode(pIter, &pMnode);
if (pMnode == NULL) break;
if (pMnode->role != TAOS_SYNC_ROLE_MASTER && pMnode->role != TAOS_SYNC_ROLE_SLAVE) {
allOnline = false;
mnodeDecMnodeRef(pMnode);
}
}
mnodeCancelGetNextMnode(pIter);
return allOnline;
}
void mnodeCreateMnode(int32_t dnodeId, char *dnodeEp, bool needConfirm) {
SMnodeObj *pMnode = calloc(1, sizeof(SMnodeObj));
pMnode->mnodeId = dnodeId;
......@@ -389,6 +407,11 @@ void mnodeCreateMnode(int32_t dnodeId, char *dnodeEp, bool needConfirm) {
.fpRsp = mnodeCreateMnodeCb
};
if (needConfirm && !mnodeAllOnline()) {
mDebug("wait all mnode online then create new mnode");
return;
}
int32_t code = TSDB_CODE_SUCCESS;
if (needConfirm) {
code = mnodeSendCreateMnodeMsg(dnodeId, dnodeEp);
......
......@@ -1081,6 +1081,8 @@ static void *sdbWorkerFp(void *pWorker) {
int32_t qtype;
void * unUsed;
taosBlockSIGPIPE();
while (1) {
int32_t numOfMsgs = taosReadAllQitemsFromQset(tsSdbWQset, tsSdbWQall, &unUsed);
if (numOfMsgs == 0) {
......
......@@ -256,6 +256,8 @@ SVgObj *mnodeGetVgroup(int32_t vgId) {
}
void mnodeUpdateVgroup(SVgObj *pVgroup) {
pVgroup->vgCfgVersion++;
SSdbRow row = {
.type = SDB_OPER_GLOBAL,
.pTable = tsVgroupSdb,
......@@ -339,10 +341,11 @@ void mnodeUpdateVgroupStatus(SVgObj *pVgroup, SDnodeObj *pDnode, SVnodeLoad *pVl
pVgroup->pointsWritten = htobe64(pVload->pointsWritten);
}
if (pVload->cfgVersion != pVgroup->pDb->cfgVersion || pVload->replica != pVgroup->numOfVnodes) {
mError("dnode:%d, vgId:%d, vnode cfgVersion:%d repica:%d not match with mnode cfgVersion:%d replica:%d",
pDnode->dnodeId, pVload->vgId, pVload->cfgVersion, pVload->replica, pVgroup->pDb->cfgVersion,
pVgroup->numOfVnodes);
if (pVload->dbCfgVersion != pVgroup->pDb->dbCfgVersion || pVload->replica != pVgroup->numOfVnodes ||
pVload->vgCfgVersion != pVgroup->vgCfgVersion) {
mError("dnode:%d, vgId:%d, vnode cfgVersion:%d:%d repica:%d not match with mnode cfgVersion:%d:%d replica:%d",
pDnode->dnodeId, pVload->vgId, pVload->dbCfgVersion, pVload->vgCfgVersion, pVload->replica,
pVgroup->pDb->dbCfgVersion, pVgroup->vgCfgVersion, pVgroup->numOfVnodes);
mnodeSendAlterVgroupMsg(pVgroup);
}
}
......@@ -656,7 +659,7 @@ static int32_t mnodeGetVgroupMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *p
pShow->bytes[cols] = 4;
pSchema[cols].type = TSDB_DATA_TYPE_INT;
strcpy(pSchema[cols].name, "onlineVnodes");
strcpy(pSchema[cols].name, "onlines");
pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++;
......@@ -671,13 +674,13 @@ static int32_t mnodeGetVgroupMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *p
for (int32_t i = 0; i < pShow->maxReplica; ++i) {
pShow->bytes[cols] = 2;
pSchema[cols].type = TSDB_DATA_TYPE_SMALLINT;
snprintf(pSchema[cols].name, TSDB_COL_NAME_LEN, "v%dDnode", i + 1);
snprintf(pSchema[cols].name, TSDB_COL_NAME_LEN, "v%d_dnode", i + 1);
pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++;
pShow->bytes[cols] = 9 + VARSTR_HEADER_SIZE;
pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
snprintf(pSchema[cols].name, TSDB_COL_NAME_LEN, "v%dStatus", i + 1);
snprintf(pSchema[cols].name, TSDB_COL_NAME_LEN, "v%d_status", i + 1);
pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++;
}
......@@ -840,7 +843,8 @@ static SCreateVnodeMsg *mnodeBuildVnodeMsg(SVgObj *pVgroup) {
SVnodeCfg *pCfg = &pVnode->cfg;
pCfg->vgId = htonl(pVgroup->vgId);
pCfg->cfgVersion = htonl(pDb->cfgVersion);
pCfg->dbCfgVersion = htonl(pDb->dbCfgVersion);
pCfg->vgCfgVersion = htonl(pVgroup->vgCfgVersion);
pCfg->cacheBlockSize = htonl(pDb->cfg.cacheBlockSize);
pCfg->totalBlocks = htonl(pDb->cfg.totalBlocks);
pCfg->maxTables = htonl(maxTables + 1);
......
......@@ -59,6 +59,7 @@ extern "C" {
// TAOS_OS_FUNC_SOCKET
int32_t taosSetNonblocking(SOCKET sock, int32_t on);
void taosIgnSIGPIPE();
void taosBlockSIGPIPE();
// TAOS_OS_FUNC_SOCKET_SETSOCKETOPT
......
......@@ -39,6 +39,10 @@ int32_t taosSetNonblocking(SOCKET sock, int32_t on) {
return 0;
}
void taosIgnSIGPIPE() {
signal(SIGPIPE, SIG_IGN);
}
void taosBlockSIGPIPE() {
sigset_t signal_mask;
sigemptyset(&signal_mask);
......
......@@ -46,6 +46,7 @@ int32_t taosSetNonblocking(SOCKET sock, int32_t on) {
return 0;
}
void taosIgnSIGPIPE() {}
void taosBlockSIGPIPE() {}
int32_t taosSetSockOpt(SOCKET socketfd, int32_t level, int32_t optname, void *optval, int32_t optlen) {
......
......@@ -136,7 +136,7 @@ void httpSendErrorResp(HttpContext *pContext, int32_t errNo) {
else
httpCode = 400;
if (pContext->parser->httpCode != 0) {
if (pContext->parser && pContext->parser->httpCode != 0) {
httpCode = pContext->parser->httpCode;
}
......
......@@ -33,13 +33,6 @@ struct SColumnFilterElem;
typedef bool (*__filter_func_t)(struct SColumnFilterElem* pFilter, char* val1, char* val2);
typedef int32_t (*__block_search_fn_t)(char* data, int32_t num, int64_t key, int32_t order);
typedef struct SGroupResInfo {
int32_t groupId;
int32_t numOfDataPages;
int32_t pageId;
int32_t rowId;
} SGroupResInfo;
typedef struct SResultRowPool {
int32_t elemSize;
int32_t blockSize;
......@@ -72,6 +65,12 @@ typedef struct SResultRow {
union {STimeWindow win; char* key;}; // start key of current time window
} SResultRow;
typedef struct SGroupResInfo {
int32_t rowId;
int32_t index;
SArray* pRows; // SArray<SResultRow*>
} SGroupResInfo;
/**
* If the number of generated results is greater than this value,
* query query will be halt and return results to client immediate.
......@@ -89,7 +88,6 @@ typedef struct SResultRowInfo {
int32_t size:24; // number of result set
int32_t capacity; // max capacity
int32_t curIndex; // current start active index
int64_t startTime; // start time of the first time window for sliding query
int64_t prevSKey; // previous (not completed) sliding window start key
} SResultRowInfo;
......
......@@ -67,7 +67,7 @@ void tHistogramDestroy(SHistogramInfo** pHisto);
void tHistogramPrint(SHistogramInfo* pHisto);
int32_t vnodeHistobinarySearch(SHistBin* pEntry, int32_t len, double val);
int32_t histoBinarySearch(SHistBin* pEntry, int32_t len, double val);
SHeapEntry* tHeapCreate(int32_t numOfEntries);
void tHeapSort(SHeapEntry* pEntry, int32_t len);
......
......@@ -77,7 +77,6 @@ void* destroyResultRowPool(SResultRowPool* p);
int32_t getNumOfAllocatedResultRows(SResultRowPool* p);
int32_t getNumOfUsedResultRows(SResultRowPool* p);
uint64_t getResultInfoUId(SQueryRuntimeEnv* pRuntimeEnv);
bool isPointInterpoQuery(SQuery *pQuery);
......
此差异已折叠。
......@@ -158,8 +158,8 @@ int32_t tHistogramAdd(SHistogramInfo** pHisto, double val) {
}
#if defined(USE_ARRAYLIST)
int32_t idx = vnodeHistobinarySearch((*pHisto)->elems, (*pHisto)->numOfEntries, val);
assert(idx >= 0 && idx <= (*pHisto)->maxEntries);
int32_t idx = histoBinarySearch((*pHisto)->elems, (*pHisto)->numOfEntries, val);
assert(idx >= 0 && idx <= (*pHisto)->maxEntries && (*pHisto)->elems != NULL);
if ((*pHisto)->elems[idx].val == val && idx >= 0) {
(*pHisto)->elems[idx].num += 1;
......@@ -356,7 +356,7 @@ int32_t tHistogramAdd(SHistogramInfo** pHisto, double val) {
return 0;
}
int32_t vnodeHistobinarySearch(SHistBin* pEntry, int32_t len, double val) {
int32_t histoBinarySearch(SHistBin* pEntry, int32_t len, double val) {
int32_t end = len - 1;
int32_t start = 0;
......@@ -466,7 +466,7 @@ void tHistogramPrint(SHistogramInfo* pHisto) {
*/
int64_t tHistogramSum(SHistogramInfo* pHisto, double v) {
#if defined(USE_ARRAYLIST)
int32_t slotIdx = vnodeHistobinarySearch(pHisto->elems, pHisto->numOfEntries, v);
int32_t slotIdx = histoBinarySearch(pHisto->elems, pHisto->numOfEntries, v);
if (pHisto->elems[slotIdx].val != v) {
slotIdx -= 1;
......
......@@ -384,7 +384,12 @@ void tSqlSetColumnInfo(TAOS_FIELD *pField, SStrToken *pName, TAOS_FIELD *pType)
pField->name[pName->n] = 0;
pField->type = pType->type;
if(pField->type < TSDB_DATA_TYPE_BOOL || pField->type > TSDB_DATA_TYPE_NCHAR){
pField->bytes = 0;
} else {
pField->bytes = pType->bytes;
}
}
void tSqlSetColumnType(TAOS_FIELD *pField, SStrToken *type) {
......
......@@ -96,8 +96,6 @@ void resetResultRowInfo(SQueryRuntimeEnv *pRuntimeEnv, SResultRowInfo *pResultRo
pResultRowInfo->curIndex = -1;
pResultRowInfo->size = 0;
pResultRowInfo->startTime = TSKEY_INITIAL_VAL;
pResultRowInfo->prevSKey = TSKEY_INITIAL_VAL;
}
......@@ -110,7 +108,7 @@ void popFrontResultRow(SQueryRuntimeEnv *pRuntimeEnv, SResultRowInfo *pResultRow
assert(num >= 0 && num <= numOfClosed);
int16_t type = pResultRowInfo->type;
int64_t uid = getResultInfoUId(pRuntimeEnv);
int64_t uid = 0;
char *key = NULL;
int16_t bytes = -1;
......@@ -181,11 +179,12 @@ void closeAllResultRows(SResultRowInfo *pResultRowInfo) {
assert(pResultRowInfo->size >= 0 && pResultRowInfo->capacity >= pResultRowInfo->size);
for (int32_t i = 0; i < pResultRowInfo->size; ++i) {
if (pResultRowInfo->pResult[i]->closed) {
SResultRow* pRow = pResultRowInfo->pResult[i];
if (pRow->closed) {
continue;
}
pResultRowInfo->pResult[i]->closed = true;
pRow->closed = true;
}
}
......@@ -384,17 +383,3 @@ void* destroyResultRowPool(SResultRowPool* p) {
tfree(p);
return NULL;
}
\ No newline at end of file
uint64_t getResultInfoUId(SQueryRuntimeEnv* pRuntimeEnv) {
if (!pRuntimeEnv->stableQuery) {
return 0; // for simple table query, the uid is always set to be 0;
}
SQuery* pQuery = pRuntimeEnv->pQuery;
if (pQuery->interval.interval == 0 || isPointInterpoQuery(pQuery) || pRuntimeEnv->groupbyNormalCol) {
return 0;
}
STableId* id = TSDB_TABLEID(pRuntimeEnv->pQuery->current->pTable);
return id->uid;
}
\ No newline at end of file
......@@ -21,19 +21,19 @@ TEST(testCase, histogram_binary_search) {
pHisto->elems[i].val = i;
}
int32_t idx = vnodeHistobinarySearch(pHisto->elems, pHisto->numOfEntries, 1);
int32_t idx = histoBinarySearch(pHisto->elems, pHisto->numOfEntries, 1);
assert(idx == 1);
idx = vnodeHistobinarySearch(pHisto->elems, pHisto->numOfEntries, 9);
idx = histoBinarySearch(pHisto->elems, pHisto->numOfEntries, 9);
assert(idx == 9);
idx = vnodeHistobinarySearch(pHisto->elems, pHisto->numOfEntries, 20);
idx = histoBinarySearch(pHisto->elems, pHisto->numOfEntries, 20);
assert(idx == 10);
idx = vnodeHistobinarySearch(pHisto->elems, pHisto->numOfEntries, -1);
idx = histoBinarySearch(pHisto->elems, pHisto->numOfEntries, -1);
assert(idx == 0);
idx = vnodeHistobinarySearch(pHisto->elems, pHisto->numOfEntries, 3.9);
idx = histoBinarySearch(pHisto->elems, pHisto->numOfEntries, 3.9);
assert(idx == 4);
free(pHisto);
......
......@@ -576,7 +576,7 @@ static void rpcFreeMsg(void *msg) {
static SRpcConn *rpcOpenConn(SRpcInfo *pRpc, char *peerFqdn, uint16_t peerPort, int8_t connType) {
SRpcConn *pConn;
uint32_t peerIp = taosGetIpFromFqdn(peerFqdn);
uint32_t peerIp = taosGetIpv4FromFqdn(peerFqdn);
if (peerIp == 0xFFFFFFFF) {
tError("%s, failed to resolve FQDN:%s", pRpc->label, peerFqdn);
terrno = TSDB_CODE_RPC_FQDN_ERROR;
......
......@@ -38,7 +38,7 @@ extern "C" {
#define SYNC_MAX_FWDS 512
#define SYNC_FWD_TIMER 300
#define SYNC_ROLE_TIMER 15000 // ms
#define SYNC_CHECK_INTERVAL 1 // ms
#define SYNC_CHECK_INTERVAL 1000 // ms
#define SYNC_WAIT_AFTER_CHOOSE_MASTER 10 // ms
#define nodeRole pNode->peerInfo[pNode->selfIndex]->role
......@@ -86,9 +86,10 @@ typedef struct SsyncPeer {
int32_t peerFd; // forward FD
int32_t numOfRetrieves; // number of retrieves tried
int32_t fileChanged; // a flag to indicate file is changed during retrieving process
int32_t refCount;
int64_t rid;
void * timer;
void * pConn;
int32_t refCount; // reference count
struct SSyncNode *pSyncNode;
} SSyncPeer;
......@@ -98,6 +99,7 @@ typedef struct SSyncNode {
int8_t quorum;
int8_t selfIndex;
uint32_t vgId;
int32_t refCount;
int64_t rid;
SSyncPeer * peerInfo[TAOS_SYNC_MAX_REPLICA + 1]; // extra one for arbitrator
SSyncPeer * pMaster;
......@@ -121,13 +123,13 @@ extern int32_t tsSyncNum;
extern char tsNodeFqdn[TSDB_FQDN_LEN];
extern char * syncStatus[];
void *syncRetrieveData(void *param);
void *syncRestoreData(void *param);
void * syncRetrieveData(void *param);
void * syncRestoreData(void *param);
int32_t syncSaveIntoBuffer(SSyncPeer *pPeer, SWalHead *pHead);
void syncRestartConnection(SSyncPeer *pPeer);
void syncBroadcastStatus(SSyncNode *pNode);
void syncAddPeerRef(SSyncPeer *pPeer);
int32_t syncDecPeerRef(SSyncPeer *pPeer);
SSyncPeer *syncAcquirePeer(int64_t rid);
void syncReleasePeer(SSyncPeer *pPeer);
#ifdef __cplusplus
}
......
......@@ -25,14 +25,14 @@ typedef struct {
uint32_t serverIp;
int16_t port;
int32_t bufferSize;
void (*processBrokenLink)(void *ahandle);
int32_t (*processIncomingMsg)(void *ahandle, void *buffer);
void (*processBrokenLink)(int64_t handleId);
int32_t (*processIncomingMsg)(int64_t handleId, void *buffer);
void (*processIncomingConn)(int32_t fd, uint32_t ip);
} SPoolInfo;
void *syncOpenTcpThreadPool(SPoolInfo *pInfo);
void syncCloseTcpThreadPool(void *);
void *syncAllocateTcpConn(void *, void *ahandle, int32_t connFd);
void *syncAllocateTcpConn(void *, int64_t rid, int32_t connFd);
void syncFreeTcpConn(void *);
#ifdef __cplusplus
......
......@@ -29,8 +29,8 @@
static void arbSignalHandler(int32_t signum, siginfo_t *sigInfo, void *context);
static void arbProcessIncommingConnection(int32_t connFd, uint32_t sourceIp);
static void arbProcessBrokenLink(void *param);
static int32_t arbProcessPeerMsg(void *param, void *buffer);
static void arbProcessBrokenLink(int64_t rid);
static int32_t arbProcessPeerMsg(int64_t rid, void *buffer);
static tsem_t tsArbSem;
static void * tsArbTcpPool;
......@@ -138,20 +138,20 @@ static void arbProcessIncommingConnection(int32_t connFd, uint32_t sourceIp) {
sDebug("%s, arbitrator request is accepted", pNode->id);
pNode->nodeFd = connFd;
pNode->pConn = syncAllocateTcpConn(tsArbTcpPool, pNode, connFd);
pNode->pConn = syncAllocateTcpConn(tsArbTcpPool, (int64_t)pNode, connFd);
return;
}
static void arbProcessBrokenLink(void *param) {
SNodeConn *pNode = param;
static void arbProcessBrokenLink(int64_t rid) {
SNodeConn *pNode = (SNodeConn *)rid;
sDebug("%s, TCP link is broken since %s, close connection", pNode->id, strerror(errno));
tfree(pNode);
}
static int32_t arbProcessPeerMsg(void *param, void *buffer) {
SNodeConn *pNode = param;
static int32_t arbProcessPeerMsg(int64_t rid, void *buffer) {
SNodeConn *pNode = (SNodeConn *)rid;
SSyncHead head;
int32_t bytes = 0;
char * cont = (char *)buffer;
......
此差异已折叠。
......@@ -90,15 +90,18 @@ static int32_t syncRestoreFile(SSyncPeer *pPeer, uint64_t *fversion) {
break;
}
sDebug("%s, file:%s info is received from master, index:%d size:%" PRId64 " fver:%" PRIu64 " magic:%d", pPeer->id,
minfo.name, minfo.index, minfo.size, minfo.fversion, minfo.magic);
// remove extra files on slave between the current and last index
syncRemoveExtraFile(pPeer, pindex + 1, minfo.index - 1);
pindex = minfo.index;
// check the file info
sinfo = minfo;
sDebug("%s, get file:%s info size:%" PRId64, pPeer->id, minfo.name, minfo.size);
sinfo.magic = (*pNode->getFileInfo)(pNode->vgId, sinfo.name, &sinfo.index, TAOS_SYNC_MAX_INDEX, &sinfo.size,
&sinfo.fversion);
sinfo.magic = (*pNode->getFileInfo)(pNode->vgId, sinfo.name, &sinfo.index, TAOS_SYNC_MAX_INDEX, &sinfo.size, &sinfo.fversion);
sDebug("%s, local file:%s info, index:%d size:%" PRId64 " fver:%" PRIu64 " magic:%d", pPeer->id, sinfo.name,
sinfo.index, sinfo.size, sinfo.fversion, sinfo.magic);
// if file not there or magic is not the same, file shall be synced
memset(&fileAck, 0, sizeof(SFileAck));
......@@ -116,6 +119,8 @@ static int32_t syncRestoreFile(SSyncPeer *pPeer, uint64_t *fversion) {
if (fileAck.sync == 0) {
sDebug("%s, %s is the same", pPeer->id, minfo.name);
continue;
} else {
sDebug("%s, %s will be received, size:%" PRId64, pPeer->id, minfo.name, minfo.size);
}
// if sync is required, open file, receive from master, and write to file
......@@ -155,7 +160,7 @@ static int32_t syncRestoreFile(SSyncPeer *pPeer, uint64_t *fversion) {
return code;
}
static int32_t syncRestoreWal(SSyncPeer *pPeer) {
static int32_t syncRestoreWal(SSyncPeer *pPeer, uint64_t *wver) {
SSyncNode *pNode = pPeer->pSyncNode;
int32_t ret, code = -1;
uint64_t lastVer = 0;
......@@ -198,6 +203,7 @@ static int32_t syncRestoreWal(SSyncPeer *pPeer) {
}
free(pHead);
*wver = lastVer;
return code;
}
......@@ -321,12 +327,19 @@ static int32_t syncRestoreDataStepByStep(SSyncPeer *pPeer) {
nodeVersion = fversion;
sInfo("%s, start to restore wal", pPeer->id);
if (syncRestoreWal(pPeer) < 0) {
sError("%s, failed to restore wal", pPeer->id);
sInfo("%s, start to restore wal, fver:%" PRIu64, pPeer->id, nodeVersion);
uint64_t wver = 0;
code = syncRestoreWal(pPeer, &wver); // lastwar
if (code < 0) {
sError("%s, failed to restore wal, code:%d", pPeer->id, code);
return -1;
}
if (wver != 0) {
nodeVersion = wver;
sDebug("%s, restore wal finished, set sver:%" PRIu64, pPeer->id, nodeVersion);
}
nodeSStatus = TAOS_SYNC_STATUS_CACHE;
sInfo("%s, start to insert buffered points, set sstatus:%s", pPeer->id, syncStatus[nodeSStatus]);
if (syncProcessBufferedFwd(pPeer) < 0) {
......@@ -338,7 +351,10 @@ static int32_t syncRestoreDataStepByStep(SSyncPeer *pPeer) {
}
void *syncRestoreData(void *param) {
SSyncPeer *pPeer = param;
int64_t rid = (int64_t)param;
SSyncPeer *pPeer = syncAcquirePeer(rid);
if (pPeer == NULL) return NULL;
SSyncNode *pNode = pPeer->pSyncNode;
taosBlockSIGPIPE();
......@@ -369,7 +385,7 @@ void *syncRestoreData(void *param) {
taosClose(pPeer->syncFd);
syncCloseRecvBuffer(pNode);
__sync_fetch_and_sub(&tsSyncNum, 1);
syncDecPeerRef(pPeer);
syncReleasePeer(pPeer);
return NULL;
}
......@@ -104,7 +104,8 @@ static int32_t syncRetrieveFile(SSyncPeer *pPeer) {
fileInfo.magic = (*pNode->getFileInfo)(pNode->vgId, fileInfo.name, &fileInfo.index, TAOS_SYNC_MAX_INDEX,
&fileInfo.size, &fileInfo.fversion);
syncBuildFileInfo(&fileInfo, pNode->vgId);
sDebug("%s, file:%s info is sent, size:%" PRId64, pPeer->id, fileInfo.name, fileInfo.size);
sDebug("%s, file:%s info is sent, index:%d size:%" PRId64 " fver:%" PRIu64 " magic:%d", pPeer->id, fileInfo.name,
fileInfo.index, fileInfo.size, fileInfo.fversion, fileInfo.magic);
// send the file info
int32_t ret = taosWriteMsg(pPeer->syncFd, &(fileInfo), sizeof(SFileInfo));
......@@ -144,6 +145,8 @@ static int32_t syncRetrieveFile(SSyncPeer *pPeer) {
fileInfo.index++;
sDebug("%s, %s is the same", pPeer->id, fileInfo.name);
continue;
} else {
sDebug("%s, %s will be sent", pPeer->id, fileInfo.name);
}
// get the full path to file
......@@ -461,7 +464,10 @@ static int32_t syncRetrieveDataStepByStep(SSyncPeer *pPeer) {
}
void *syncRetrieveData(void *param) {
SSyncPeer *pPeer = (SSyncPeer *)param;
int64_t rid = (int64_t)param;
SSyncPeer *pPeer = syncAcquirePeer(rid);
if (pPeer == NULL) return NULL;
SSyncNode *pNode = pPeer->pSyncNode;
taosBlockSIGPIPE();
......@@ -490,7 +496,7 @@ void *syncRetrieveData(void *param) {
pPeer->fileChanged = 0;
taosClose(pPeer->syncFd);
syncDecPeerRef(pPeer);
syncReleasePeer(pPeer);
return NULL;
}
......@@ -42,7 +42,7 @@ typedef struct SPoolObj {
typedef struct {
SThreadObj *pThread;
void * ahandle;
int64_t handleId;
int32_t fd;
int32_t closedByApp;
} SConnObj;
......@@ -112,7 +112,7 @@ void syncCloseTcpThreadPool(void *param) {
tfree(pPool);
}
void *syncAllocateTcpConn(void *param, void *pPeer, int32_t connFd) {
void *syncAllocateTcpConn(void *param, int64_t rid, int32_t connFd) {
struct epoll_event event;
SPoolObj *pPool = param;
......@@ -130,7 +130,7 @@ void *syncAllocateTcpConn(void *param, void *pPeer, int32_t connFd) {
pConn->fd = connFd;
pConn->pThread = pThread;
pConn->ahandle = pPeer;
pConn->handleId = rid;
pConn->closedByApp = 0;
event.events = EPOLLIN | EPOLLRDHUP;
......@@ -164,7 +164,7 @@ static void taosProcessBrokenLink(SConnObj *pConn) {
SPoolInfo * pInfo = &pPool->info;
if (pConn->closedByApp == 0) shutdown(pConn->fd, SHUT_WR);
(*pInfo->processBrokenLink)(pConn->ahandle);
(*pInfo->processBrokenLink)(pConn->handleId);
pThread->numOfFds--;
epoll_ctl(pThread->pollFd, EPOLL_CTL_DEL, pConn->fd, NULL);
......@@ -221,7 +221,7 @@ static void *syncProcessTcpData(void *param) {
}
if (pConn->closedByApp == 0) {
if ((*pInfo->processIncomingMsg)(pConn->ahandle, buffer) < 0) {
if ((*pInfo->processIncomingMsg)(pConn->handleId, buffer) < 0) {
syncFreeTcpConn(pConn);
continue;
}
......
......@@ -31,14 +31,14 @@
extern "C" {
#endif
extern int tsdbDebugFlag;
#define tsdbFatal(...) { if (tsdbDebugFlag & DEBUG_FATAL) { taosPrintLog("TDB FATAL ", 255, __VA_ARGS__); }}
#define tsdbError(...) { if (tsdbDebugFlag & DEBUG_ERROR) { taosPrintLog("TDB ERROR ", 255, __VA_ARGS__); }}
#define tsdbWarn(...) { if (tsdbDebugFlag & DEBUG_WARN) { taosPrintLog("TDB WARN ", 255, __VA_ARGS__); }}
#define tsdbInfo(...) { if (tsdbDebugFlag & DEBUG_INFO) { taosPrintLog("TDB ", 255, __VA_ARGS__); }}
#define tsdbDebug(...) { if (tsdbDebugFlag & DEBUG_DEBUG) { taosPrintLog("TDB ", tsdbDebugFlag, __VA_ARGS__); }}
#define tsdbTrace(...) { if (tsdbDebugFlag & DEBUG_TRACE) { taosPrintLog("TDB ", tsdbDebugFlag, __VA_ARGS__); }}
extern uint32_t tsdbDebugFlag;
#define tsdbFatal(...) do { if (tsdbDebugFlag & DEBUG_FATAL) { taosPrintLog("TDB FATAL ", 255, __VA_ARGS__); }} while(0)
#define tsdbError(...) do { if (tsdbDebugFlag & DEBUG_ERROR) { taosPrintLog("TDB ERROR ", 255, __VA_ARGS__); }} while(0)
#define tsdbWarn(...) do { if (tsdbDebugFlag & DEBUG_WARN) { taosPrintLog("TDB WARN ", 255, __VA_ARGS__); }} while(0)
#define tsdbInfo(...) do { if (tsdbDebugFlag & DEBUG_INFO) { taosPrintLog("TDB ", 255, __VA_ARGS__); }} while(0)
#define tsdbDebug(...) do { if (tsdbDebugFlag & DEBUG_DEBUG) { taosPrintLog("TDB ", tsdbDebugFlag, __VA_ARGS__); }} while(0)
#define tsdbTrace(...) do { if (tsdbDebugFlag & DEBUG_TRACE) { taosPrintLog("TDB ", tsdbDebugFlag, __VA_ARGS__); }} while(0)
#define TSDB_MAX_TABLE_SCHEMAS 16
#define TSDB_FILE_HEAD_SIZE 512
......
......@@ -161,6 +161,11 @@ _err:
static void tsdbEndCommit(STsdbRepo *pRepo, int eno) {
if (pRepo->appH.notifyStatus) pRepo->appH.notifyStatus(pRepo->appH.appH, TSDB_STATUS_COMMIT_OVER, eno);
SMemTable *pIMem = pRepo->imem;
tsdbLockRepo(pRepo);
pRepo->imem = NULL;
tsdbUnlockRepo(pRepo);
tsdbUnRefMemTable(pRepo, pIMem);
sem_post(&(pRepo->readyToCommit));
}
......
......@@ -17,6 +17,7 @@
#include "tsdbMain.h"
#define TSDB_DATA_SKIPLIST_LEVEL 5
#define TSDB_MAX_INSERT_BATCH 512
static SMemTable * tsdbNewMemTable(STsdbRepo *pRepo);
static void tsdbFreeMemTable(SMemTable *pMemTable);
......@@ -206,7 +207,7 @@ void *tsdbAllocBytes(STsdbRepo *pRepo, int bytes) {
int tsdbAsyncCommit(STsdbRepo *pRepo) {
if (pRepo->mem == NULL) return 0;
SMemTable *pIMem = pRepo->imem;
ASSERT(pRepo->imem == NULL);
sem_wait(&(pRepo->readyToCommit));
......@@ -221,8 +222,6 @@ int tsdbAsyncCommit(STsdbRepo *pRepo) {
tsdbScheduleCommit(pRepo);
if (tsdbUnlockRepo(pRepo) < 0) return -1;
if (tsdbUnRefMemTable(pRepo, pIMem) < 0) return -1;
return 0;
}
......@@ -607,19 +606,13 @@ static int tsdbInsertDataToTable(STsdbRepo *pRepo, SSubmitBlk *pBlock, int32_t *
STable * pTable = NULL;
SSubmitBlkIter blkIter = {0};
SDataRow row = NULL;
void ** rows = NULL;
void * rows[TSDB_MAX_INSERT_BATCH] = {0};
int rowCounter = 0;
ASSERT(pBlock->tid < pMeta->maxTables);
pTable = pMeta->tables[pBlock->tid];
ASSERT(pTable != NULL && TABLE_UID(pTable) == pBlock->uid);
rows = (void **)calloc(pBlock->numOfRows, sizeof(void *));
if (rows == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
return -1;
}
tsdbInitSubmitBlkIter(pBlock, &blkIter);
while ((row = tsdbGetSubmitBlkNext(&blkIter)) != NULL) {
if (tsdbCopyRowToMem(pRepo, row, pTable, &(rows[rowCounter])) < 0) {
......@@ -633,21 +626,28 @@ static int tsdbInsertDataToTable(STsdbRepo *pRepo, SSubmitBlk *pBlock, int32_t *
if (rows[rowCounter] != NULL) {
rowCounter++;
}
}
if (rowCounter == TSDB_MAX_INSERT_BATCH) {
if (tsdbInsertDataToTableImpl(pRepo, pTable, rows, rowCounter) < 0) {
goto _err;
}
rowCounter = 0;
memset(rows, 0, sizeof(rows));
}
}
if (rowCounter > 0 && tsdbInsertDataToTableImpl(pRepo, pTable, rows, rowCounter) < 0) {
goto _err;
}
STSchema *pSchema = tsdbGetTableSchemaByVersion(pTable, pBlock->sversion);
pRepo->stat.pointsWritten += points * schemaNCols(pSchema);
pRepo->stat.totalStorage += points * schemaVLen(pSchema);
free(rows);
return 0;
_err:
free(rows);
return -1;
}
......
此差异已折叠。
......@@ -52,6 +52,8 @@ void *taosIterateRef(int rsetId, int64_t rid);
// return the number of references in system
int taosListRef();
#define RID_VALID(x) ((x) > 0)
/* sample code to iterate the refs
void demoIterateRefs(int rsetId) {
......
......@@ -33,7 +33,7 @@ SOCKET taosOpenTcpServerSocket(uint32_t ip, uint16_t port);
int32_t taosKeepTcpAlive(SOCKET sockFd);
int32_t taosGetFqdn(char *);
uint32_t taosGetIpFromFqdn(const char *);
uint32_t taosGetIpv4FromFqdn(const char *);
void tinet_ntoa(char *ipstr, uint32_t ip);
uint32_t ip2uint(const char *const ip_addr);
......
此差异已折叠。
......@@ -449,7 +449,7 @@ static void taosNetTestClient(char *host, int32_t startPort, int32_t pkgLen) {
int32_t endPort = startPort + 11;
uInfo("work as client, host:%s startPort:%d endPort:%d pkgLen:%d\n", host, startPort, endPort, pkgLen);
uint32_t serverIp = taosGetIpFromFqdn(host);
uint32_t serverIp = taosGetIpv4FromFqdn(host);
if (serverIp == 0xFFFFFFFF) {
uError("failed to resolve fqdn:%s", host);
exit(-1);
......
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
......@@ -155,7 +155,7 @@ int32_t vnodeAlter(void *vparam, SCreateVnodeMsg *pVnodeCfg) {
SVnodeObj *pVnode = vparam;
// vnode in non-ready state and still needs to return success instead of TSDB_CODE_VND_INVALID_STATUS
// cfgVersion can be corrected by status msg
// dbCfgVersion can be corrected by status msg
if (!vnodeSetUpdatingStatus(pVnode)) {
vDebug("vgId:%d, vnode is not ready, do alter operation later", pVnode->vgId);
return TSDB_CODE_SUCCESS;
......
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册