提交 b3574f29 编写于 作者: H Haojun Liao

Merge remote-tracking branch 'origin/feature/query' into feature/query

pipeline {
agent any
stages {
stage('build TDengine') {
steps {
sh '''cd ${WORKSPACE}
export TZ=Asia/Harbin
date
rm -rf ${WORKSPACE}/debug
mkdir debug
cd debug
cmake .. > /dev/null
make > /dev/null
cd ${WORKSPACE}/debug'''
}
}
stage('test_tsim') {
parallel {
stage('test') {
steps {
sh '''cd ${WORKSPACE}/tests
#./test-all.sh smoke
sudo ./test-all.sh full'''
}
}
stage('test_crash_gen') {
steps {
sh '''cd ${WORKSPACE}/tests/pytest
sudo ./crash_gen.sh -a -p -t 4 -s 2000'''
}
}
stage('test_valgrind') {
steps {
sh '''cd ${WORKSPACE}/tests/pytest
sudo ./valgrind-test.sh 2>&1 > mem-error-out.log
grep \'start to execute\\|ERROR SUMMARY\' mem-error-out.log|grep -v \'grep\'|uniq|tee uniq-mem-error-out.log
for memError in `grep \'ERROR SUMMARY\' uniq-mem-error-out.log | awk \'{print $4}\'`
do
if [ -n "$memError" ]; then
if [ "$memError" -gt 12 ]; then
echo -e "${RED} ## Memory errors number valgrind reports is $memError.\\
More than our threshold! ## ${NC}"
travis_terminate $memError
fi
fi
done
grep \'start to execute\\|definitely lost:\' mem-error-out.log|grep -v \'grep\'|uniq|tee uniq-definitely-lost-out.log
for defiMemError in `grep \'definitely lost:\' uniq-definitely-lost-out.log | awk \'{print $7}\'`
do
if [ -n "$defiMemError" ]; then
if [ "$defiMemError" -gt 13 ]; then
echo -e "${RED} ## Memory errors number valgrind reports \\
Definitely lost is $defiMemError. More than our threshold! ## ${NC}"
travis_terminate $defiMemError
fi
fi
done'''
}
}
}
}
}
}
\ No newline at end of file
......@@ -72,18 +72,21 @@ ENDIF ()
IF (TD_ARM_32)
ADD_DEFINITIONS(-D_TD_ARM_32)
ADD_DEFINITIONS(-D_TD_ARM_)
ADD_DEFINITIONS(-DUSE_LIBICONV)
MESSAGE(STATUS "arm32 is defined")
SET(COMMON_FLAGS "-std=gnu99 -Wall -Werror -fPIC -g -fsigned-char -fpack-struct=8 -D_FILE_OFFSET_BITS=64 -D_LARGE_FILE -Wno-pointer-to-int-cast -Wno-int-to-pointer-cast -Wno-incompatible-pointer-types ")
ENDIF ()
IF (TD_MIPS_64)
ADD_DEFINITIONS(-D_TD_MIPS_64_)
ADD_DEFINITIONS(-DUSE_LIBICONV)
MESSAGE(STATUS "mips64 is defined")
SET(COMMON_FLAGS "-std=gnu99 -Wall -Werror -fPIC -g3 -gdwarf-2 -msse4.2 -D_FILE_OFFSET_BITS=64 -D_LARGE_FILE")
ENDIF ()
IF (TD_MIPS_32)
ADD_DEFINITIONS(-D_TD_MIPS_32_)
ADD_DEFINITIONS(-DUSE_LIBICONV)
MESSAGE(STATUS "mips32 is defined")
SET(COMMON_FLAGS "-std=gnu99 -Wall -Werror -fPIC -g3 -gdwarf-2 -msse4.2 -D_FILE_OFFSET_BITS=64 -D_LARGE_FILE")
ENDIF ()
......
......@@ -68,7 +68,7 @@ systemctl status taosd
taos
```
如果TDengine终端链接服务成功,将会打印出欢迎消息和版本信息。如果失败,则会打印错误消息出来(请参考[FAQ](https://www.taosdata.com/cn/faq/)来解决终端链接服务端失败的问题)。TDengine终端的提示符号如下:
如果TDengine终端连接服务成功,将会打印出欢迎消息和版本信息。如果失败,则会打印错误消息出来(请参考[FAQ](https://www.taosdata.com/cn/faq/)来解决终端连接服务端失败的问题)。TDengine终端的提示符号如下:
```cmd
taos>
......@@ -99,8 +99,8 @@ Query OK, 2 row(s) in set (0.001700s)
- -c, --config-dir: 指定配置文件目录,默认为_/etc/taos_
- -h, --host: 指定服务的IP地址,默认为本地服务
- -s, --commands: 在不进入终端的情况下运行TDengine命令
- -u, -- user: 接TDengine服务器的用户名,缺省为root
- -p, --password: 接TDengine服务器的密码,缺省为taosdata
- -u, -- user: 接TDengine服务器的用户名,缺省为root
- -p, --password: 接TDengine服务器的密码,缺省为taosdata
- -?, --help: 打印出所有命令行参数
示例:
......
......@@ -19,7 +19,7 @@ CREATE DATABASE power KEEP 365 DAYS 10 BLOCKS 4;
USE power;
```
就当前接里操作的库换为power,否则对具体表操作前,需要使用“库名.表名”来指定库的名字。
就当前接里操作的库换为power,否则对具体表操作前,需要使用“库名.表名”来指定库的名字。
**注意:**
......
......@@ -196,7 +196,7 @@ TDengine是基于硬件、软件系统不可靠、一定会有故障的假设进
**对外服务地址**:TDengine集群可以容纳单台、多台甚至几千台物理节点。应用只需要向集群中任何一个物理节点的publicIp发起连接即可。启动CLI应用taos时,选项-h需要提供的就是publicIp。
**master/secondIp**:每一个dnode都需要配置一个masterIp。dnode启动后,将对配置的masterIp发起加入集群的连接请求。masterIp是已经创建的集群中的任何一个节点的privateIp,对于集群中的第一个节点,就是它自己的privateIp。为保证连接成功,每个dnode还可配置secondIp, 该IP地址也是已创建的集群中的任何一个节点的privateIp。如果一个节点连接masterIp失败,它将试图接secondIp。
**master/secondIp**:每一个dnode都需要配置一个masterIp。dnode启动后,将对配置的masterIp发起加入集群的连接请求。masterIp是已经创建的集群中的任何一个节点的privateIp,对于集群中的第一个节点,就是它自己的privateIp。为保证连接成功,每个dnode还可配置secondIp, 该IP地址也是已创建的集群中的任何一个节点的privateIp。如果一个节点连接masterIp失败,它将试图接secondIp。
dnode启动后,会获知集群的mnode IP列表,并且定时向mnode发送状态信息。
......
......@@ -130,9 +130,25 @@ TDengine集群中加入一个新的dnode时,涉及集群相关的一些参数
- statusInterval: dnode向mnode报告状态时长。单位为秒,默认值:1。
- maxTablesPerVnode: 每个vnode中能够创建的最大表个数。默认值:1000000。
- maxVgroupsPerDb: 每个数据库中能够使用的最大vnode个数。
- arbitrator: 系统中裁决器的end point,缺省为空
- arbitrator: 系统中裁决器的end point,缺省为空
- timezone、locale、charset 的配置见客户端配置。
为方便调试,可通过SQL语句临时调整每个dnode的日志配置,系统重启后会失效:
```mysql
ALTER DNODE <dnode_id> <config>
```
- dnode_id: 可以通过SQL语句"SHOW DNODES"命令获取
- config: 要调整的日志参数,在如下列表中取值
> resetlog 截断旧日志文件,创建一个新日志文件
> debugFlag < 131 | 135 | 143 > 设置debugFlag为131、135或者143
例如:
```
alter dnode 1 debugFlag 135;
```
## 客户端配置
TDengine系统的前台交互客户端应用程序为taos,它与taosd共享同一个配置文件taos.cfg。运行taos时,使用参数-c指定配置文件目录,如taos -c /home/cfg,表示使用/home/cfg/目录下的taos.cfg配置文件中的参数,缺省目录是/etc/taos。本节主要说明 taos 客户端应用在配置文件 taos.cfg 文件中使用到的参数。
......@@ -234,10 +250,10 @@ ALTER USER <user_name> PASS <'password'>;
修改用户密码, 为避免被转换为小写,密码需要用单引号引用,单引号为英文半角
```
ALTER USER <user_name> PRIVILEDGE <'super'|'write'|'read'>;
ALTER USER <user_name> PRIVILEDGE <super|write|read>;
```
修改用户权限为:super/write/read。 为避免被转换为小写,密码需要用单引号引用,单引号为英文半角
修改用户权限为:super/write/read,不需要添加单引号
```
SHOW USERS;
......@@ -392,5 +408,5 @@ TDengine的所有可执行文件默认存放在 _/usr/local/taos/bin_ 目录下
您可以通过修改系统配置文件taos.cfg来配置不同的数据目录和日志目录。
##
......@@ -84,17 +84,17 @@ TDengine 分布式架构的逻辑结构图如下:
**FQDN配置**:一个数据节点有一个或多个FQDN,可以在系统配置文件taos.cfg通过参数“fqdn"进行指定,如果没有指定,系统将自动获取FQDN。如果节点没有配置FQDN,可以直接将该节点的配置参数fqdn设置为它的IP地址。但不建议使用IP,因为IP地址可变,一旦变化,将让集群无法正常工作。一个数据节点的EP(End Point)由FQDN + Port组成。采用FQDN,需要保证DNS服务正常工作,或者在节点以及应用所在的节点配置好hosts文件。
**端口配置:**一个数据节点对外的端口由TDengine的系统配置参数serverPort决定,对集群内部通讯的端口是serverPort+5。集群内数据节点之间的数据复制操作还占有一个TCP端口,是serverPort+10. 为支持多线程高效的处理UDP数据,每个对内和对外的UDP接,都需要占用5个连续的端口。因此一个数据节点总的端口范围为serverPort到serverPort + 10,总共11个TCP/UDP端口。使用时,需要确保防火墙将这些端口打开。每个数据节点可以配置不同的serverPort。
**端口配置:**一个数据节点对外的端口由TDengine的系统配置参数serverPort决定,对集群内部通讯的端口是serverPort+5。集群内数据节点之间的数据复制操作还占有一个TCP端口,是serverPort+10. 为支持多线程高效的处理UDP数据,每个对内和对外的UDP接,都需要占用5个连续的端口。因此一个数据节点总的端口范围为serverPort到serverPort + 10,总共11个TCP/UDP端口。使用时,需要确保防火墙将这些端口打开。每个数据节点可以配置不同的serverPort。
**集群对外链接:** TDengine集群可以容纳单个、多个甚至几千个数据节点。应用只需要向集群中任何一个数据节点发起连接即可,链接需要提供的网络参数是一数据节点的End Point(FQDN加配置的端口号)。通过命令行CLI启动应用taos时,可以通过选项-h来指定数据节点的FQDN, -P来指定其配置的端口号,如果端口不配置,将采用TDengine的系统配置参数serverPort。
**集群对外连接:** TDengine集群可以容纳单个、多个甚至几千个数据节点。应用只需要向集群中任何一个数据节点发起连接即可,连接需要提供的网络参数是一数据节点的End Point(FQDN加配置的端口号)。通过命令行CLI启动应用taos时,可以通过选项-h来指定数据节点的FQDN, -P来指定其配置的端口号,如果端口不配置,将采用TDengine的系统配置参数serverPort。
**集群内部通讯**: 各个数据节点之间通过TCP/UDP进行链接。一个数据节点启动时,将获取mnode所在的dnode的EP信息,然后与系统中的mnode建立起链接,交换信息。获取mnode的EP信息有三步,1:检查mnodeEpList文件是否存在,如果不存在或不能正常打开获得mnode EP信息,进入第二步;2:检查系统配置文件taos.cfg, 获取mnode EP配置参数first, second,如果不存在或者taos.cfg里没有这两个配置参数,或无效,进入第三步;3:将自己的EP设为mnode EP, 并独立运行起来。获取mnode EP列表后,数据节点发起链接,如果链接成功,则成功加入进工作的集群,如果不成功,则尝试mnode EP列表中的下一个。如果都尝试了,但链接都仍然失败,则休眠几秒后,再进行尝试。
**集群内部通讯**: 各个数据节点之间通过TCP/UDP进行连接。一个数据节点启动时,将获取mnode所在的dnode的EP信息,然后与系统中的mnode建立起连接,交换信息。获取mnode的EP信息有三步,1:检查mnodeEpList文件是否存在,如果不存在或不能正常打开获得mnode EP信息,进入第二步;2:检查系统配置文件taos.cfg, 获取mnode EP配置参数first, second,如果不存在或者taos.cfg里没有这两个配置参数,或无效,进入第三步;3:将自己的EP设为mnode EP, 并独立运行起来。获取mnode EP列表后,数据节点发起连接,如果连接成功,则成功加入进工作的集群,如果不成功,则尝试mnode EP列表中的下一个。如果都尝试了,但连接都仍然失败,则休眠几秒后,再进行尝试。
**MNODE的选择:** TDengine逻辑上有管理节点,但没有单独的执行代码,服务器侧只有一套执行代码taosd。那么哪个数据节点会是管理节点呢?这是系统自动决定的,无需任何人工干预。原则如下:一个数据节点启动时,会检查自己的End Point, 并与获取的mnode EP List进行比对,如果在其中,该数据节点认为自己应该启动mnode模块,成为mnode。如果自己的EP不在mnode EP List里,则不启动mnode模块。在系统的运行过程中,由于负载均衡、宕机等原因,mnode有可能迁移至新的dnode,但一切都是透明的,无需人工干预,配置参数的修改,是mnode自己根据资源做出的决定。
**新数据节点的加入**:系统有了一个数据节点后,就已经成为一个工作的系统。添加新的节点进集群时,有两个步骤,第一步:使用TDengine CLI接到现有工作的数据节点,然后用命令”create dnode"将新的数据节点的End Point添加进去; 第二步:在新的数据节点的系统配置参数文件taos.cfg里,将first, second参数设置为现有集群中任意两个数据节点的EP即可。具体添加的详细步骤请见详细的用户手册。这样就把集群一步一步的建立起来。
**新数据节点的加入**:系统有了一个数据节点后,就已经成为一个工作的系统。添加新的节点进集群时,有两个步骤,第一步:使用TDengine CLI接到现有工作的数据节点,然后用命令”create dnode"将新的数据节点的End Point添加进去; 第二步:在新的数据节点的系统配置参数文件taos.cfg里,将first, second参数设置为现有集群中任意两个数据节点的EP即可。具体添加的详细步骤请见详细的用户手册。这样就把集群一步一步的建立起来。
**重定向**:无论是dnode还是taosc,最先都是要发起与mnode的链接,但mnode是系统自动创建并维护的,因此对于用户来说,并不知道哪个dnode在运行mnode。TDengine只要求向系统中任何一个工作的dnode发起链接即可。因为任何一个正在运行的dnode,都维护有目前运行的mnode EP List。当收到一个来自新启动的dnode或taosc的链接请求,如果自己不是mnode,则将mnode EP List回复给对方,taosc或新启动的dnode收到这个list, 就重新尝试建立链接。当mnode EP List发生改变,通过节点之间的消息交互,各个数据节点就很快获取最新列表,并通知taosc。
**重定向**:无论是dnode还是taosc,最先都是要发起与mnode的连接,但mnode是系统自动创建并维护的,因此对于用户来说,并不知道哪个dnode在运行mnode。TDengine只要求向系统中任何一个工作的dnode发起连接即可。因为任何一个正在运行的dnode,都维护有目前运行的mnode EP List。当收到一个来自新启动的dnode或taosc的连接请求,如果自己不是mnode,则将mnode EP List回复给对方,taosc或新启动的dnode收到这个list, 就重新尝试建立连接。当mnode EP List发生改变,通过节点之间的消息交互,各个数据节点就很快获取最新列表,并通知taosc。
### 一个典型的消息流程
为解释vnode, mnode, taosc和应用之间的关系以及各自扮演的角色,下面对写入数据这个典型操作的流程进行剖析。
......@@ -197,7 +197,7 @@ Master Vnode遵循下面的写入流程:
### 主从选择
Vnode会保持一个数据版本号(Version),对内存数据进行持久化存储时,对该版本号也进行持久化存储。每个数据更新操作,无论是采集的时序数据还是元数据,这个版本号将增一。
一个vnode启动时,角色(master、slave) 是不定的,数据是处于未同步状态,它需要与虚拟节点组内其他节点建立TCP接,并互相交换status,其中包括version和自己的角色。通过status的交换,系统进入选主流程,规则如下:
一个vnode启动时,角色(master、slave) 是不定的,数据是处于未同步状态,它需要与虚拟节点组内其他节点建立TCP接,并互相交换status,其中包括version和自己的角色。通过status的交换,系统进入选主流程,规则如下:
1. 如果只有一个副本,该副本永远就是master
2. 所有副本都在线时,版本最高的被选为master
......
......@@ -142,7 +142,7 @@ C/C++的API类似于MySQL的C API。应用程序使用时,需要包含TDengine
获取最近一次API调用失败的原因,返回值为错误代码。
**注意**:对于单个数据库连接,在同一时刻只能有一个线程使用该接调用API,否则会有未定义的行为出现并可能导致客户端crash。客户端应用可以通过建立多个连接进行多线程的数据写入或查询处理。
**注意**:对于单个数据库连接,在同一时刻只能有一个线程使用该接调用API,否则会有未定义的行为出现并可能导致客户端crash。客户端应用可以通过建立多个连接进行多线程的数据写入或查询处理。
### 异步查询API
......
......@@ -21,7 +21,7 @@
## 5. 遇到错误"Unable to establish connection", 我怎么办?
客户端遇到接故障,请按照下面的步骤进行检查:
客户端遇到接故障,请按照下面的步骤进行检查:
1. 检查网络环境
* 云服务器:检查云服务器的安全组是否打开TCP/UDP 端口6030-6042的访问权限
......@@ -45,7 +45,7 @@
9. 如果仍不能排除连接故障,请使用命令行工具nc来分别判断指定端口的TCP和UDP连接是否通畅
检查UDP端口连接是否工作:`nc -vuz {hostIP} {port} `
检查服务器侧TCP端口连接是否工作:`nc -l {port}`
检查客户端侧TCP端口接是否工作:`nc {hostIP} {port}`
检查客户端侧TCP端口接是否工作:`nc {hostIP} {port}`
10. 也可以使用taos程序内嵌的网络连通检测功能,来验证服务器和客户端之间指定的端口连接是否通畅(包括TCP和UDP):[TDengine 内嵌网络检测工具使用指南](https://www.taosdata.com/blog/2020/09/08/1816.html)
......@@ -57,7 +57,7 @@
1. 请检查连接的服务器的FQDN是否正确,FQDN配置参考:[一篇文章说清楚TDengine的FQDN](https://www.taosdata.com/blog/2020/09/11/1824.html)
2. 如果网络配置有DNS server, 请检查是否正常工作
3. 如果网络没有配置DNS server, 请检查客户端所在机器的hosts文件,查看该FQDN是否配置,并是否有正确的IP地址。
4. 如果网络配置OK,从客户端所在机器,你需要能Ping该连接的FQDN,否则客户端是无法接服务器的
4. 如果网络配置OK,从客户端所在机器,你需要能Ping该连接的FQDN,否则客户端是无法接服务器的
## 7. 虽然语法正确,为什么我还是得到 "Invalid SQL" 错误
......@@ -108,4 +108,8 @@ Connection = DriverManager.getConnection(url, properties);
附上必要的问题描述,以及发生该问题的执行操作,出现问题的表征及大概的时间,在<a href='https://github.com/taosdata/TDengine'> GitHub</a>提交Issue。
为了保证有足够的debug信息,如果问题能够重复,请修改/etc/taos/taos.cfg文件,最后面添加一行“debugFlag 135"(不带引号本身),然后重启taosd, 重复问题,然后再递交。但系统正常运行时,请一定将debugFlag设置为131,否则会产生大量的日志信息,降低系统效率。
为了保证有足够的debug信息,如果问题能够重复,请修改/etc/taos/taos.cfg文件,最后面添加一行“debugFlag 135"(不带引号本身),然后重启taosd, 重复问题,然后再递交。也可以通过如下SQL语句,临时设置taosd的日志级别。
```
alter dnode <dnode_id> debugFlag 135;
```
但系统正常运行时,请一定将debugFlag设置为131,否则会产生大量的日志信息,降低系统效率。
......@@ -66,21 +66,21 @@ TDengine采取的是Master-Slave模式进行同步,与流行的RAFT一致性
数据实时复制有三个主要流程:选主、数据转发、数据恢复。后续做详细讨论。
## 虚拟节点之间的网络
## 虚拟节点之间的网络
虚拟节点之间通过TCP进行链接,节点之间的状态交换、数据包的转发都是通过这个TCP链接(peerFd)进行。为避免竞争,两个虚拟节点之间的TCP链接,总是由IP地址(UINT32)小的节点作为TCP客户端发起。一旦TCP链接被中断,虚拟节点能通过TCP socket自动检测到,将对方标为offline。如果监测到任何错误(比如数据恢复流程),虚拟节点将主动重置该链接。
虚拟节点之间通过TCP进行连接,节点之间的状态交换、数据包的转发都是通过这个TCP连接(peerFd)进行。为避免竞争,两个虚拟节点之间的TCP连接,总是由IP地址(UINT32)小的节点作为TCP客户端发起。一旦TCP连接被中断,虚拟节点能通过TCP socket自动检测到,将对方标为offline。如果监测到任何错误(比如数据恢复流程),虚拟节点将主动重置该连接。
一旦作为客户端的节点链接不成或中断,它将周期性的每隔一秒钟去试图去链接一次。因为TCP本身有心跳机制,虚拟节点之间不再另行提供心跳。
一旦作为客户端的节点连接不成或中断,它将周期性的每隔一秒钟去试图去连接一次。因为TCP本身有心跳机制,虚拟节点之间不再另行提供心跳。
如果一个unsynced节点要发起数据恢复流程,它与Master将建立起专有的TCP链接(syncFd)。数据恢复完成后,该链接会被关闭。而且为限制资源的使用,系统只容许一定数量(配置参数tsMaxSyncNum)的数据恢复的socket存在。如果超过这个数字,系统会将新的数据恢复请求延后处理。
如果一个unsynced节点要发起数据恢复流程,它与Master将建立起专有的TCP连接(syncFd)。数据恢复完成后,该连接会被关闭。而且为限制资源的使用,系统只容许一定数量(配置参数tsMaxSyncNum)的数据恢复的socket存在。如果超过这个数字,系统会将新的数据恢复请求延后处理。
任意一个节点,无论有多少虚拟节点,都会启动而且只会启动一个TCP server, 来接受来自其他虚拟节点的上述两类TCP的链接请求。当TCP socket建立起来,客户端侧发送的消息体里会带有vgId(全局唯一的vgroup ID), TCP 服务器侧会检查该vgId是否已经在该节点启动运行。如果已经启动运行,就接受其请求。如果不存在,就直接将链接请求关闭。在TDengine代码里,mnode group的vgId设置为1。
任意一个节点,无论有多少虚拟节点,都会启动而且只会启动一个TCP server, 来接受来自其他虚拟节点的上述两类TCP的连接请求。当TCP socket建立起来,客户端侧发送的消息体里会带有vgId(全局唯一的vgroup ID), TCP 服务器侧会检查该vgId是否已经在该节点启动运行。如果已经启动运行,就接受其请求。如果不存在,就直接将连接请求关闭。在TDengine代码里,mnode group的vgId设置为1。
## 选主流程
当同一组的两个虚拟节点之间(vnode A, vnode B)建立连接后,他们互换status消息。status消息里包含本地存储的同一虚拟节点组内所有虚拟节点的role和version。
如果一个虚拟节点(vnode A)检测到与同一虚拟节点组内另外一虚拟节点(vnode B)的链接中断,vnode A将立即把vnode B的role设置为offline。无论是接收到另外一虚拟节点发来的status消息,还是检测与另外一虚拟节点的链接中断,该虚拟节点都将进入状态处理流程。状态处理流程的规则如下:
如果一个虚拟节点(vnode A)检测到与同一虚拟节点组内另外一虚拟节点(vnode B)的连接中断,vnode A将立即把vnode B的role设置为offline。无论是接收到另外一虚拟节点发来的status消息,还是检测与另外一虚拟节点的连接中断,该虚拟节点都将进入状态处理流程。状态处理流程的规则如下:
1. 如果检测到在线的节点数没有超过一半,则将自己的状态设置为unsynced.
2. 如果在线的虚拟节点数超过一半,会检查master节点是否存在,如果存在,则会决定是否将自己状态改为slave或启动数据恢复流程
......@@ -118,7 +118,7 @@ TDengine采取的是Master-Slave模式进行同步,与流行的RAFT一致性
9. 如果quorum为1,上述6,7,8步不会发生。
10. 如果要等待slave的确认,master会启动2秒的定时器(可配置),如果超时,则认为失败。
对于回复确认,sync模块提供的是异步回调函数,因此APP在调用syncForwardToPeer之后,无需等待,可以处理下一个操作。在Master与Slave的TCP接管道里,可能有多个Forward消息,这些消息是严格按照应用提供的顺序排好的。对于Forward Response也是一样,TCP管道里存在多个,但都是排序好的。这个顺序,SYNC模块并没有做特别的事情,是由APP单线程顺序写来保证的(TDengine里每个vnode的写数据,都是单线程)。
对于回复确认,sync模块提供的是异步回调函数,因此APP在调用syncForwardToPeer之后,无需等待,可以处理下一个操作。在Master与Slave的TCP接管道里,可能有多个Forward消息,这些消息是严格按照应用提供的顺序排好的。对于Forward Response也是一样,TCP管道里存在多个,但都是排序好的。这个顺序,SYNC模块并没有做特别的事情,是由APP单线程顺序写来保证的(TDengine里每个vnode的写数据,都是单线程)。
## 数据恢复流程
......@@ -142,9 +142,9 @@ TDengine采取的是Master-Slave模式进行同步,与流行的RAFT一致性
<center> <img src="../assets/replica-restore.png"> </center>
1. 通过已经建立的TCP接,发送sync req给master节点
2. master收到sync req后,以client的身份,向vnode B主动建立一新的专用于同步的TCP接(syncFd)
3. 新的TCP接建立成功后,master将开始retrieve流程,对应的,vnode B将同步启动restore流程
1. 通过已经建立的TCP接,发送sync req给master节点
2. master收到sync req后,以client的身份,向vnode B主动建立一新的专用于同步的TCP接(syncFd)
3. 新的TCP接建立成功后,master将开始retrieve流程,对应的,vnode B将同步启动restore流程
4. Retrieve/Restore流程里,先处理所有archived data (vnode里的data, head, last文件),后处理WAL data。
5. 对于archived data,master将通过回调函数getFileInfo获取数据文件的基本信息,包括文件名、magic以及文件大小。
6. master 将获得的文件名、magic以及文件大小发给vnode B
......@@ -157,7 +157,7 @@ TDengine采取的是Master-Slave模式进行同步,与流行的RAFT一致性
1. master节点调用回调函数getWalInfo,获取WAL的文件名。
2. 如果getWalInfo返回值大于0,表示该文件还不是最后一个WAL,因此master调用sendfile一下把该文件发送给vnode B
3. 如果getWalInfo返回时为0,表示该文件是最后一个WAL,因为文件可能还处于写的状态中,sync模块要根据WAL Head的定义逐条读出记录,然后发往vnode B。
4. vnode A读取TCP接传来的数据,按照WAL Head,逐条读取,如果版本号比现有的大,调用回调函数writeToCache,交给应用处理。如果小,直接扔掉。
4. vnode A读取TCP接传来的数据,按照WAL Head,逐条读取,如果版本号比现有的大,调用回调函数writeToCache,交给应用处理。如果小,直接扔掉。
5. 上述流程循环,直到所有WAL文件都被处理完。处理完后,master就会将新来的数据包通过Forward消息转发给slave。
从同步文件启动起,sync模块会通过inotify监控所有处理过的file以及wal。一旦发现被处理过的文件有更新变化,同步流程将中止,会重新启动。因为有可能落盘操作正在进行(比如历史数据导入,内存数据落盘),把已经处理过的文件进行了修改,需要重新同步才行。
......@@ -194,15 +194,15 @@ sync模块通过inotify监控LastWal文件的更新和关闭操作。而且在
因为写入失败,客户端会重新写入数据。但对于TDengine而言,是OK的。因为时序数据都是有时间戳的,时间戳相同的数据更新操作,第一次会执行,但第二次会自动扔掉。对于Meta Data(增加、删除库、表等等)的操作,也是OK的。一张表、库已经被创建或删除,再创建或删除,不会被执行的。
在TDengine的设计里,虚拟节点与虚拟节点之间,是一个TCP链接,是一个pipeline,数据块一个接一个按顺序在这个pipeline里等待处理。一旦某个数据块的处理失败,这个链接会被重置,后续的数据块的处理都会失败。因此不会存在Pipeline里一个数据块更新失败,但下一个数据块成功的可能。
在TDengine的设计里,虚拟节点与虚拟节点之间,是一个TCP连接,是一个pipeline,数据块一个接一个按顺序在这个pipeline里等待处理。一旦某个数据块的处理失败,这个连接会被重置,后续的数据块的处理都会失败。因此不会存在Pipeline里一个数据块更新失败,但下一个数据块成功的可能。
## Split Brain的问题
选举流程中,有个强制要求,那就是一定有超过半数的虚拟节点在线。但是如果replication正好是偶数,这个时候,完全可能存在splt brain问题。
为解决这个问题,TDengine提供Arbitrator的解决方法。Arbitrator是一个节点,它的任务就是接受任何虚拟节点的接请求,并保持它。
为解决这个问题,TDengine提供Arbitrator的解决方法。Arbitrator是一个节点,它的任务就是接受任何虚拟节点的接请求,并保持它。
在启动复制模块实例时,在配置参数中,应用可以提供Arbitrator的IP地址。如果是奇数个副本,复制模块不会与这个arbitrator去建立链接,但如果是偶数个副本,就会主动去建立链接。
在启动复制模块实例时,在配置参数中,应用可以提供Arbitrator的IP地址。如果是奇数个副本,复制模块不会与这个arbitrator去建立连接,但如果是偶数个副本,就会主动去建立连接。
Arbitrator的程序tarbitrator.c在复制模块的同一目录, 编译整个系统时,会在bin目录生成。命令行参数“-?”查看可以配置的参数,比如绑定的IP地址,监听的端口号。
......
......@@ -13,7 +13,7 @@ taosd的启动入口是dnode模块,dnode然后启动其他模块,包括可
该模块负责taosd与taosc, 以及其他数据节点之间的通讯。TDengine没有采取标准的HTTP或gRPC等第三方工具,而是实现了自己的通讯模块RPC。
考虑到物联网场景下,数据写入的包一般不大,因此除支持TCP链接之外,RPC还支持UDP链接。当数据包小于15K时,RPC将采用UDP方式进行链接,否则将采用TCP链接。对于查询类的消息,RPC不管包的大小,总是采取TCP链接。对于UDP链接,RPC实现了自己的超时、重传、顺序检查等机制,以保证数据可靠传输。
考虑到物联网场景下,数据写入的包一般不大,因此除支持TCP连接之外,RPC还支持UDP连接。当数据包小于15K时,RPC将采用UDP方式进行连接,否则将采用TCP连接。对于查询类的消息,RPC不管包的大小,总是采取TCP连接。对于UDP连接,RPC实现了自己的超时、重传、顺序检查等机制,以保证数据可靠传输。
RPC模块还提供数据压缩功能,如果数据包的字节数超过系统配置参数compressMsgSize, RPC在传输中将自动压缩数据,以节省带宽。
......@@ -25,7 +25,7 @@ RPC模块还提供数据压缩功能,如果数据包的字节数超过系统
- 系统的初始化,包括
- 从文件taos.cfg读取系统配置参数,从文件dnodeCfg.json读取数据节点的配置参数;
- 启动RPC模块,并建立起与taosc通讯的server链接,与其他数据节点通讯的server链接;
- 启动RPC模块,并建立起与taosc通讯的server连接,与其他数据节点通讯的server连接;
- 启动并初始化dnode的内部管理, 该模块将扫描该数据节点已有的vnode,并打开它们;
- 初始化可配置的模块,如mnode, http, monitor等。
- 数据节点的管理,包括
......
......@@ -25,9 +25,9 @@ release_dir="${top_dir}/release"
#package_name='linux'
if [ "$verMode" == "cluster" ]; then
install_dir="${release_dir}/TDengine-enterprise-arbitrator"
install_dir="${release_dir}/TDengine-enterprise-arbitrator-${version}"
else
install_dir="${release_dir}/TDengine-arbitrator"
install_dir="${release_dir}/TDengine-arbitrator-${version}"
fi
# Directories and files.
......@@ -48,9 +48,9 @@ mkdir -p ${install_dir}/init.d && cp ${init_file_tarbitrator_rpm} ${install_dir}
cd ${release_dir}
if [ "$verMode" == "cluster" ]; then
pkg_name=${install_dir}-${version}-${osType}-${cpuType}
pkg_name=${install_dir}-${osType}-${cpuType}
elif [ "$verMode" == "edge" ]; then
pkg_name=${install_dir}-${version}-${osType}-${cpuType}
pkg_name=${install_dir}-${osType}-${cpuType}
else
echo "unknow verMode, nor cluster or edge"
exit 1
......
......@@ -25,9 +25,9 @@ release_dir="${top_dir}/release"
#package_name='linux'
if [ "$verMode" == "cluster" ]; then
install_dir="${release_dir}/PowerDB-enterprise-arbitrator"
install_dir="${release_dir}/PowerDB-enterprise-arbitrator-${version}"
else
install_dir="${release_dir}/PowerDB-arbitrator"
install_dir="${release_dir}/PowerDB-arbitrator-${version}"
fi
# Directories and files.
......@@ -48,9 +48,9 @@ mkdir -p ${install_dir}/init.d && cp ${init_file_tarbitrator_rpm} ${install_dir}
cd ${release_dir}
if [ "$verMode" == "cluster" ]; then
pkg_name=${install_dir}-${version}-${osType}-${cpuType}
pkg_name=${install_dir}-${osType}-${cpuType}
elif [ "$verMode" == "edge" ]; then
pkg_name=${install_dir}-${version}-${osType}-${cpuType}
pkg_name=${install_dir}-${osType}-${cpuType}
else
echo "unknow verMode, nor cluster or edge"
exit 1
......
......@@ -32,9 +32,9 @@ release_dir="${top_dir}/release"
#package_name='linux'
if [ "$verMode" == "cluster" ]; then
install_dir="${release_dir}/TDengine-enterprise-client"
install_dir="${release_dir}/TDengine-enterprise-client-${version}"
else
install_dir="${release_dir}/TDengine-client"
install_dir="${release_dir}/TDengine-client-${version}"
fi
# Directories and files.
......@@ -125,9 +125,9 @@ fi
cd ${release_dir}
if [ "$verMode" == "cluster" ]; then
pkg_name=${install_dir}-${version}-${osType}-${cpuType}
pkg_name=${install_dir}-${osType}-${cpuType}
elif [ "$verMode" == "edge" ]; then
pkg_name=${install_dir}-${version}-${osType}-${cpuType}
pkg_name=${install_dir}-${osType}-${cpuType}
else
echo "unknow verMode, nor cluster or edge"
exit 1
......
......@@ -32,9 +32,9 @@ release_dir="${top_dir}/release"
#package_name='linux'
if [ "$verMode" == "cluster" ]; then
install_dir="${release_dir}/PowerDB-enterprise-client"
install_dir="${release_dir}/PowerDB-enterprise-client-${version}"
else
install_dir="${release_dir}/PowerDB-client"
install_dir="${release_dir}/PowerDB-client-${version}"
fi
# Directories and files.
......@@ -164,9 +164,9 @@ fi
cd ${release_dir}
if [ "$verMode" == "cluster" ]; then
pkg_name=${install_dir}-${version}-${osType}-${cpuType}
pkg_name=${install_dir}-${osType}-${cpuType}
elif [ "$verMode" == "edge" ]; then
pkg_name=${install_dir}-${version}-${osType}-${cpuType}
pkg_name=${install_dir}-${osType}-${cpuType}
else
echo "unknow verMode, nor cluster or edge"
exit 1
......
......@@ -25,9 +25,9 @@ release_dir="${top_dir}/release"
#package_name='linux'
if [ "$verMode" == "cluster" ]; then
install_dir="${release_dir}/TDengine-enterprise-server"
install_dir="${release_dir}/TDengine-enterprise-server-${version}"
else
install_dir="${release_dir}/TDengine-server"
install_dir="${release_dir}/TDengine-server-${version}"
fi
# Directories and files.
......@@ -138,9 +138,9 @@ fi
cd ${release_dir}
if [ "$verMode" == "cluster" ]; then
pkg_name=${install_dir}-${version}-${osType}-${cpuType}
pkg_name=${install_dir}-${osType}-${cpuType}
elif [ "$verMode" == "edge" ]; then
pkg_name=${install_dir}-${version}-${osType}-${cpuType}
pkg_name=${install_dir}-${osType}-${cpuType}
else
echo "unknow verMode, nor cluster or edge"
exit 1
......
......@@ -25,9 +25,9 @@ release_dir="${top_dir}/release"
#package_name='linux'
if [ "$verMode" == "cluster" ]; then
install_dir="${release_dir}/PowerDB-enterprise-server"
install_dir="${release_dir}/PowerDB-enterprise-server-${version}"
else
install_dir="${release_dir}/PowerDB-server"
install_dir="${release_dir}/PowerDB-server-${version}"
fi
# Directories and files.
......@@ -184,9 +184,9 @@ fi
cd ${release_dir}
if [ "$verMode" == "cluster" ]; then
pkg_name=${install_dir}-${version}-${osType}-${cpuType}
pkg_name=${install_dir}-${osType}-${cpuType}
elif [ "$verMode" == "edge" ]; then
pkg_name=${install_dir}-${version}-${osType}-${cpuType}
pkg_name=${install_dir}-${osType}-${cpuType}
else
echo "unknow verMode, nor cluster or edge"
exit 1
......
......@@ -490,13 +490,13 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_fetchRowImp(JNIEn
(*env)->CallVoidMethod(env, rowobj, g_rowdataSetBooleanFp, i, (jboolean)(*((char *)row[i]) == 1));
break;
case TSDB_DATA_TYPE_TINYINT:
(*env)->CallVoidMethod(env, rowobj, g_rowdataSetByteFp, i, (jbyte) * ((char *)row[i]));
(*env)->CallVoidMethod(env, rowobj, g_rowdataSetByteFp, i, (jbyte) * ((int8_t *)row[i]));
break;
case TSDB_DATA_TYPE_SMALLINT:
(*env)->CallVoidMethod(env, rowobj, g_rowdataSetShortFp, i, (jshort) * ((short *)row[i]));
(*env)->CallVoidMethod(env, rowobj, g_rowdataSetShortFp, i, (jshort) * ((int16_t *)row[i]));
break;
case TSDB_DATA_TYPE_INT:
(*env)->CallVoidMethod(env, rowobj, g_rowdataSetIntFp, i, (jint) * (int *)row[i]);
(*env)->CallVoidMethod(env, rowobj, g_rowdataSetIntFp, i, (jint) * (int32_t *)row[i]);
break;
case TSDB_DATA_TYPE_BIGINT:
(*env)->CallVoidMethod(env, rowobj, g_rowdataSetLongFp, i, (jlong) * ((int64_t *)row[i]));
......
......@@ -525,7 +525,7 @@ static void do_sum(SQLFunctionCtx *pCtx) {
*retVal += pCtx->preAggVals.statis.sum;
} else if (pCtx->inputType == TSDB_DATA_TYPE_DOUBLE || pCtx->inputType == TSDB_DATA_TYPE_FLOAT) {
double *retVal = (double*) pCtx->aOutputBuf;
*retVal += GET_DOUBLE_VAL(&(pCtx->preAggVals.statis.sum));
*retVal += GET_DOUBLE_VAL((const char*)&(pCtx->preAggVals.statis.sum));
}
} else { // computing based on the true data block
void *pData = GET_INPUT_CHAR(pCtx);
......@@ -768,7 +768,7 @@ static void avg_function(SQLFunctionCtx *pCtx) {
if (pCtx->inputType >= TSDB_DATA_TYPE_TINYINT && pCtx->inputType <= TSDB_DATA_TYPE_BIGINT) {
*pVal += pCtx->preAggVals.statis.sum;
} else if (pCtx->inputType == TSDB_DATA_TYPE_DOUBLE || pCtx->inputType == TSDB_DATA_TYPE_FLOAT) {
*pVal += GET_DOUBLE_VAL(&(pCtx->preAggVals.statis.sum));
*pVal += GET_DOUBLE_VAL((const char *)&(pCtx->preAggVals.statis.sum));
}
} else {
void *pData = GET_INPUT_CHAR(pCtx);
......@@ -3516,12 +3516,12 @@ static void spread_function(SQLFunctionCtx *pCtx) {
pInfo->max = (double)pCtx->preAggVals.statis.max;
}
} else if (pCtx->inputType == TSDB_DATA_TYPE_DOUBLE || pCtx->inputType == TSDB_DATA_TYPE_FLOAT) {
if (pInfo->min > GET_DOUBLE_VAL(&(pCtx->preAggVals.statis.min))) {
pInfo->min = GET_DOUBLE_VAL(&(pCtx->preAggVals.statis.min));
if (pInfo->min > GET_DOUBLE_VAL((const char *)&(pCtx->preAggVals.statis.min))) {
pInfo->min = GET_DOUBLE_VAL((const char *)&(pCtx->preAggVals.statis.min));
}
if (pInfo->max < GET_DOUBLE_VAL(&(pCtx->preAggVals.statis.max))) {
pInfo->max = GET_DOUBLE_VAL(&(pCtx->preAggVals.statis.max));
if (pInfo->max < GET_DOUBLE_VAL((const char *)&(pCtx->preAggVals.statis.max))) {
pInfo->max = GET_DOUBLE_VAL((const char *)&(pCtx->preAggVals.statis.max));
}
}
......
......@@ -306,16 +306,16 @@ static int32_t tscGetNthFieldResult(TAOS_ROW row, TAOS_FIELD* fields, int *lengt
switch (type) {
case TSDB_DATA_TYPE_BOOL:
sprintf(result, "%s", ((((int)(*((char *)val))) == 1) ? "true" : "false"));
sprintf(result, "%s", ((((int32_t)(*((char *)val))) == 1) ? "true" : "false"));
break;
case TSDB_DATA_TYPE_TINYINT:
sprintf(result, "%d", (int)(*((char *)val)));
sprintf(result, "%d", *((int8_t *)val));
break;
case TSDB_DATA_TYPE_SMALLINT:
sprintf(result, "%d", (int)(*((short *)val)));
sprintf(result, "%d", *((int16_t *)val));
break;
case TSDB_DATA_TYPE_INT:
sprintf(result, "%d", *((int *)val));
sprintf(result, "%d", *((int32_t *)val));
break;
case TSDB_DATA_TYPE_BIGINT:
sprintf(result, "%"PRId64, *((int64_t *)val));
......
......@@ -1688,7 +1688,7 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
if (pItem->pNode->pParam != NULL) {
tSQLExprItem* pParamElem = &pItem->pNode->pParam->a[0];
SStrToken* pToken = &pParamElem->pNode->colInfo;
short sqlOptr = pParamElem->pNode->nSQLOptr;
int16_t sqlOptr = pParamElem->pNode->nSQLOptr;
if ((pToken->z == NULL || pToken->n == 0)
&& (TK_INTEGER != sqlOptr)) /*select count(1) from table*/ {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg3);
......
......@@ -245,9 +245,8 @@ int tscSendMsgToServer(SSqlObj *pSql) {
}
void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) {
uint64_t handle = (uint64_t) rpcMsg->ahandle;
void** p = taosCacheAcquireByKey(tscObjCache, &handle, sizeof(uint64_t));
TSDB_CACHE_PTR_TYPE handle = (TSDB_CACHE_PTR_TYPE) rpcMsg->ahandle;
void** p = taosCacheAcquireByKey(tscObjCache, &handle, sizeof(TSDB_CACHE_PTR_TYPE));
if (p == NULL) {
rpcFreeCont(rpcMsg->pCont);
return;
......
......@@ -720,15 +720,15 @@ int taos_print_row(char *str, TAOS_ROW row, TAOS_FIELD *fields, int num_fields)
switch (fields[i].type) {
case TSDB_DATA_TYPE_TINYINT:
len += sprintf(str + len, "%d", *((char *)row[i]));
len += sprintf(str + len, "%d", *((int8_t *)row[i]));
break;
case TSDB_DATA_TYPE_SMALLINT:
len += sprintf(str + len, "%d", *((short *)row[i]));
len += sprintf(str + len, "%d", *((int16_t *)row[i]));
break;
case TSDB_DATA_TYPE_INT:
len += sprintf(str + len, "%d", *((int *)row[i]));
len += sprintf(str + len, "%d", *((int32_t *)row[i]));
break;
case TSDB_DATA_TYPE_BIGINT:
......
......@@ -141,7 +141,7 @@ void taos_init_imp(void) {
int64_t refreshTime = 10; // 10 seconds by default
if (tscMetaCache == NULL) {
tscMetaCache = taosCacheInit(TSDB_DATA_TYPE_BINARY, refreshTime, false, NULL, "tableMeta");
tscObjCache = taosCacheInit(TSDB_DATA_TYPE_BIGINT, refreshTime/2, false, tscFreeSqlObjInCache, "sqlObj");
tscObjCache = taosCacheInit(TSDB_CACHE_PTR_KEY, refreshTime / 2, false, tscFreeSqlObjInCache, "sqlObj");
}
tscDebug("client is initialized successfully");
......
......@@ -1788,8 +1788,8 @@ void registerSqlObj(SSqlObj* pSql) {
int32_t ref = T_REF_INC(pSql->pTscObj);
tscDebug("%p add to tscObj:%p, ref:%d", pSql, pSql->pTscObj, ref);
uint64_t p = (uint64_t) pSql;
pSql->self = taosCachePut(tscObjCache, &p, sizeof(uint64_t), &p, sizeof(uint64_t), DEFAULT_LIFE_TIME);
TSDB_CACHE_PTR_TYPE p = (TSDB_CACHE_PTR_TYPE) pSql;
pSql->self = taosCachePut(tscObjCache, &p, sizeof(TSDB_CACHE_PTR_TYPE), &p, sizeof(TSDB_CACHE_PTR_TYPE), DEFAULT_LIFE_TIME);
}
SSqlObj* createSimpleSubObj(SSqlObj* pSql, void (*fp)(), void* param, int32_t cmd) {
......
......@@ -219,7 +219,7 @@ static void getStatics_f(const TSKEY *primaryKey, const void *pData, int32_t num
}
float fv = 0;
fv = GET_FLOAT_VAL(&(data[i]));
fv = GET_FLOAT_VAL((const char*)&(data[i]));
dsum += fv;
if (fmin > fv) {
fmin = fv;
......@@ -233,17 +233,12 @@ static void getStatics_f(const TSKEY *primaryKey, const void *pData, int32_t num
}
double csum = 0;
csum = GET_DOUBLE_VAL(sum);
csum = GET_DOUBLE_VAL((const char *)sum);
csum += dsum;
#ifdef _TD_ARM_32
SET_DOUBLE_VAL_ALIGN(sum, &csum);
SET_DOUBLE_VAL_ALIGN(max, &fmax);
SET_DOUBLE_VAL_ALIGN(min, &fmin);
#else
*(double*)sum = csum;
*(double*)max = fmax;
*(double*)min = fmin;
#endif
SET_DOUBLE_VAL(sum, csum);
SET_DOUBLE_VAL(max, fmax);
SET_DOUBLE_VAL(min, fmin);
}
static void getStatics_d(const TSKEY *primaryKey, const void *pData, int32_t numOfRow, int64_t *min, int64_t *max,
......@@ -264,7 +259,7 @@ static void getStatics_d(const TSKEY *primaryKey, const void *pData, int32_t num
}
double dv = 0;
dv = GET_DOUBLE_VAL(&(data[i]));
dv = GET_DOUBLE_VAL((const char*)&(data[i]));
dsum += dv;
if (dmin > dv) {
dmin = dv;
......@@ -278,19 +273,12 @@ static void getStatics_d(const TSKEY *primaryKey, const void *pData, int32_t num
}
double csum = 0;
csum = GET_DOUBLE_VAL(sum);
csum = GET_DOUBLE_VAL((const char *)sum);
csum += dsum;
#ifdef _TD_ARM_32
SET_DOUBLE_VAL_ALIGN(sum, &csum);
SET_DOUBLE_VAL_ALIGN(max, &dmax);
SET_DOUBLE_VAL_ALIGN(min, &dmin);
#else
*(double*) sum = csum;
*(double*) max = dmax;
*(double*) min = dmin;
#endif
SET_DOUBLE_PTR(sum, &csum);
SET_DOUBLE_PTR(max, &dmax);
SET_DOUBLE_PTR(min, &dmin);
}
static void getStatics_bin(const TSKEY *primaryKey, const void *pData, int32_t numOfRow, int64_t *min, int64_t *max,
......@@ -493,46 +481,29 @@ void assignVal(char *val, const char *src, int32_t len, int32_t type) {
*((int32_t *)val) = GET_INT32_VAL(src);
break;
}
case TSDB_DATA_TYPE_FLOAT: {
#ifdef _TD_ARM_32
float fv = GET_FLOAT_VAL(src);
SET_FLOAT_VAL_ALIGN(val, &fv);
#else
*((float *)val) = GET_FLOAT_VAL(src);
#endif
case TSDB_DATA_TYPE_FLOAT:
SET_FLOAT_VAL(val, GET_FLOAT_VAL(src));
break;
};
case TSDB_DATA_TYPE_DOUBLE: {
#ifdef _TD_ARM_32
double dv = GET_DOUBLE_VAL(src);
SET_DOUBLE_VAL_ALIGN(val, &dv);
#else
*((double *)val) = GET_DOUBLE_VAL(src);
#endif
case TSDB_DATA_TYPE_DOUBLE:
SET_DOUBLE_VAL(val, GET_DOUBLE_VAL(src));
break;
};
case TSDB_DATA_TYPE_TIMESTAMP:
case TSDB_DATA_TYPE_BIGINT: {
case TSDB_DATA_TYPE_BIGINT:
*((int64_t *)val) = GET_INT64_VAL(src);
break;
};
case TSDB_DATA_TYPE_SMALLINT: {
case TSDB_DATA_TYPE_SMALLINT:
*((int16_t *)val) = GET_INT16_VAL(src);
break;
};
case TSDB_DATA_TYPE_BOOL:
case TSDB_DATA_TYPE_TINYINT: {
case TSDB_DATA_TYPE_TINYINT:
*((int8_t *)val) = GET_INT8_VAL(src);
break;
};
case TSDB_DATA_TYPE_BINARY: {
case TSDB_DATA_TYPE_BINARY:
varDataCopy(val, src);
break;
};
case TSDB_DATA_TYPE_NCHAR: {
case TSDB_DATA_TYPE_NCHAR:
varDataCopy(val, src);
break;
};
default: {
memcpy(val, src, len);
break;
......
......@@ -709,46 +709,21 @@ int32_t tVariantDump(tVariant *pVariant, char *payload, int16_t type, bool inclu
return -1;
}
#ifdef _TD_ARM_32
//memcpy(&payload, &value, sizeof(float));
float fv = (float)value;
SET_FLOAT_VAL_ALIGN(payload, &fv);
#else
*((float *)payload) = (float)value;
#endif
SET_FLOAT_VAL(payload, value);
}
} else if (pVariant->nType >= TSDB_DATA_TYPE_BOOL && pVariant->nType <= TSDB_DATA_TYPE_BIGINT) {
#ifdef _TD_ARM_32
//memcpy(&payload, &pVariant->i64Key, sizeof(float));
float fv = (float)pVariant->i64Key;
SET_FLOAT_VAL_ALIGN(payload, &fv);
#else
*((float *)payload) = (float)pVariant->i64Key;
#endif
SET_FLOAT_VAL(payload, pVariant->i64Key);
} else if (pVariant->nType == TSDB_DATA_TYPE_DOUBLE || pVariant->nType == TSDB_DATA_TYPE_FLOAT) {
#ifdef _TD_ARM_32
//memcpy(&payload, &pVariant->dKey, sizeof(float));
float fv = (float)pVariant->dKey;
SET_FLOAT_VAL_ALIGN(payload, &fv);
#else
*((float *)payload) = (float)pVariant->dKey;
#endif
SET_FLOAT_VAL(payload, pVariant->dKey);
} else if (pVariant->nType == TSDB_DATA_TYPE_NULL) {
*((int32_t *)payload) = TSDB_DATA_FLOAT_NULL;
return 0;
}
#ifdef _TD_ARM_32
float fv = GET_FLOAT_VAL(payload);
if (isinf(fv) || isnan(fv) || fv > FLT_MAX || fv < -FLT_MAX) {
return -1;
}
#else
if (isinf(*((float *)payload)) || isnan(*((float *)payload)) || *((float *)payload) > FLT_MAX ||
*((float *)payload) < -FLT_MAX) {
return -1;
}
#endif
break;
}
case TSDB_DATA_TYPE_DOUBLE: {
......@@ -765,42 +740,21 @@ int32_t tVariantDump(tVariant *pVariant, char *payload, int16_t type, bool inclu
return -1;
}
#ifdef _TD_ARM_32
SET_DOUBLE_VAL_ALIGN(payload, &value);
#else
*((double *)payload) = value;
#endif
SET_DOUBLE_VAL(payload, value);
}
} else if (pVariant->nType >= TSDB_DATA_TYPE_BOOL && pVariant->nType <= TSDB_DATA_TYPE_BIGINT) {
#ifdef _TD_ARM_32
double dv = (double)(pVariant->i64Key);
SET_DOUBLE_VAL_ALIGN(payload, &dv);
#else
*((double *)payload) = (double)pVariant->i64Key;
#endif
SET_DOUBLE_VAL(payload, pVariant->i64Key);
} else if (pVariant->nType == TSDB_DATA_TYPE_DOUBLE || pVariant->nType == TSDB_DATA_TYPE_FLOAT) {
#ifdef _TD_ARM_32
double dv = (double)(pVariant->dKey);
SET_DOUBLE_VAL_ALIGN(payload, &dv);
#else
*((double *)payload) = pVariant->dKey;
#endif
SET_DOUBLE_VAL(payload, pVariant->dKey);
} else if (pVariant->nType == TSDB_DATA_TYPE_NULL) {
*((int64_t *)payload) = TSDB_DATA_DOUBLE_NULL;
return 0;
}
#ifdef _TD_ARM_32
double dv = GET_DOUBLE_VAL(payload);
if (isinf(dv) || isnan(dv) || dv > DBL_MAX || dv < -DBL_MAX) {
return -1;
}
#else
if (isinf(*((double *)payload)) || isnan(*((double *)payload)) || *((double *)payload) > DBL_MAX ||
*((double *)payload) < -DBL_MAX) {
return -1;
}
#endif
break;
}
......
......@@ -55,6 +55,7 @@
<version>4.13</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
......
......@@ -67,14 +67,23 @@ public class DatabaseMetaDataResultSet implements ResultSet {
@Override
public boolean next() throws SQLException {
// boolean ret = false;
// if (rowDataList.size() > 0) {
// ret = rowDataList.iterator().hasNext();
// if (ret) {
// rowCursor = rowDataList.iterator().next();
// cursorRowNumber++;
// }
// }
// return ret;
/**** add by zyyang 2020-09-29 ****************/
boolean ret = false;
if (rowDataList.size() > 0) {
ret = rowDataList.iterator().hasNext();
if (ret) {
rowCursor = rowDataList.iterator().next();
cursorRowNumber++;
}
if (!rowDataList.isEmpty() && cursorRowNumber < rowDataList.size()) {
rowCursor = rowDataList.get(cursorRowNumber++);
ret = true;
}
return ret;
}
......@@ -91,7 +100,8 @@ public class DatabaseMetaDataResultSet implements ResultSet {
@Override
public String getString(int columnIndex) throws SQLException {
columnIndex--;
return rowCursor.getString(columnIndex, columnMetaDataList.get(columnIndex).getColType());
int colType = columnMetaDataList.get(columnIndex).getColType();
return rowCursor.getString(columnIndex, colType);
}
@Override
......
......@@ -586,7 +586,171 @@ public class TSDBDatabaseMetaData implements java.sql.DatabaseMetaData {
public ResultSet getColumns(String catalog, String schemaPattern, String tableNamePattern, String columnNamePattern)
throws SQLException {
return getEmptyResultSet();
/** add by zyyang **********/
Statement stmt = null;
if (null != conn && !conn.isClosed()) {
stmt = conn.createStatement();
if (catalog == null || catalog.length() < 1) {
catalog = conn.getCatalog();
}
stmt.executeUpdate("use " + catalog);
DatabaseMetaDataResultSet resultSet = new DatabaseMetaDataResultSet();
// set up ColumnMetaDataList
List<ColumnMetaData> columnMetaDataList = new ArrayList<>(24);
columnMetaDataList.add(null);
columnMetaDataList.add(null);
// add TABLE_NAME
ColumnMetaData colMetaData = new ColumnMetaData();
colMetaData.setColIndex(3);
colMetaData.setColName("TABLE_NAME");
colMetaData.setColSize(193);
colMetaData.setColType(TSDBConstants.TSDB_DATA_TYPE_BINARY);
columnMetaDataList.add(colMetaData);
// add COLUMN_NAME
colMetaData = new ColumnMetaData();
colMetaData.setColIndex(4);
colMetaData.setColName("COLUMN_NAME");
colMetaData.setColSize(65);
colMetaData.setColType(TSDBConstants.TSDB_DATA_TYPE_BINARY);
columnMetaDataList.add(colMetaData);
// add DATA_TYPE
colMetaData = new ColumnMetaData();
colMetaData.setColIndex(5);
colMetaData.setColName("DATA_TYPE");
colMetaData.setColType(TSDBConstants.TSDB_DATA_TYPE_INT);
columnMetaDataList.add(colMetaData);
// add TYPE_NAME
colMetaData = new ColumnMetaData();
colMetaData.setColIndex(6);
colMetaData.setColName("TYPE_NAME");
colMetaData.setColType(TSDBConstants.TSDB_DATA_TYPE_BINARY);
columnMetaDataList.add(colMetaData);
// add COLUMN_SIZE
colMetaData = new ColumnMetaData();
colMetaData.setColIndex(7);
colMetaData.setColName("COLUMN_SIZE");
colMetaData.setColType(TSDBConstants.TSDB_DATA_TYPE_INT);
columnMetaDataList.add(colMetaData);
// add BUFFER_LENGTH ,not used
columnMetaDataList.add(null);
// add DECIMAL_DIGITS
colMetaData = new ColumnMetaData();
colMetaData.setColIndex(9);
colMetaData.setColName("DECIMAL_DIGITS");
colMetaData.setColType(TSDBConstants.TSDB_DATA_TYPE_INT);
columnMetaDataList.add(colMetaData);
// add NUM_PREC_RADIX
colMetaData = new ColumnMetaData();
colMetaData.setColIndex(10);
colMetaData.setColName("NUM_PREC_RADIX");
colMetaData.setColType(TSDBConstants.TSDB_DATA_TYPE_INT);
columnMetaDataList.add(colMetaData);
// add NULLABLE
colMetaData = new ColumnMetaData();
colMetaData.setColIndex(11);
colMetaData.setColName("NULLABLE");
colMetaData.setColType(TSDBConstants.TSDB_DATA_TYPE_INT);
columnMetaDataList.add(colMetaData);
resultSet.setColumnMetaDataList(columnMetaDataList);
// set up rowDataList
ResultSet resultSet0 = stmt.executeQuery("describe " + tableNamePattern);
List<TSDBResultSetRowData> rowDataList = new ArrayList<>();
int index = 0;
while (resultSet0.next()) {
TSDBResultSetRowData rowData = new TSDBResultSetRowData(24);
// set TABLE_NAME
rowData.setString(2, tableNamePattern);
// set COLUMN_NAME
rowData.setString(3, resultSet0.getString(1));
// set DATA_TYPE
String typeName = resultSet0.getString(2);
rowData.setInt(4, getDataType(typeName));
// set TYPE_NAME
rowData.setString(5, typeName);
// set COLUMN_SIZE
int length = resultSet0.getInt(3);
rowData.setInt(6, getColumnSize(typeName, length));
// set DECIMAL_DIGITS
rowData.setInt(8, getDecimalDigits(typeName));
// set NUM_PREC_RADIX
rowData.setInt(9, 10);
// set NULLABLE
rowData.setInt(10, getNullable(index, typeName));
rowDataList.add(rowData);
index++;
}
resultSet.setRowDataList(rowDataList);
// GetColumnsResultSet getColumnsResultSet = new GetColumnsResultSet(resultSet0, catalog, schemaPattern, tableNamePattern, columnNamePattern);
// return getColumnsResultSet;
// DatabaseMetaDataResultSet resultSet = new DatabaseMetaDataResultSet();
return resultSet;
} else {
throw new SQLException(TSDBConstants.FixErrMsg(TSDBConstants.JNI_CONNECTION_NULL));
}
/*************************/
// return getEmptyResultSet();
}
private int getNullable(int index, String typeName) {
if (index == 0 && "TIMESTAMP".equals(typeName))
return DatabaseMetaData.columnNoNulls;
return DatabaseMetaData.columnNullable;
}
private int getColumnSize(String typeName, int length) {
switch (typeName) {
case "TIMESTAMP":
return 23;
default:
return 0;
}
}
private int getDecimalDigits(String typeName) {
switch (typeName) {
case "FLOAT":
return 5;
case "DOUBLE":
return 9;
default:
return 0;
}
}
private int getDataType(String typeName) {
switch (typeName) {
case "TIMESTAMP":
return Types.TIMESTAMP;
case "INT":
return Types.INTEGER;
case "BIGINT":
return Types.BIGINT;
case "FLOAT":
return Types.FLOAT;
case "DOUBLE":
return Types.DOUBLE;
case "BINARY":
return Types.BINARY;
case "SMALLINT":
return Types.SMALLINT;
case "TINYINT":
return Types.TINYINT;
case "BOOL":
return Types.BOOLEAN;
case "NCHAR":
return Types.NCHAR;
default:
return Types.NULL;
}
}
public ResultSet getColumnPrivileges(String catalog, String schema, String table, String columnNamePattern)
......
......@@ -464,7 +464,14 @@ void dnodeUpdateMnodeEpSetForPeer(SRpcEpSet *pEpSet) {
dInfo("mnode EP list for peer is changed, numOfEps:%d inUse:%d", pEpSet->numOfEps, pEpSet->inUse);
for (int i = 0; i < pEpSet->numOfEps; ++i) {
pEpSet->port[i] -= TSDB_PORT_DNODEDNODE;
dInfo("mnode index:%d %s:%u", i, pEpSet->fqdn[i], pEpSet->port[i])
dInfo("mnode index:%d %s:%u", i, pEpSet->fqdn[i], pEpSet->port[i]);
if (!mnodeIsRunning()) {
if (strcmp(pEpSet->fqdn[i], tsLocalFqdn) == 0 && pEpSet->port[i] == tsServerPort) {
dInfo("mnode index:%d %s:%u should work as master", i, pEpSet->fqdn[i], pEpSet->port[i]);
sdbUpdateSync();
}
}
}
tsDMnodeEpSet = *pEpSet;
......
......@@ -64,7 +64,7 @@ extern const int32_t TYPE_BYTES[11];
// TODO: replace and remove code below
#define CHAR_BYTES sizeof(char)
#define SHORT_BYTES sizeof(int16_t)
#define INT_BYTES sizeof(int)
#define INT_BYTES sizeof(int32_t)
#define LONG_BYTES sizeof(int64_t)
#define FLOAT_BYTES sizeof(float)
#define DOUBLE_BYTES sizeof(double)
......@@ -73,7 +73,7 @@ extern const int32_t TYPE_BYTES[11];
#define TSDB_DATA_BOOL_NULL 0x02
#define TSDB_DATA_TINYINT_NULL 0x80
#define TSDB_DATA_SMALLINT_NULL 0x8000
#define TSDB_DATA_INT_NULL 0x80000000
#define TSDB_DATA_INT_NULL 0x80000000L
#define TSDB_DATA_BIGINT_NULL 0x8000000000000000L
#define TSDB_DATA_FLOAT_NULL 0x7FF00000 // it is an NAN
......@@ -132,21 +132,30 @@ do { \
#define GET_INT32_VAL(x) (*(int32_t *)(x))
#define GET_INT64_VAL(x) (*(int64_t *)(x))
#ifdef _TD_ARM_32
#define GET_FLOAT_VAL(x) taos_align_get_float(x)
#define GET_DOUBLE_VAL(x) taos_align_get_double(x)
float taos_align_get_float(const char* pBuf);
double taos_align_get_double(const char* pBuf);
//#define __float_align_declear() float __underlyFloat = 0.0;
//#define __float_align_declear()
//#define GET_FLOAT_VAL_ALIGN(x) (*(int32_t*)&(__underlyFloat) = *(int32_t*)(x); __underlyFloat);
// notes: src must be float or double type variable !!!
#define SET_FLOAT_VAL_ALIGN(dst, src) (*(int32_t*) dst = *(int32_t*)src);
#define SET_DOUBLE_VAL_ALIGN(dst, src) (*(int64_t*) dst = *(int64_t*)src);
//#define SET_FLOAT_VAL_ALIGN(dst, src) (*(int32_t*) dst = *(int32_t*)src);
//#define SET_DOUBLE_VAL_ALIGN(dst, src) (*(int64_t*) dst = *(int64_t*)src);
float taos_align_get_float(const char* pBuf);
double taos_align_get_double(const char* pBuf);
#define GET_FLOAT_VAL(x) taos_align_get_float(x)
#define GET_DOUBLE_VAL(x) taos_align_get_double(x)
#define SET_FLOAT_VAL(x, y) { float z = (float)(y); (*(int32_t*) x = *(int32_t*)(&z)); }
#define SET_DOUBLE_VAL(x, y) { double z = (double)(y); (*(int64_t*) x = *(int64_t*)(&z)); }
#define SET_FLOAT_PTR(x, y) { (*(int32_t*) x = *(int32_t*)y); }
#define SET_DOUBLE_PTR(x, y) { (*(int64_t*) x = *(int64_t*)y); }
#else
#define GET_FLOAT_VAL(x) (*(float *)(x))
#define GET_DOUBLE_VAL(x) (*(double *)(x))
#define SET_FLOAT_VAL(x, y) { (*(float *)(x)) = (float)(y); }
#define SET_DOUBLE_VAL(x, y) { (*(double *)(x)) = (double)(y); }
#define SET_FLOAT_PTR(x, y) { (*(float *)(x)) = (*(float *)(y)); }
#define SET_DOUBLE_PTR(x, y) { (*(double *)(x)) = (*(double *)(y)); }
#endif
typedef struct tDataTypeDescriptor {
......@@ -295,7 +304,7 @@ void tsDataSwap(void *pLeft, void *pRight, int32_t type, int32_t size, void* buf
#define TSDB_MIN_VNODES 64
#define TSDB_MAX_VNODES 2048
#define TSDB_MIN_VNODES_PER_DB 2
#define TSDB_MAX_VNODES_PER_DB 16
#define TSDB_MAX_VNODES_PER_DB 64
#define TSDB_DNODE_ROLE_ANY 0
#define TSDB_DNODE_ROLE_MGMT 1
......
......@@ -248,6 +248,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_SYN_NOT_ENABLED, 0, 0x0901, "Sync modul
// wal
TAOS_DEFINE_ERROR(TSDB_CODE_WAL_APP_ERROR, 0, 0x1000, "Unexpected generic error in wal")
TAOS_DEFINE_ERROR(TSDB_CODE_WAL_FILE_CORRUPTED, 0, 0x1001, "WAL file is corrupted")
// http
TAOS_DEFINE_ERROR(TSDB_CODE_HTTP_SERVER_OFFLINE, 0, 0x1100, "http server is not onlin")
......
......@@ -419,16 +419,16 @@ static void dumpFieldToFile(FILE* fp, const char* val, TAOS_FIELD* field, int32_
char buf[TSDB_MAX_BYTES_PER_ROW];
switch (field->type) {
case TSDB_DATA_TYPE_BOOL:
fprintf(fp, "%d", ((((int)(*((char *)val))) == 1) ? 1 : 0));
fprintf(fp, "%d", ((((int32_t)(*((char *)val))) == 1) ? 1 : 0));
break;
case TSDB_DATA_TYPE_TINYINT:
fprintf(fp, "%d", (int)(*((char *)val)));
fprintf(fp, "%d", *((int8_t *)val));
break;
case TSDB_DATA_TYPE_SMALLINT:
fprintf(fp, "%d", (int)(*((short *)val)));
fprintf(fp, "%d", *((int16_t *)val));
break;
case TSDB_DATA_TYPE_INT:
fprintf(fp, "%d", *((int *)val));
fprintf(fp, "%d", *((int32_t *)val));
break;
case TSDB_DATA_TYPE_BIGINT:
fprintf(fp, "%" PRId64, *((int64_t *)val));
......@@ -559,16 +559,16 @@ static void printField(const char* val, TAOS_FIELD* field, int width, int32_t le
char buf[TSDB_MAX_BYTES_PER_ROW];
switch (field->type) {
case TSDB_DATA_TYPE_BOOL:
printf("%*s", width, ((((int)(*((char *)val))) == 1) ? "true" : "false"));
printf("%*s", width, ((((int32_t)(*((char *)val))) == 1) ? "true" : "false"));
break;
case TSDB_DATA_TYPE_TINYINT:
printf("%*d", width, (int)(*((char *)val)));
printf("%*d", width, *((int8_t *)val));
break;
case TSDB_DATA_TYPE_SMALLINT:
printf("%*d", width, (int)(*((short *)val)));
printf("%*d", width, *((int16_t *)val));
break;
case TSDB_DATA_TYPE_INT:
printf("%*d", width, *((int *)val));
printf("%*d", width, *((int32_t *)val));
break;
case TSDB_DATA_TYPE_BIGINT:
printf("%*" PRId64, width, *((int64_t *)val));
......
......@@ -185,7 +185,11 @@ static int32_t sdbInitWal() {
}
sdbInfo("open sdb wal for restore");
walRestore(tsSdbObj.wal, NULL, sdbWrite);
int code = walRestore(tsSdbObj.wal, NULL, sdbWrite);
if (code != TSDB_CODE_SUCCESS) {
sdbError("failed to open wal for restore, reason:%s", tstrerror(code));
return -1;
}
return 0;
}
......
......@@ -65,7 +65,7 @@ int32_t mnodeInitShow() {
mnodeAddReadMsgHandle(TSDB_MSG_TYPE_CM_CONNECT, mnodeProcessConnectMsg);
mnodeAddReadMsgHandle(TSDB_MSG_TYPE_CM_USE_DB, mnodeProcessUseMsg);
tsMnodeShowCache = taosCacheInit(TSDB_DATA_TYPE_BIGINT, 5, true, mnodeFreeShowObj, "show");
tsMnodeShowCache = taosCacheInit(TSDB_CACHE_PTR_KEY, 5, true, mnodeFreeShowObj, "show");
return 0;
}
......@@ -378,8 +378,8 @@ static bool mnodeCheckShowFinished(SShowObj *pShow) {
}
static bool mnodeAccquireShowObj(SShowObj *pShow) {
uint64_t handleVal = (uint64_t)pShow;
SShowObj **ppShow = taosCacheAcquireByKey(tsMnodeShowCache, &handleVal, sizeof(int64_t));
TSDB_CACHE_PTR_TYPE handleVal = (TSDB_CACHE_PTR_TYPE)pShow;
SShowObj **ppShow = taosCacheAcquireByKey(tsMnodeShowCache, &handleVal, sizeof(TSDB_CACHE_PTR_TYPE));
if (ppShow) {
mDebug("%p, show is accquired from cache, data:%p, index:%d", pShow, ppShow, pShow->index);
return true;
......@@ -393,8 +393,8 @@ static void* mnodePutShowObj(SShowObj *pShow) {
if (tsMnodeShowCache != NULL) {
pShow->index = atomic_add_fetch_32(&tsShowObjIndex, 1);
uint64_t handleVal = (uint64_t)pShow;
SShowObj **ppShow = taosCachePut(tsMnodeShowCache, &handleVal, sizeof(int64_t), &pShow, sizeof(int64_t), DEFAULT_SHOWHANDLE_LIFE_SPAN);
TSDB_CACHE_PTR_TYPE handleVal = (TSDB_CACHE_PTR_TYPE)pShow;
SShowObj **ppShow = taosCachePut(tsMnodeShowCache, &handleVal, sizeof(TSDB_CACHE_PTR_TYPE), &pShow, sizeof(TSDB_CACHE_PTR_TYPE), DEFAULT_SHOWHANDLE_LIFE_SPAN);
pShow->ppShow = (void**)ppShow;
mDebug("%p, show is put into cache, data:%p index:%d", pShow, ppShow, pShow->index);
return pShow;
......
......@@ -20,6 +20,8 @@
extern "C" {
#endif
#include <sys/types.h>
// TAOS_OS_FUNC_DIR
void taosRemoveDir(char *rootDir);
int taosMkDir(const char *pathname, mode_t mode);
......
......@@ -148,10 +148,10 @@ bool gcBuildQueryJson(HttpContext *pContext, HttpSqlCmd *cmd, TAOS_RES *result,
len += snprintf(target + len, HTTP_GC_TARGET_SIZE - len, "%s:%" PRId64, fields[i].name, *((int64_t *)row[i]));
break;
case TSDB_DATA_TYPE_FLOAT:
len += snprintf(target + len, HTTP_GC_TARGET_SIZE - len, "%s:%.5f", fields[i].name, *((float *)row[i]));
len += snprintf(target + len, HTTP_GC_TARGET_SIZE - len, "%s:%.5f", fields[i].name, GET_FLOAT_VAL(row[i]));
break;
case TSDB_DATA_TYPE_DOUBLE:
len += snprintf(target + len, HTTP_GC_TARGET_SIZE - len, "%s:%.9f", fields[i].name, *((double *)row[i]));
len += snprintf(target + len, HTTP_GC_TARGET_SIZE - len, "%s:%.9f", fields[i].name, GET_DOUBLE_VAL(row[i]));
break;
case TSDB_DATA_TYPE_BINARY:
case TSDB_DATA_TYPE_NCHAR:
......@@ -210,10 +210,10 @@ bool gcBuildQueryJson(HttpContext *pContext, HttpSqlCmd *cmd, TAOS_RES *result,
httpJsonInt64(jsonBuf, *((int64_t *)row[i]));
break;
case TSDB_DATA_TYPE_FLOAT:
httpJsonFloat(jsonBuf, *((float *)row[i]));
httpJsonFloat(jsonBuf, GET_FLOAT_VAL(row[i]));
break;
case TSDB_DATA_TYPE_DOUBLE:
httpJsonDouble(jsonBuf, *((double *)row[i]));
httpJsonDouble(jsonBuf, GET_DOUBLE_VAL(row[i]));
break;
case TSDB_DATA_TYPE_BINARY:
case TSDB_DATA_TYPE_NCHAR:
......
......@@ -124,10 +124,10 @@ bool restBuildSqlJson(HttpContext *pContext, HttpSqlCmd *cmd, TAOS_RES *result,
httpJsonInt64(jsonBuf, *((int64_t *)row[i]));
break;
case TSDB_DATA_TYPE_FLOAT:
httpJsonFloat(jsonBuf, *((float *)row[i]));
httpJsonFloat(jsonBuf, GET_FLOAT_VAL(row[i]));
break;
case TSDB_DATA_TYPE_DOUBLE:
httpJsonDouble(jsonBuf, *((double *)row[i]));
httpJsonDouble(jsonBuf, GET_DOUBLE_VAL(row[i]));
break;
case TSDB_DATA_TYPE_BINARY:
case TSDB_DATA_TYPE_NCHAR:
......
......@@ -7046,7 +7046,7 @@ void* qOpenQueryMgmt(int32_t vgId) {
return NULL;
}
pQueryMgmt->qinfoPool = taosCacheInit(TSDB_DATA_TYPE_BIGINT, REFRESH_HANDLE_INTERVAL, true, freeqinfoFn, cacheName);
pQueryMgmt->qinfoPool = taosCacheInit(TSDB_CACHE_PTR_KEY, REFRESH_HANDLE_INTERVAL, true, freeqinfoFn, cacheName);
pQueryMgmt->closed = false;
pQueryMgmt->vgId = vgId;
......@@ -7115,23 +7115,23 @@ void** qRegisterQInfo(void* pMgmt, uint64_t qInfo) {
qError("QInfo:%p failed to add qhandle into cache, since qMgmt is colsing", (void *)qInfo);
return NULL;
} else {
uint64_t handleVal = (uint64_t) qInfo;
void** handle = taosCachePut(pQueryMgmt->qinfoPool, &handleVal, sizeof(int64_t), &qInfo, POINTER_BYTES, DEFAULT_QHANDLE_LIFE_SPAN);
TSDB_CACHE_PTR_TYPE handleVal = (TSDB_CACHE_PTR_TYPE) qInfo;
void** handle = taosCachePut(pQueryMgmt->qinfoPool, &handleVal, sizeof(TSDB_CACHE_PTR_TYPE), &qInfo, sizeof(TSDB_CACHE_PTR_TYPE), DEFAULT_QHANDLE_LIFE_SPAN);
// pthread_mutex_unlock(&pQueryMgmt->lock);
return handle;
}
}
void** qAcquireQInfo(void* pMgmt, uint64_t key) {
void** qAcquireQInfo(void* pMgmt, uint64_t _key) {
SQueryMgmt *pQueryMgmt = pMgmt;
if (pQueryMgmt->qinfoPool == NULL || pQueryMgmt->closed) {
return NULL;
}
void** handle = taosCacheAcquireByKey(pQueryMgmt->qinfoPool, &key, sizeof(uint64_t));
TSDB_CACHE_PTR_TYPE key = (TSDB_CACHE_PTR_TYPE)_key;
void** handle = taosCacheAcquireByKey(pQueryMgmt->qinfoPool, &key, sizeof(TSDB_CACHE_PTR_TYPE));
if (handle == NULL || *handle == NULL) {
return NULL;
} else {
......
......@@ -216,6 +216,7 @@ static void rpcCleanConnCache(void *handle, void *tmrId) {
if (pCache == NULL || pCache->maxSessions == 0) return;
if (pCache->pTimer != tmrId) return;
pthread_mutex_lock(&pCache->mutex);
uint64_t time = taosGetTimestampMs();
for (hash = 0; hash < pCache->maxSessions; ++hash) {
......@@ -227,6 +228,7 @@ static void rpcCleanConnCache(void *handle, void *tmrId) {
// tTrace("timer, total connections in cache:%d", pCache->total);
taosTmrReset(rpcCleanConnCache, (int32_t)(pCache->keepTimer * 2), pCache, pCache->tmrCtrl, &pCache->pTimer);
pthread_mutex_unlock(&pCache->mutex);
}
static void rpcRemoveExpiredNodes(SConnCache *pCache, SConnHash *pNode, int hash, uint64_t time) {
......
......@@ -491,7 +491,7 @@ static SSyncPeer *syncAddPeer(SSyncNode *pNode, const SNodeInfo *pInfo) {
uint32_t ip = taosGetIpFromFqdn(pInfo->nodeFqdn);
if (ip == -1) return NULL;
SSyncPeer *pPeer = (SSyncPeer *)calloc(1, sizeof(SSyncPeer));
SSyncPeer *pPeer = calloc(1, sizeof(SSyncPeer));
if (pPeer == NULL) return NULL;
pPeer->nodeId = pInfo->nodeId;
......@@ -499,7 +499,7 @@ static SSyncPeer *syncAddPeer(SSyncNode *pNode, const SNodeInfo *pInfo) {
pPeer->ip = ip;
pPeer->port = pInfo->nodePort;
pPeer->fqdn[sizeof(pPeer->fqdn) - 1] = 0;
snprintf(pPeer->id, sizeof(pPeer->id), "vgId:%d peer:%s:%d", pNode->vgId, pPeer->fqdn, pPeer->port);
snprintf(pPeer->id, sizeof(pPeer->id), "vgId:%d peer:%s:%u", pNode->vgId, pPeer->fqdn, pPeer->port);
pPeer->peerFd = -1;
pPeer->syncFd = -1;
......
......@@ -109,7 +109,6 @@ int processRpcMsg(void *item) {
if (pCfg->quorum <= 1) {
rpcFreeCont(pMsg->pCont);
taosFreeQitem(item);
SRpcMsg rpcMsg = {0};
rpcMsg.pCont = rpcMallocCont(msgSize);
......@@ -117,6 +116,7 @@ int processRpcMsg(void *item) {
rpcMsg.handle = pMsg->handle;
rpcMsg.code = code;
rpcSendResponse(&rpcMsg);
taosFreeQitem(item);
}
return code;
......
......@@ -26,7 +26,7 @@ extern "C" {
#define COMP_OVERFLOW_BYTES 2
#define BITS_PER_BYTE 8
// Masks
#define INT64MASK(_x) ((1ul << _x) - 1)
#define INT64MASK(_x) ((((uint64_t)1) << _x) - 1)
#define INT32MASK(_x) (((uint32_t)1 << _x) - 1)
#define INT8MASK(_x) (((uint8_t)1 << _x) - 1)
// Compression algorithm
......
......@@ -385,9 +385,10 @@ static void walRelease(SWal *pWal) {
static int walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp) {
char *name = pWal->name;
int size = 1024 * 1024; // default 1M buffer size
terrno = 0;
char *buffer = malloc(1024000); // size for one record
char *buffer = malloc(size);
if (buffer == NULL) {
terrno = TAOS_SYSTEM_ERROR(errno);
return terrno;
......@@ -395,7 +396,7 @@ static int walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp) {
SWalHead *pHead = (SWalHead *)buffer;
int fd = open(name, O_RDONLY);
int fd = open(name, O_RDWR);
if (fd < 0) {
wError("wal:%s, failed to open for restore(%s)", name, strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
......@@ -405,29 +406,58 @@ static int walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp) {
wDebug("wal:%s, start to restore", name);
size_t offset = 0;
while (1) {
int ret = taosTRead(fd, pHead, sizeof(SWalHead));
if ( ret == 0) break;
if (ret == 0) break;
if (ret != sizeof(SWalHead)) {
wWarn("wal:%s, failed to read head, skip, ret:%d(%s)", name, ret, strerror(errno));
if (ret < 0) {
wError("wal:%s, failed to read wal head part since %s", name, strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
break;
}
if (ret < sizeof(SWalHead)) {
wError("wal:%s, failed to read head, ret:%d, skip the rest of file", name, ret);
taosFtruncate(fd, offset);
fsync(fd);
break;
}
if (!taosCheckChecksumWhole((uint8_t *)pHead, sizeof(SWalHead))) {
wWarn("wal:%s, cksum is messed up, skip the rest of file", name);
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
ASSERT(false);
break;
}
if (pHead->len > size - sizeof(SWalHead)) {
size = sizeof(SWalHead) + pHead->len;
buffer = realloc(buffer, size);
if (buffer == NULL) {
terrno = TAOS_SYSTEM_ERROR(errno);
break;
}
pHead = (SWalHead *)buffer;
}
ret = taosTRead(fd, pHead->cont, pHead->len);
if ( ret != pHead->len) {
wWarn("wal:%s, failed to read body, skip, len:%d ret:%d", name, pHead->len, ret);
if (ret < 0) {
wError("wal:%s failed to read wal body part since %s", name, strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
break;
}
if (ret < pHead->len) {
wError("wal:%s, failed to read body, len:%d ret:%d, skip the rest of file", name, pHead->len, ret);
taosFtruncate(fd, offset);
fsync(fd);
break;
}
offset = offset + sizeof(SWalHead) + pHead->len;
if (pWal->keep) pWal->version = pHead->version;
(*writeFp)(pVnode, pHead, TAOS_QTYPE_WAL);
}
......
......@@ -23,35 +23,39 @@ class TDTestCase:
tdLog.debug("start to execute %s" % __file__)
tdSql.init(conn.cursor(), logSql)
self.ts = 1538548685000
self.ts = 1593548685000
def run(self):
tdSql.prepare()
tdSql.execute("create table st (ts timestamp, voltage int) tags (loc nchar(30))")
tdSql.execute("insert into t0 using st tags('beijing') values(now, 220) (now - 15d, 221) (now - 30d, 225) (now - 35d, 228) (now - 45d, 222)")
tdSql.execute("insert into t1 using st tags('shanghai') values(now, 220) (now - 60d, 221) (now - 50d, 225) (now - 40d, 228) (now - 20d, 222)")
tdSql.execute("insert into t0 using st tags('beijing') values(%d, 220) (%d, 221) (%d, 225) (%d, 228) (%d, 222)"
% (self.ts, self.ts + 1000000000, self.ts + 2000000000, self.ts + 3000000000, self.ts + 6000000000))
tdSql.execute("insert into t1 using st tags('shanghai') values(%d, 220) (%d, 221) (%d, 225) (%d, 228) (%d, 222)"
% (self.ts, self.ts + 2000000000, self.ts + 4000000000, self.ts + 5000000000, self.ts + 7000000000))
tdSql.query("select avg(voltage) from st interval(1n)")
tdSql.checkRows(3)
tdSql.checkData(0, 1, 223.0)
tdSql.checkData(1, 1, 225.0)
tdSql.checkData(2, 1, 220.333333)
tdSql.checkData(0, 1, 221.4)
tdSql.checkData(1, 1, 227.0)
tdSql.checkData(2, 1, 222.0)
tdSql.query("select avg(voltage) from st interval(1n, 15d)")
tdSql.checkRows(3)
tdSql.checkData(0, 1, 224.8)
tdSql.checkData(1, 1, 222.666666)
tdSql.checkData(2, 1, 220.0)
tdSql.checkRows(4)
tdSql.checkData(0, 1, 220.333333)
tdSql.checkData(1, 1, 224.666666)
tdSql.checkData(2, 1, 225.0)
tdSql.checkData(3, 1, 222.0)
tdSql.query("select avg(voltage) from st interval(1n, 15d) group by loc")
tdSql.checkRows(6)
tdSql.checkData(0, 1, 225.0)
tdSql.checkData(1, 1, 223.0)
tdSql.checkData(2, 1, 220.0)
tdSql.checkData(3, 1, 224.666666)
tdSql.checkData(4, 1, 222.0)
tdSql.checkData(5, 1, 220.0)
tdSql.checkRows(7)
tdSql.checkData(0, 1, 220.5)
tdSql.checkData(1, 1, 226.5)
tdSql.checkData(2, 1, 222.0)
tdSql.checkData(3, 1, 220.0)
tdSql.checkData(4, 1, 221.0)
tdSql.checkData(5, 1, 226.5)
tdSql.checkData(6, 1, 222.0)
def stop(self):
tdSql.close()
......
......@@ -14,12 +14,14 @@ $i = 0
$db = $dbPrefix . $i
$stb = $stbPrefix . $i
print use $db
sql use $db
##### select first/last from table
## TBASE-331
print ====== select first/last from table
$tb = $tbPrefix . 0
print select first(*) from $tb
sql select first(*) from $tb
if $rows != 1 then
return -1
......@@ -58,6 +60,7 @@ if $data09 != NCHAR then
return -1
endi
print select last(*) from $tb
sql select last(*) from $tb
if $rows != 1 then
return -1
......
......@@ -94,7 +94,11 @@ endi
## select specified columns
print select c1 from $mt
sql select c1 from $mt
print rows $rows
print totalNum $totalNum
if $rows != $totalNum then
return -1
endi
......
......@@ -303,8 +303,8 @@ cd ../../../debug; make
./test.sh -f unique/mnode/mgmt22.sim
./test.sh -f unique/mnode/mgmt23.sim
./test.sh -f unique/mnode/mgmt24.sim
#./test.sh -f unique/mnode/mgmt25.sim
#./test.sh -f unique/mnode/mgmt26.sim
./test.sh -f unique/mnode/mgmt25.sim
./test.sh -f unique/mnode/mgmt26.sim
./test.sh -f unique/mnode/mgmt33.sim
./test.sh -f unique/mnode/mgmt34.sim
./test.sh -f unique/mnode/mgmtr2.sim
......
......@@ -65,7 +65,7 @@ endi
print ============== step4
sql drop dnode $hostname2
sleep 8000
sleep 16000
sql show mnodes
$dnode1Role = $data2_1
......
......@@ -739,36 +739,22 @@ bool simExecuteNativeSqlCommand(SScript *script, char *rest, bool isSlow) {
((((int)(*((char *)row[i]))) == 1) ? "1" : "0"));
break;
case TSDB_DATA_TYPE_TINYINT:
sprintf(value, "%d", (int)(*((char *)row[i])));
sprintf(value, "%d", *((int8_t *)row[i]));
break;
case TSDB_DATA_TYPE_SMALLINT:
sprintf(value, "%d", (int)(*((short *)row[i])));
sprintf(value, "%d", *((int16_t *)row[i]));
break;
case TSDB_DATA_TYPE_INT:
sprintf(value, "%d", *((int *)row[i]));
sprintf(value, "%d", *((int32_t *)row[i]));
break;
case TSDB_DATA_TYPE_BIGINT:
sprintf(value, "%" PRId64, *((int64_t *)row[i]));
break;
case TSDB_DATA_TYPE_FLOAT:{
#ifdef _TD_ARM_32
float fv = 0;
*(int32_t*)(&fv) = *(int32_t*)row[i];
sprintf(value, "%.5f", fv);
#else
sprintf(value, "%.5f", *((float *)row[i]));
#endif
}
case TSDB_DATA_TYPE_FLOAT:
sprintf(value, "%.5f", GET_FLOAT_VAL(row[i]));
break;
case TSDB_DATA_TYPE_DOUBLE: {
#ifdef _TD_ARM_32
double dv = 0;
*(int64_t*)(&dv) = *(int64_t*)row[i];
sprintf(value, "%.9lf", dv);
#else
sprintf(value, "%.9lf", *((double *)row[i]));
#endif
}
case TSDB_DATA_TYPE_DOUBLE:
sprintf(value, "%.9lf", GET_DOUBLE_VAL(row[i]));
break;
case TSDB_DATA_TYPE_BINARY:
case TSDB_DATA_TYPE_NCHAR:
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册