未验证 提交 2efda127 编写于 作者: S shenglian-zhou 提交者: GitHub

Merge branch 'develop' into hotfix/td-5931

...@@ -5,7 +5,7 @@ node { ...@@ -5,7 +5,7 @@ node {
git url: 'https://github.com/taosdata/TDengine.git' git url: 'https://github.com/taosdata/TDengine.git'
} }
def skipstage=0 def skipbuild=0
def abortPreviousBuilds() { def abortPreviousBuilds() {
def currentJobName = env.JOB_NAME def currentJobName = env.JOB_NAME
...@@ -33,8 +33,7 @@ def abort_previous(){ ...@@ -33,8 +33,7 @@ def abort_previous(){
milestone(buildNumber) milestone(buildNumber)
} }
def pre_test(){ def pre_test(){
sh'hostname'
sh ''' sh '''
sudo rmtaos || echo "taosd has not installed" sudo rmtaos || echo "taosd has not installed"
''' '''
...@@ -52,12 +51,18 @@ def pre_test(){ ...@@ -52,12 +51,18 @@ def pre_test(){
git checkout master git checkout master
''' '''
} }
else { else if(env.CHANGE_TARGET == '2.0'){
sh ''' sh '''
cd ${WKC} cd ${WKC}
git checkout develop git checkout 2.0
''' '''
} }
else{
sh '''
cd ${WKC}
git checkout develop
'''
}
} }
sh''' sh'''
cd ${WKC} cd ${WKC}
...@@ -75,7 +80,13 @@ def pre_test(){ ...@@ -75,7 +80,13 @@ def pre_test(){
git checkout master git checkout master
''' '''
} }
else { else if(env.CHANGE_TARGET == '2.0'){
sh '''
cd ${WK}
git checkout 2.0
'''
}
else{
sh ''' sh '''
cd ${WK} cd ${WK}
git checkout develop git checkout develop
...@@ -95,19 +106,17 @@ def pre_test(){ ...@@ -95,19 +106,17 @@ def pre_test(){
make > /dev/null make > /dev/null
make install > /dev/null make install > /dev/null
cd ${WKC}/tests cd ${WKC}/tests
pip3 install ${WKC}/src/connector/python pip3 install ${WKC}/src/connector/python/
''' '''
return 1 return 1
} }
pipeline { pipeline {
agent none agent none
environment{ environment{
WK = '/var/lib/jenkins/workspace/TDinternal' WK = '/var/lib/jenkins/workspace/TDinternal'
WKC= '/var/lib/jenkins/workspace/TDinternal/community' WKC= '/var/lib/jenkins/workspace/TDinternal/community'
} }
stages { stages {
stage('pre_build'){ stage('pre_build'){
agent{label 'master'} agent{label 'master'}
...@@ -123,19 +132,22 @@ pipeline { ...@@ -123,19 +132,22 @@ pipeline {
rm -rf ${WORKSPACE}.tes rm -rf ${WORKSPACE}.tes
cp -r ${WORKSPACE} ${WORKSPACE}.tes cp -r ${WORKSPACE} ${WORKSPACE}.tes
cd ${WORKSPACE}.tes cd ${WORKSPACE}.tes
git fetch
''' '''
script { script {
if (env.CHANGE_TARGET == 'master') { if (env.CHANGE_TARGET == 'master') {
sh ''' sh '''
git checkout master git checkout master
git pull origin master
''' '''
} }
else { else if(env.CHANGE_TARGET == '2.0'){
sh '''
git checkout 2.0
'''
}
else{
sh ''' sh '''
git checkout develop git checkout develop
git pull origin develop
''' '''
} }
} }
...@@ -143,32 +155,34 @@ pipeline { ...@@ -143,32 +155,34 @@ pipeline {
git fetch origin +refs/pull/${CHANGE_ID}/merge git fetch origin +refs/pull/${CHANGE_ID}/merge
git checkout -qf FETCH_HEAD git checkout -qf FETCH_HEAD
''' '''
script{ script{
env.skipstage=sh(script:"cd ${WORKSPACE}.tes && git --no-pager diff --name-only FETCH_HEAD ${env.CHANGE_TARGET}|grep -v -E '.*md|//src//connector|Jenkinsfile|test-all.sh' || echo 0 ",returnStdout:true) skipbuild='2'
skipbuild=sh(script: "git log -2 --pretty=%B | fgrep -ie '[skip ci]' -e '[ci skip]' && echo 1 || echo 2", returnStdout:true)
println skipbuild
} }
println env.skipstage
sh''' sh'''
rm -rf ${WORKSPACE}.tes rm -rf ${WORKSPACE}.tes
''' '''
} }
} }
stage('Parallel test stage') { stage('Parallel test stage') {
//only build pr //only build pr
when { when {
allOf{
changeRequest() changeRequest()
expression { expression{
env.skipstage != 0 return skipbuild.trim() == '2'
} }
}
} }
parallel { parallel {
stage('python_1_s1') { stage('python_1_s1') {
agent{label 'p1'} agent{label " slave1 || slave11 "}
steps { steps {
pre_test() pre_test()
timeout(time: 45, unit: 'MINUTES'){ timeout(time: 55, unit: 'MINUTES'){
sh ''' sh '''
date date
cd ${WKC}/tests cd ${WKC}/tests
...@@ -179,11 +193,11 @@ pipeline { ...@@ -179,11 +193,11 @@ pipeline {
} }
} }
stage('python_2_s5') { stage('python_2_s5') {
agent{label 'p2'} agent{label " slave5 || slave15 "}
steps { steps {
pre_test() pre_test()
timeout(time: 45, unit: 'MINUTES'){ timeout(time: 55, unit: 'MINUTES'){
sh ''' sh '''
date date
cd ${WKC}/tests cd ${WKC}/tests
...@@ -193,9 +207,9 @@ pipeline { ...@@ -193,9 +207,9 @@ pipeline {
} }
} }
stage('python_3_s6') { stage('python_3_s6') {
agent{label 'p3'} agent{label " slave6 || slave16 "}
steps { steps {
timeout(time: 45, unit: 'MINUTES'){ timeout(time: 55, unit: 'MINUTES'){
pre_test() pre_test()
sh ''' sh '''
date date
...@@ -206,9 +220,9 @@ pipeline { ...@@ -206,9 +220,9 @@ pipeline {
} }
} }
stage('test_b1_s2') { stage('test_b1_s2') {
agent{label 'b1'} agent{label " slave2 || slave12 "}
steps { steps {
timeout(time: 45, unit: 'MINUTES'){ timeout(time: 55, unit: 'MINUTES'){
pre_test() pre_test()
sh ''' sh '''
cd ${WKC}/tests cd ${WKC}/tests
...@@ -217,9 +231,8 @@ pipeline { ...@@ -217,9 +231,8 @@ pipeline {
} }
} }
} }
stage('test_crash_gen_s3') { stage('test_crash_gen_s3') {
agent{label "b2"} agent{label " slave3 || slave13 "}
steps { steps {
pre_test() pre_test()
...@@ -245,20 +258,18 @@ pipeline { ...@@ -245,20 +258,18 @@ pipeline {
./handle_taosd_val_log.sh ./handle_taosd_val_log.sh
''' '''
} }
timeout(time: 45, unit: 'MINUTES'){ timeout(time: 55, unit: 'MINUTES'){
sh ''' sh '''
date date
cd ${WKC}/tests cd ${WKC}/tests
./test-all.sh b2fq ./test-all.sh b2fq
date date
''' '''
} }
} }
} }
stage('test_valgrind_s4') { stage('test_valgrind_s4') {
agent{label "b3"} agent{label " slave4 || slave14 "}
steps { steps {
pre_test() pre_test()
...@@ -269,7 +280,7 @@ pipeline { ...@@ -269,7 +280,7 @@ pipeline {
./handle_val_log.sh ./handle_val_log.sh
''' '''
} }
timeout(time: 45, unit: 'MINUTES'){ timeout(time: 55, unit: 'MINUTES'){
sh ''' sh '''
date date
cd ${WKC}/tests cd ${WKC}/tests
...@@ -284,9 +295,9 @@ pipeline { ...@@ -284,9 +295,9 @@ pipeline {
} }
} }
stage('test_b4_s7') { stage('test_b4_s7') {
agent{label 'b4'} agent{label " slave7 || slave17 "}
steps { steps {
timeout(time: 45, unit: 'MINUTES'){ timeout(time: 55, unit: 'MINUTES'){
pre_test() pre_test()
sh ''' sh '''
date date
...@@ -303,9 +314,9 @@ pipeline { ...@@ -303,9 +314,9 @@ pipeline {
} }
} }
stage('test_b5_s8') { stage('test_b5_s8') {
agent{label 'b5'} agent{label " slave8 || slave18 "}
steps { steps {
timeout(time: 45, unit: 'MINUTES'){ timeout(time: 55, unit: 'MINUTES'){
pre_test() pre_test()
sh ''' sh '''
date date
...@@ -316,9 +327,9 @@ pipeline { ...@@ -316,9 +327,9 @@ pipeline {
} }
} }
stage('test_b6_s9') { stage('test_b6_s9') {
agent{label 'b6'} agent{label " slave9 || slave19 "}
steps { steps {
timeout(time: 45, unit: 'MINUTES'){ timeout(time: 55, unit: 'MINUTES'){
pre_test() pre_test()
sh ''' sh '''
date date
...@@ -329,9 +340,9 @@ pipeline { ...@@ -329,9 +340,9 @@ pipeline {
} }
} }
stage('test_b7_s10') { stage('test_b7_s10') {
agent{label 'b7'} agent{label " slave10 || slave20 "}
steps { steps {
timeout(time: 45, unit: 'MINUTES'){ timeout(time: 55, unit: 'MINUTES'){
pre_test() pre_test()
sh ''' sh '''
date date
...@@ -421,6 +432,5 @@ pipeline { ...@@ -421,6 +432,5 @@ pipeline {
from: "support@taosdata.com" from: "support@taosdata.com"
) )
} }
} }
}
} \ No newline at end of file
# TDengine 集群安装、管理 # TDengine 集群安装、管理
多个TDengine服务器,也就是多个taosd的运行实例可以组成一个集群,以保证TDengine的高可靠运行,并提供水平扩展能力。要了解TDengine 2.0的集群管理,需要对集群的基本概念有所了解,请看TDengine 2.0整体架构一章。而且在安装集群之前,先请按照[《立即开始》](https://www.taosdata.com/cn/documentation/getting-started/)一章安装并体验单节点功能。 多个TDengine服务器,也就是多个taosd的运行实例可以组成一个集群,以保证TDengine的高可靠运行,并提供水平扩展能力。要了解TDengine 2.0的集群管理,需要对集群的基本概念有所了解,请看《TDengine整体架构》一章。而且在安装集群之前,建议先按照[《立即开始》](https://www.taosdata.com/cn/documentation/getting-started/)一章安装并体验单节点功能。
集群的每个数据节点是由End Point来唯一标识的,End Point是由FQDN(Fully Qualified Domain Name)外加Port组成,比如 h1.taosdata.com:6030。一般FQDN就是服务器的hostname,可通过Linux命令`hostname -f`获取(如何配置FQDN,请参考:[一篇文章说清楚TDengine的FQDN](https://www.taosdata.com/blog/2020/09/11/1824.html))。端口是这个数据节点对外服务的端口号,缺省是6030,但可以通过taos.cfg里配置参数serverPort进行修改。一个物理节点可能配置了多个hostname, TDengine会自动获取第一个,但也可以通过taos.cfg里配置参数fqdn进行指定。如果习惯IP地址直接访问,可以将参数fqdn设置为本节点的IP地址。 集群的每个数据节点是由End Point来唯一标识的,End Point是由FQDN(Fully Qualified Domain Name)外加Port组成,比如 h1.taosdata.com:6030。一般FQDN就是服务器的hostname,可通过Linux命令`hostname -f`获取(如何配置FQDN,请参考:[一篇文章说清楚TDengine的FQDN](https://www.taosdata.com/blog/2020/09/11/1824.html))。端口是这个数据节点对外服务的端口号,缺省是6030,但可以通过taos.cfg里配置参数serverPort进行修改。一个物理节点可能配置了多个hostname, TDengine会自动获取第一个,但也可以通过taos.cfg里配置参数fqdn进行指定。如果习惯IP地址直接访问,可以将参数fqdn设置为本节点的IP地址。
...@@ -12,7 +12,7 @@ TDengine的集群管理极其简单,除添加和删除节点需要人工干预 ...@@ -12,7 +12,7 @@ TDengine的集群管理极其简单,除添加和删除节点需要人工干预
**第零步**:规划集群所有物理节点的FQDN,将规划好的FQDN分别添加到每个物理节点的/etc/hostname;修改每个物理节点的/etc/hosts,将所有集群物理节点的IP与FQDN的对应添加好。【如部署了DNS,请联系网络管理员在DNS上做好相关配置】 **第零步**:规划集群所有物理节点的FQDN,将规划好的FQDN分别添加到每个物理节点的/etc/hostname;修改每个物理节点的/etc/hosts,将所有集群物理节点的IP与FQDN的对应添加好。【如部署了DNS,请联系网络管理员在DNS上做好相关配置】
**第一步**:如果搭建集群的物理节点中,存有之前的测试数据、装过1.X的版本,或者装过其他版本的TDengine,请先将其删除,并清空所有数据,具体步骤请参考博客[《TDengine多种安装包的安装和卸载》](https://www.taosdata.com/blog/2019/08/09/566.html ) **第一步**:如果搭建集群的物理节点中,存有之前的测试数据、装过1.X的版本,或者装过其他版本的TDengine,请先将其删除,并清空所有数据(如果需要保留原有数据,请联系涛思交付团队进行旧版本升级、数据迁移),具体步骤请参考博客[《TDengine多种安装包的安装和卸载》](https://www.taosdata.com/blog/2019/08/09/566.html )
**注意1:**因为FQDN的信息会写进文件,如果之前没有配置或者更改FQDN,且启动了TDengine。请一定在确保数据无用或者备份的前提下,清理一下之前的数据(`rm -rf /var/lib/taos/*`); **注意1:**因为FQDN的信息会写进文件,如果之前没有配置或者更改FQDN,且启动了TDengine。请一定在确保数据无用或者备份的前提下,清理一下之前的数据(`rm -rf /var/lib/taos/*`);
**注意2:**客户端也需要配置,确保它可以正确解析每个节点的FQDN配置,不管是通过DNS服务,还是 Host 文件。 **注意2:**客户端也需要配置,确保它可以正确解析每个节点的FQDN配置,不管是通过DNS服务,还是 Host 文件。
...@@ -23,23 +23,23 @@ TDengine的集群管理极其简单,除添加和删除节点需要人工干预 ...@@ -23,23 +23,23 @@ TDengine的集群管理极其简单,除添加和删除节点需要人工干预
**第四步**:检查所有数据节点,以及应用程序所在物理节点的网络设置: **第四步**:检查所有数据节点,以及应用程序所在物理节点的网络设置:
1. 每个物理节点上执行命令`hostname -f`,查看和确认所有节点的hostname是不相同的(应用驱动所在节点无需做此项检查); 1. 每个物理节点上执行命令`hostname -f`,查看和确认所有节点的hostname是不相同的(应用驱动所在节点无需做此项检查);
2. 每个物理节点上执行`ping host`, 其中host是其他物理节点的hostname, 看能否ping通其它物理节点; 如果不能ping通,需要检查网络设置, 或/etc/hosts文件(Windows系统默认路径为C:\Windows\system32\drivers\etc\hosts),或DNS的配置。如果无法ping通,是无法组成集群的; 2. 每个物理节点上执行`ping host`,其中host是其他物理节点的hostname,看能否ping通其它物理节点;如果不能ping通,需要检查网络设置,或/etc/hosts文件(Windows系统默认路径为C:\Windows\system32\drivers\etc\hosts),或DNS的配置。如果无法ping通,是无法组成集群的;
3. 从应用运行的物理节点,ping taosd运行的数据节点,如果无法ping通,应用是无法连接taosd的,请检查应用所在物理节点的DNS设置或hosts文件; 3. 从应用运行的物理节点,ping taosd运行的数据节点,如果无法ping通,应用是无法连接taosd的,请检查应用所在物理节点的DNS设置或hosts文件;
4. 每个数据节点的End Point就是输出的hostname外加端口号,比如h1.taosdata.com:6030 4. 每个数据节点的End Point就是输出的hostname外加端口号,比如h1.taosdata.com:6030
**第五步**:修改TDengine的配置文件(所有节点的文件/etc/taos/taos.cfg都需要修改)。假设准备启动的第一个数据节点End Point为 h1.taosdata.com:6030, 其与集群配置相关参数如下: **第五步**:修改TDengine的配置文件(所有节点的文件/etc/taos/taos.cfg都需要修改)。假设准备启动的第一个数据节点End Point为 h1.taosdata.com:6030其与集群配置相关参数如下:
``` ```
// firstEp 是每个数据节点首次启动后连接的第一个数据节点 // firstEp 是每个数据节点首次启动后连接的第一个数据节点
firstEp h1.taosdata.com:6030 firstEp h1.taosdata.com:6030
// 必须配置为本数据节点的FQDN,如果本机只有一个hostname, 可注释掉本配置 // 必须配置为本数据节点的FQDN,如果本机只有一个hostname, 可注释掉本
fqdn h1.taosdata.com fqdn h1.taosdata.com
// 配置本数据节点的端口号,缺省是6030 // 配置本数据节点的端口号,缺省是6030
serverPort 6030 serverPort 6030
// 使用场景,请参考《Arbitrator的使用》的部分 // 副本数为偶数的时候,需要配置,请参考《Arbitrator的使用》的部分
arbitrator ha.taosdata.com:6042 arbitrator ha.taosdata.com:6042
``` ```
...@@ -53,7 +53,7 @@ arbitrator ha.taosdata.com:6042 ...@@ -53,7 +53,7 @@ arbitrator ha.taosdata.com:6042
| 2 | mnodeEqualVnodeNum | 一个mnode等同于vnode消耗的个数 | | 2 | mnodeEqualVnodeNum | 一个mnode等同于vnode消耗的个数 |
| 3 | offlineThreshold | dnode离线阈值,超过该时间将导致Dnode离线 | | 3 | offlineThreshold | dnode离线阈值,超过该时间将导致Dnode离线 |
| 4 | statusInterval | dnode向mnode报告状态时长 | | 4 | statusInterval | dnode向mnode报告状态时长 |
| 5 | arbitrator | 系统中裁决器的end point | | 5 | arbitrator | 系统中裁决器的End Point |
| 6 | timezone | 时区 | | 6 | timezone | 时区 |
| 7 | balance | 是否启动负载均衡 | | 7 | balance | 是否启动负载均衡 |
| 8 | maxTablesPerVnode | 每个vnode中能够创建的最大表个数 | | 8 | maxTablesPerVnode | 每个vnode中能够创建的最大表个数 |
...@@ -87,7 +87,7 @@ taos> ...@@ -87,7 +87,7 @@ taos>
1. 按照[《立即开始》](https://www.taosdata.com/cn/documentation/getting-started/)一章的方法在每个物理节点启动taosd;(注意:每个物理节点都需要在 taos.cfg 文件中将 firstEP 参数配置为新集群首个节点的 End Point——在本例中是 h1.taos.com:6030) 1. 按照[《立即开始》](https://www.taosdata.com/cn/documentation/getting-started/)一章的方法在每个物理节点启动taosd;(注意:每个物理节点都需要在 taos.cfg 文件中将 firstEP 参数配置为新集群首个节点的 End Point——在本例中是 h1.taos.com:6030)
2. 在第一个数据节点,使用CLI程序taos, 登录进TDengine系统, 执行命令: 2. 在第一个数据节点,使用CLI程序taos,登录进TDengine系统,执行命令:
``` ```
CREATE DNODE "h2.taos.com:6030"; CREATE DNODE "h2.taos.com:6030";
...@@ -101,7 +101,7 @@ taos> ...@@ -101,7 +101,7 @@ taos>
SHOW DNODES; SHOW DNODES;
``` ```
查看新节点是否被成功加入。如果该被加入的数据节点处于离线状态,请做两个检查 查看新节点是否被成功加入。如果该被加入的数据节点处于离线状态,请做两个检查
- 查看该数据节点的taosd是否正常工作,如果没有正常运行,需要先检查为什么 - 查看该数据节点的taosd是否正常工作,如果没有正常运行,需要先检查为什么
- 查看该数据节点taosd日志文件taosdlog.0里前面几行日志(一般在/var/log/taos目录),看日志里输出的该数据节点fqdn以及端口号是否为刚添加的End Point。如果不一致,需要将正确的End Point添加进去。 - 查看该数据节点taosd日志文件taosdlog.0里前面几行日志(一般在/var/log/taos目录),看日志里输出的该数据节点fqdn以及端口号是否为刚添加的End Point。如果不一致,需要将正确的End Point添加进去。
...@@ -121,7 +121,7 @@ taos> ...@@ -121,7 +121,7 @@ taos>
### 添加数据节点 ### 添加数据节点
执行CLI程序taos, 使用root账号登录进系统, 执行: 执行CLI程序taos,使用root账号登录进系统,执行:
``` ```
CREATE DNODE "fqdn:port"; CREATE DNODE "fqdn:port";
...@@ -131,13 +131,13 @@ CREATE DNODE "fqdn:port"; ...@@ -131,13 +131,13 @@ CREATE DNODE "fqdn:port";
### 删除数据节点 ### 删除数据节点
执行CLI程序taos, 使用root账号登录进TDengine系统,执行: 执行CLI程序taos使用root账号登录进TDengine系统,执行:
``` ```mysql
DROP DNODE "fqdn:port"; DROP DNODE "fqdn:port | dnodeID";
``` ```
其中fqdn是被删除的节点的FQDN,port是其对外服务器的端口号 通过"fqdn:port"或"dnodeID"来指定一个具体的节点都是可以的。其中fqdn是被删除的节点的FQDN,port是其对外服务器的端口号;dnodeID可以通过SHOW DNODES获得。
<font color=green>**【注意】**</font> <font color=green>**【注意】**</font>
...@@ -147,25 +147,41 @@ DROP DNODE "fqdn:port"; ...@@ -147,25 +147,41 @@ DROP DNODE "fqdn:port";
- 一个数据节点被drop之后,其他节点都会感知到这个dnodeID的删除操作,任何集群中的节点都不会再接收此dnodeID的请求。 - 一个数据节点被drop之后,其他节点都会感知到这个dnodeID的删除操作,任何集群中的节点都不会再接收此dnodeID的请求。
- dnodeID的是集群自动分配的,不得人工指定。它在生成时递增的,不会重复。 - dnodeID是集群自动分配的,不得人工指定。它在生成时是递增的,不会重复。
### 查看数据节点 ### 手动迁移数据节点
手动将某个vnode迁移到指定的dnode。
执行CLI程序taos,使用root账号登录进TDengine系统,执行: 执行CLI程序taos使用root账号登录进TDengine系统,执行:
```mysql
ALTER DNODE <source-dnodeId> BALANCE "VNODE:<vgId>-DNODE:<dest-dnodeId>";
``` ```
其中:source-dnodeId是源dnodeId,也就是待迁移的vnode所在的dnodeID;vgId可以通过SHOW VGROUPS获得,列表的第一列;dest-dnodeId是目标dnodeId。
<font color=green>**【注意】**</font>
- 只有在集群的自动负载均衡选项关闭时(balance设置为0),才允许手动迁移。
- 只有处于正常工作状态的vnode才能被迁移:master/slave,当处于offline/unsynced/syncing状态时,是不能迁移的。
- 迁移前,务必核实目标dnode的资源足够:CPU、内存、硬盘。
### 查看数据节点
执行CLI程序taos,使用root账号登录进TDengine系统,执行:
```mysql
SHOW DNODES; SHOW DNODES;
``` ```
它将列出集群中所有的dnode,每个dnode的fqdn:port, 状态(ready, offline等),vnode数目,还未使用的vnode数目等信息。在添加或删除一个数据节点后,可以使用该命令查看。 它将列出集群中所有的dnode,每个dnode的ID,end_point(fqdn:port),状态(ready, offline等),vnode数目,还未使用的vnode数目等信息。在添加或删除一个数据节点后,可以使用该命令查看。
### 查看虚拟节点组 ### 查看虚拟节点组
为充分利用多核技术,并提供scalability,数据需要分片处理。因此TDengine会将一个DB的数据切分成多份,存放在多个vnode里。这些vnode可能分布在多个数据节点dnode里,这样就实现了水平扩展。一个vnode仅仅属于一个DB,但一个DB可以有多个vnode。vnode的是mnode根据当前系统资源的情况,自动进行分配的,无需任何人工干预。 为充分利用多核技术,并提供scalability,数据需要分片处理。因此TDengine会将一个DB的数据切分成多份,存放在多个vnode里。这些vnode可能分布在多个数据节点dnode里,这样就实现了水平扩展。一个vnode仅仅属于一个DB,但一个DB可以有多个vnode。vnode的是mnode根据当前系统资源的情况,自动进行分配的,无需任何人工干预。
执行CLI程序taos,使用root账号登录进TDengine系统,执行: 执行CLI程序taos,使用root账号登录进TDengine系统,执行:
```mysql
```
SHOW VGROUPS; SHOW VGROUPS;
``` ```
...@@ -173,9 +189,9 @@ SHOW VGROUPS; ...@@ -173,9 +189,9 @@ SHOW VGROUPS;
TDengine通过多副本的机制来提供系统的高可用性,包括vnode和mnode的高可用性。 TDengine通过多副本的机制来提供系统的高可用性,包括vnode和mnode的高可用性。
vnode的副本数是与DB关联的,一个集群里可以有多个DB,根据运营的需求,每个DB可以配置不同的副本数。创建数据库时,通过参数replica 指定副本数(缺省为1)。如果副本数为1,系统的可靠性无法保证,只要数据所在的节点宕机,就将无法提供服务。集群的节点数必须大于等于副本数,否则创建表时将返回错误more dnodes are needed"。比如下面的命令将创建副本数为3的数据库demo: vnode的副本数是与DB关联的,一个集群里可以有多个DB,根据运营的需求,每个DB可以配置不同的副本数。创建数据库时,通过参数replica 指定副本数(缺省为1)。如果副本数为1,系统的可靠性无法保证,只要数据所在的节点宕机,就将无法提供服务。集群的节点数必须大于等于副本数,否则创建表时将返回错误"more dnodes are needed"。比如下面的命令将创建副本数为3的数据库demo:
``` ```mysql
CREATE DATABASE demo replica 3; CREATE DATABASE demo replica 3;
``` ```
...@@ -183,20 +199,19 @@ CREATE DATABASE demo replica 3; ...@@ -183,20 +199,19 @@ CREATE DATABASE demo replica 3;
一个数据节点dnode里可能有多个DB的数据,因此一个dnode离线时,可能会影响到多个DB。如果一个vnode group里的一半或一半以上的vnode不工作,那么该vnode group就无法对外服务,无法插入或读取数据,这样会影响到它所属的DB的一部分表的读写操作。 一个数据节点dnode里可能有多个DB的数据,因此一个dnode离线时,可能会影响到多个DB。如果一个vnode group里的一半或一半以上的vnode不工作,那么该vnode group就无法对外服务,无法插入或读取数据,这样会影响到它所属的DB的一部分表的读写操作。
因为vnode的引入,无法简单给出结论:“集群中过半数据节点dnode工作,集群就应该工作”。但是对于简单的情形,很好下结论。比如副本数为3,只有三个dnode,那如果仅有一个节点不工作,整个集群还是可以正常工作的,但如果有两个数据节点不工作,那整个集群就无法正常工作了。 因为vnode的引入,无法简单给出结论:“集群中过半数据节点dnode工作,集群就应该工作”。但是对于简单的情形,很好下结论。比如副本数为3,只有三个dnode,那如果仅有一个节点不工作,整个集群还是可以正常工作的,但如果有两个数据节点不工作,那整个集群就无法正常工作了。
## <a class="anchor" id="mnode"></a>Mnode的高可用性 ## <a class="anchor" id="mnode"></a>Mnode的高可用性
TDengine集群是由mnode (taosd的一个模块,管理节点) 负责管理的,为保证mnode的高可用,可以配置多个mnode副本,副本数由系统配置参数numOfMnodes决定,有效范围为1-3。为保证元数据的强一致性,mnode副本之间是通过同步的方式进行数据复制的。 TDengine集群是由mnode (taosd的一个模块,管理节点) 负责管理的,为保证mnode的高可用,可以配置多个mnode副本,副本数由系统配置参数numOfMnodes决定,有效范围为1-3。为保证元数据的强一致性,mnode副本之间是通过同步的方式进行数据复制的。
一个集群有多个数据节点dnode, 但一个dnode至多运行一个mnode实例。多个dnode情况下,哪个dnode可以作为mnode呢?这是完全由系统根据整个系统资源情况,自动指定的。用户可通过CLI程序taos,在TDengine的console里,执行如下命令: 一个集群有多个数据节点dnode但一个dnode至多运行一个mnode实例。多个dnode情况下,哪个dnode可以作为mnode呢?这是完全由系统根据整个系统资源情况,自动指定的。用户可通过CLI程序taos,在TDengine的console里,执行如下命令:
``` ```mysql
SHOW MNODES; SHOW MNODES;
``` ```
来查看mnode列表,该列表将列出mnode所处的dnode的End Point和角色(master, slave, unsynced 或offline)。 来查看mnode列表,该列表将列出mnode所处的dnode的End Point和角色(master, slave, unsynced 或offline)。当集群中第一个数据节点启动时,该数据节点一定会运行一个mnode实例,否则该数据节点dnode无法正常工作,因为一个系统是必须有至少一个mnode的。如果numOfMnodes配置为2,启动第二个dnode时,该dnode也将运行一个mnode实例。
当集群中第一个数据节点启动时,该数据节点一定会运行一个mnode实例,否则该数据节点dnode无法正常工作,因为一个系统是必须有至少一个mnode的。如果numOfMnodes配置为2,启动第二个dnode时,该dnode也将运行一个mnode实例。
为保证mnode服务的高可用性,numOfMnodes必须设置为2或更大。因为mnode保存的元数据必须是强一致的,如果numOfMnodes大于2,复制参数quorum自动设为2,也就是说,至少要保证有两个副本写入数据成功,才通知客户端应用写入成功。 为保证mnode服务的高可用性,numOfMnodes必须设置为2或更大。因为mnode保存的元数据必须是强一致的,如果numOfMnodes大于2,复制参数quorum自动设为2,也就是说,至少要保证有两个副本写入数据成功,才通知客户端应用写入成功。
...@@ -210,7 +225,7 @@ SHOW MNODES; ...@@ -210,7 +225,7 @@ SHOW MNODES;
- 当一个数据节点从集群中移除时,系统将自动把该数据节点上的数据转移到其他数据节点,无需任何人工干预。 - 当一个数据节点从集群中移除时,系统将自动把该数据节点上的数据转移到其他数据节点,无需任何人工干预。
- 如果一个数据节点过热(数据量过大),系统将自动进行负载均衡,将该数据节点的一些vnode自动挪到其他节点。 - 如果一个数据节点过热(数据量过大),系统将自动进行负载均衡,将该数据节点的一些vnode自动挪到其他节点。
当上述三种情况发生时,系统将启动各个数据节点的负载计算,从而决定如何挪动。 当上述三种情况发生时,系统将启动各个数据节点的负载计算,从而决定如何挪动。
**【提示】负载均衡由参数balance控制,它决定是否启动自动负载均衡。** **【提示】负载均衡由参数balance控制,它决定是否启动自动负载均衡。**
...@@ -225,7 +240,7 @@ SHOW MNODES; ...@@ -225,7 +240,7 @@ SHOW MNODES;
## <a class="anchor" id="arbitrator"></a>Arbitrator的使用 ## <a class="anchor" id="arbitrator"></a>Arbitrator的使用
如果副本数为偶数,当一个 vnode group 里一半 vnode 不工作时,是无法从中选出 master 的。同理,一半 mnode 不工作时,是无法选出 mnode 的 master 的,因为存在“split brain”问题。为解决这个问题,TDengine 引入了 Arbitrator 的概念。Arbitrator 模拟一个 vnode 或 mnode 在工作,但只简单的负责网络连接,不处理任何数据插入或访问。只要包含 Arbitrator 在内,超过半数的 vnode 或 mnode 工作,那么该 vnode group 或 mnode 组就可以正常的提供数据插入或查询服务。比如对于副本数为 2 的情形,如果一个节点 A 离线,但另外一个节点 B 正常,而且能连接到 Arbitrator,那么节点 B 就能正常工作。 如果副本数为偶数,当一个 vnode group 里一半或超过一半的 vnode 不工作时,是无法从中选出 master 的。同理,一半或超过一半的 mnode 不工作时,是无法选出 mnode 的 master 的,因为存在“split brain”问题。为解决这个问题,TDengine 引入了 Arbitrator 的概念。Arbitrator 模拟一个 vnode 或 mnode 在工作,但只简单的负责网络连接,不处理任何数据插入或访问。只要包含 Arbitrator 在内,超过半数的 vnode 或 mnode 工作,那么该 vnode group 或 mnode 组就可以正常的提供数据插入或查询服务。比如对于副本数为 2 的情形,如果一个节点 A 离线,但另外一个节点 B 正常,而且能连接到 Arbitrator,那么节点 B 就能正常工作。
总之,在目前版本下,TDengine 建议在双副本环境要配置 Arbitrator,以提升系统的可用性。 总之,在目前版本下,TDengine 建议在双副本环境要配置 Arbitrator,以提升系统的可用性。
...@@ -235,3 +250,9 @@ Arbitrator 的执行程序名为 tarbitrator。该程序对系统资源几乎没 ...@@ -235,3 +250,9 @@ Arbitrator 的执行程序名为 tarbitrator。该程序对系统资源几乎没
3. 修改每个 taosd 实例的配置文件,在 taos.cfg 里将参数 arbitrator 设置为 tarbitrator 程序所对应的 End Point。(如果该参数配置了,当副本数为偶数时,系统将自动连接配置的 Arbitrator。如果副本数为奇数,即使配置了 Arbitrator,系统也不会去建立连接。) 3. 修改每个 taosd 实例的配置文件,在 taos.cfg 里将参数 arbitrator 设置为 tarbitrator 程序所对应的 End Point。(如果该参数配置了,当副本数为偶数时,系统将自动连接配置的 Arbitrator。如果副本数为奇数,即使配置了 Arbitrator,系统也不会去建立连接。)
4. 在配置文件中配置了的 Arbitrator,会出现在 `SHOW DNODES;` 指令的返回结果中,对应的 role 列的值会是“arb”。 4. 在配置文件中配置了的 Arbitrator,会出现在 `SHOW DNODES;` 指令的返回结果中,对应的 role 列的值会是“arb”。
查看集群 Arbitrator 的状态【2.0.14.0 以后支持】
```mysql
SHOW DNODES;
```
...@@ -206,7 +206,7 @@ TDengine 缺省的时间戳是毫秒精度,但通过在 CREATE DATABASE 时传 ...@@ -206,7 +206,7 @@ TDengine 缺省的时间戳是毫秒精度,但通过在 CREATE DATABASE 时传
显示当前数据库下的所有数据表信息。 显示当前数据库下的所有数据表信息。
说明:可在like中使用通配符进行名称的匹配,这一通配符字符串最长不能超过24字节。 说明:可在 like 中使用通配符进行名称的匹配,这一通配符字符串最长不能超过 20 字节。( 从 2.1.6.1 版本开始,通配符字符串的长度放宽到了 100 字节,并可以通过 taos.cfg 中的 maxWildCardsLength 参数来配置这一长度限制。但不建议使用太长的通配符字符串,将有可能严重影响 LIKE 操作的执行性能。)
通配符匹配:1)'%'(百分号)匹配0到任意个字符;2)'\_'下划线匹配单个任意字符。 通配符匹配:1)'%'(百分号)匹配0到任意个字符;2)'\_'下划线匹配单个任意字符。
...@@ -435,6 +435,17 @@ INSERT INTO ...@@ -435,6 +435,17 @@ INSERT INTO
INSERT INTO d1001 FILE '/tmp/csvfile.csv'; INSERT INTO d1001 FILE '/tmp/csvfile.csv';
``` ```
- **插入来自文件的数据记录,并自动建表**
从 2.1.5.0 版本开始,支持在插入来自 CSV 文件的数据时,以超级表为模板来自动创建不存在的数据表。例如:
```mysql
INSERT INTO d21001 USING meters TAGS ('Beijing.Chaoyang', 2) FILE '/tmp/csvfile.csv';
```
也可以在一条语句中向多个表以自动建表的方式插入记录。例如:
```mysql
INSERT INTO d21001 USING meters TAGS ('Beijing.Chaoyang', 2) FILE '/tmp/csvfile_21001.csv'
d21002 USING meters (groupId) TAGS (2) FILE '/tmp/csvfile_21002.csv';
```
**历史记录写入**:可使用IMPORT或者INSERT命令,IMPORT的语法,功能与INSERT完全一样。 **历史记录写入**:可使用IMPORT或者INSERT命令,IMPORT的语法,功能与INSERT完全一样。
**说明:**针对 insert 类型的 SQL 语句,我们采用的流式解析策略,在发现后面的错误之前,前面正确的部分 SQL 仍会执行。下面的 SQL 中,INSERT 语句是无效的,但是 d1001 仍会被创建。 **说明:**针对 insert 类型的 SQL 语句,我们采用的流式解析策略,在发现后面的错误之前,前面正确的部分 SQL 仍会执行。下面的 SQL 中,INSERT 语句是无效的,但是 d1001 仍会被创建。
...@@ -942,6 +953,8 @@ TDengine支持针对数据的聚合查询。提供支持的聚合和选择函数 ...@@ -942,6 +953,8 @@ TDengine支持针对数据的聚合查询。提供支持的聚合和选择函数
### 选择函数 ### 选择函数
在使用所有的选择函数的时候,可以同时指定输出 ts 列或标签列(包括 tbname),这样就可以方便地知道被选出的值是源于哪个数据行的。
- **MIN** - **MIN**
```mysql ```mysql
SELECT MIN(field_name) FROM {tb_name | stb_name} [WHERE clause]; SELECT MIN(field_name) FROM {tb_name | stb_name} [WHERE clause];
...@@ -1215,6 +1228,37 @@ TDengine支持针对数据的聚合查询。提供支持的聚合和选择函数 ...@@ -1215,6 +1228,37 @@ TDengine支持针对数据的聚合查询。提供支持的聚合和选择函数
Query OK, 1 row(s) in set (0.001042s) Query OK, 1 row(s) in set (0.001042s)
``` ```
- **INTERP**
```mysql
SELECT INTERP(field_name) FROM { tb_name | stb_name } WHERE ts='timestamp' [FILL ({ VALUE | PREV | NULL | LINEAR})];
```
功能说明:返回表/超级表的指定时间截面、指定字段的记录。
返回结果数据类型:同应用的字段。
应用字段:所有字段。
适用于:**表、超级表**。
说明:(从 2.0.15.0 版本开始新增此函数)INTERP 必须指定时间断面,如果该时间断面不存在直接对应的数据,那么会根据 FILL 参数的设定进行插值。其中,条件语句里面可以附带更多的筛选条件,例如标签、tbname。
限制:INTERP 目前不支持 FILL(NEXT)。
示例:
```mysql
taos> select interp(*) from meters where ts='2017-7-14 10:42:00.005' fill(prev);
interp(ts) | interp(f1) | interp(f2) | interp(f3) |
====================================================================
2017-07-14 10:42:00.005 | 5 | 9 | 6 |
Query OK, 1 row(s) in set (0.002912s)
taos> select interp(*) from meters where tbname in ('t1') and ts='2017-7-14 10:42:00.005' fill(prev);
interp(ts) | interp(f1) | interp(f2) | interp(f3) |
====================================================================
2017-07-14 10:42:00.005 | 5 | 6 | 7 |
Query OK, 1 row(s) in set (0.002005s)
```
### 计算函数 ### 计算函数
- **DIFF** - **DIFF**
......
...@@ -342,7 +342,7 @@ STableMeta* createSuperTableMeta(STableMetaMsg* pChild); ...@@ -342,7 +342,7 @@ STableMeta* createSuperTableMeta(STableMetaMsg* pChild);
uint32_t tscGetTableMetaSize(STableMeta* pTableMeta); uint32_t tscGetTableMetaSize(STableMeta* pTableMeta);
CChildTableMeta* tscCreateChildMeta(STableMeta* pTableMeta); CChildTableMeta* tscCreateChildMeta(STableMeta* pTableMeta);
uint32_t tscGetTableMetaMaxSize(); uint32_t tscGetTableMetaMaxSize();
int32_t tscCreateTableMetaFromSTableMeta(STableMeta** pChild, const char* name, size_t *tableMetaCapacity); int32_t tscCreateTableMetaFromSTableMeta(STableMeta** ppChild, const char* name, size_t *tableMetaCapacity, STableMeta **ppStable);
STableMeta* tscTableMetaDup(STableMeta* pTableMeta); STableMeta* tscTableMetaDup(STableMeta* pTableMeta);
SVgroupsInfo* tscVgroupsInfoDup(SVgroupsInfo* pVgroupsInfo); SVgroupsInfo* tscVgroupsInfoDup(SVgroupsInfo* pVgroupsInfo);
......
此差异已折叠。
此差异已折叠。
...@@ -299,7 +299,7 @@ static int fillColumnsNull(STableDataBlocks* pBlock, int32_t rowNum) { ...@@ -299,7 +299,7 @@ static int fillColumnsNull(STableDataBlocks* pBlock, int32_t rowNum) {
SSchema *schema = (SSchema*)pBlock->pTableMeta->schema; SSchema *schema = (SSchema*)pBlock->pTableMeta->schema;
for (int32_t i = 0; i < spd->numOfCols; ++i) { for (int32_t i = 0; i < spd->numOfCols; ++i) {
if (!spd->cols[i].hasVal) { // current column do not have any value to insert, set it to null if (spd->cols[i].valStat == VAL_STAT_NONE) { // current column do not have any value to insert, set it to null
for (int32_t n = 0; n < rowNum; ++n) { for (int32_t n = 0; n < rowNum; ++n) {
char *ptr = pBlock->pData + sizeof(SSubmitBlk) + pBlock->rowSize * n + offset; char *ptr = pBlock->pData + sizeof(SSubmitBlk) + pBlock->rowSize * n + offset;
......
...@@ -4293,7 +4293,7 @@ static bool isValidExpr(tSqlExpr* pLeft, tSqlExpr* pRight, int32_t optr) { ...@@ -4293,7 +4293,7 @@ static bool isValidExpr(tSqlExpr* pLeft, tSqlExpr* pRight, int32_t optr) {
if (pRight == NULL) { if (pRight == NULL) {
return true; return true;
} }
if (pLeft->tokenId >= TK_BOOL && pLeft->tokenId <= TK_BINARY && pRight->tokenId >= TK_BOOL && pRight->tokenId <= TK_BINARY) { if (pLeft->tokenId >= TK_BOOL && pLeft->tokenId <= TK_BINARY && pRight->tokenId >= TK_BOOL && pRight->tokenId <= TK_BINARY) {
return false; return false;
} }
...@@ -5757,10 +5757,15 @@ static void setDefaultOrderInfo(SQueryInfo* pQueryInfo) { ...@@ -5757,10 +5757,15 @@ static void setDefaultOrderInfo(SQueryInfo* pQueryInfo) {
} }
int32_t validateOrderbyNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SSqlNode* pSqlNode, SSchema* pSchema) { int32_t validateOrderbyNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SSqlNode* pSqlNode, SSchema* pSchema) {
const char* msg0 = "only support order by primary timestamp"; const char* msg0 = "only one column allowed in orderby";
const char* msg1 = "invalid column name"; const char* msg1 = "invalid column name in orderby clause";
const char* msg2 = "order by primary timestamp, first tag or groupby column in groupby clause allowed"; const char* msg2 = "too many order by columns";
const char* msg3 = "invalid column in order by clause, only primary timestamp or first tag in groupby clause allowed"; const char* msg3 = "only primary timestamp/tbname/first tag in groupby clause allowed";
const char* msg4 = "only tag in groupby clause allowed in order by";
const char* msg5 = "only primary timestamp/column in top/bottom function allowed as orderby column";
const char* msg6 = "only primary timestamp allowed as the second orderby column";
const char* msg7 = "only primary timestamp/column in groupby clause allowed as orderby column";
const char* msg8 = "only column in groupby clause allowed as orderby column";
setDefaultOrderInfo(pQueryInfo); setDefaultOrderInfo(pQueryInfo);
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
...@@ -5790,7 +5795,7 @@ int32_t validateOrderbyNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SSqlNode* pSq ...@@ -5790,7 +5795,7 @@ int32_t validateOrderbyNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SSqlNode* pSq
} }
} else { } else {
if (size > 2) { if (size > 2) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg3); return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg2);
} }
} }
...@@ -5819,7 +5824,7 @@ int32_t validateOrderbyNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SSqlNode* pSq ...@@ -5819,7 +5824,7 @@ int32_t validateOrderbyNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SSqlNode* pSq
// it is a tag column // it is a tag column
if (pQueryInfo->groupbyExpr.columnInfo == NULL) { if (pQueryInfo->groupbyExpr.columnInfo == NULL) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg2); return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg4);
} }
SColIndex* pColIndex = taosArrayGet(pQueryInfo->groupbyExpr.columnInfo, 0); SColIndex* pColIndex = taosArrayGet(pQueryInfo->groupbyExpr.columnInfo, 0);
if (relTagIndex == pColIndex->colIndex) { if (relTagIndex == pColIndex->colIndex) {
...@@ -5865,7 +5870,7 @@ int32_t validateOrderbyNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SSqlNode* pSq ...@@ -5865,7 +5870,7 @@ int32_t validateOrderbyNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SSqlNode* pSq
pExpr = tscExprGet(pQueryInfo, 1); pExpr = tscExprGet(pQueryInfo, 1);
if (pExpr->base.colInfo.colIndex != index.columnIndex && index.columnIndex != PRIMARYKEY_TIMESTAMP_COL_INDEX) { if (pExpr->base.colInfo.colIndex != index.columnIndex && index.columnIndex != PRIMARYKEY_TIMESTAMP_COL_INDEX) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg2); return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg5);
} }
tVariantListItem* p1 = taosArrayGet(pSqlNode->pSortOrder, 0); tVariantListItem* p1 = taosArrayGet(pSqlNode->pSortOrder, 0);
...@@ -5906,7 +5911,7 @@ int32_t validateOrderbyNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SSqlNode* pSq ...@@ -5906,7 +5911,7 @@ int32_t validateOrderbyNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SSqlNode* pSq
} }
if (index.columnIndex != PRIMARYKEY_TIMESTAMP_COL_INDEX) { if (index.columnIndex != PRIMARYKEY_TIMESTAMP_COL_INDEX) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg2); return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg6);
} else { } else {
tVariantListItem* p1 = taosArrayGet(pSortorder, 1); tVariantListItem* p1 = taosArrayGet(pSortorder, 1);
pQueryInfo->order.order = p1->sortOrder; pQueryInfo->order.order = p1->sortOrder;
...@@ -5926,7 +5931,7 @@ int32_t validateOrderbyNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SSqlNode* pSq ...@@ -5926,7 +5931,7 @@ int32_t validateOrderbyNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SSqlNode* pSq
validOrder = (pColIndex->colIndex == index.columnIndex); validOrder = (pColIndex->colIndex == index.columnIndex);
} }
if (!validOrder) { if (!validOrder) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg2); return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg7);
} }
tVariantListItem* p1 = taosArrayGet(pSqlNode->pSortOrder, 0); tVariantListItem* p1 = taosArrayGet(pSqlNode->pSortOrder, 0);
pQueryInfo->groupbyExpr.orderIndex = pSchema[index.columnIndex].colId; pQueryInfo->groupbyExpr.orderIndex = pSchema[index.columnIndex].colId;
...@@ -5940,6 +5945,9 @@ int32_t validateOrderbyNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SSqlNode* pSq ...@@ -5940,6 +5945,9 @@ int32_t validateOrderbyNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SSqlNode* pSq
if (columnInfo != NULL && taosArrayGetSize(columnInfo) > 0) { if (columnInfo != NULL && taosArrayGetSize(columnInfo) > 0) {
SColIndex* pColIndex = taosArrayGet(columnInfo, 0); SColIndex* pColIndex = taosArrayGet(columnInfo, 0);
validOrder = (pColIndex->colIndex == index.columnIndex); validOrder = (pColIndex->colIndex == index.columnIndex);
if (!validOrder) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg8);
}
} else { } else {
/* order of top/bottom query in interval is not valid */ /* order of top/bottom query in interval is not valid */
SExprInfo* pExpr = tscExprGet(pQueryInfo, 0); SExprInfo* pExpr = tscExprGet(pQueryInfo, 0);
...@@ -5947,15 +5955,11 @@ int32_t validateOrderbyNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SSqlNode* pSq ...@@ -5947,15 +5955,11 @@ int32_t validateOrderbyNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SSqlNode* pSq
pExpr = tscExprGet(pQueryInfo, 1); pExpr = tscExprGet(pQueryInfo, 1);
if (pExpr->base.colInfo.colIndex != index.columnIndex && index.columnIndex != PRIMARYKEY_TIMESTAMP_COL_INDEX) { if (pExpr->base.colInfo.colIndex != index.columnIndex && index.columnIndex != PRIMARYKEY_TIMESTAMP_COL_INDEX) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg2); return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg5);
} }
validOrder = true; validOrder = true;
} }
if (!validOrder) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg2);
}
tVariantListItem* pItem = taosArrayGet(pSqlNode->pSortOrder, 0); tVariantListItem* pItem = taosArrayGet(pSqlNode->pSortOrder, 0);
pQueryInfo->order.order = pItem->sortOrder; pQueryInfo->order.order = pItem->sortOrder;
...@@ -8399,19 +8403,13 @@ int32_t loadAllTableMeta(SSqlObj* pSql, struct SSqlInfo* pInfo) { ...@@ -8399,19 +8403,13 @@ int32_t loadAllTableMeta(SSqlObj* pSql, struct SSqlInfo* pInfo) {
char name[TSDB_TABLE_FNAME_LEN] = {0}; char name[TSDB_TABLE_FNAME_LEN] = {0};
//if (!pSql->pBuf) {
// if (NULL == (pSql->pBuf = tcalloc(1, 80 * TSDB_MAX_COLUMNS))) {
// code = TSDB_CODE_TSC_OUT_OF_MEMORY;
// goto _end;
// }
//}
plist = taosArrayInit(4, POINTER_BYTES); plist = taosArrayInit(4, POINTER_BYTES);
pVgroupList = taosArrayInit(4, POINTER_BYTES); pVgroupList = taosArrayInit(4, POINTER_BYTES);
taosArraySort(tableNameList, tnameComparFn); taosArraySort(tableNameList, tnameComparFn);
taosArrayRemoveDuplicate(tableNameList, tnameComparFn, NULL); taosArrayRemoveDuplicate(tableNameList, tnameComparFn, NULL);
STableMeta* pSTMeta = (STableMeta *)(pSql->pBuf);
size_t numOfTables = taosArrayGetSize(tableNameList); size_t numOfTables = taosArrayGetSize(tableNameList);
for (int32_t i = 0; i < numOfTables; ++i) { for (int32_t i = 0; i < numOfTables; ++i) {
SName* pname = taosArrayGet(tableNameList, i); SName* pname = taosArrayGet(tableNameList, i);
...@@ -8427,7 +8425,8 @@ int32_t loadAllTableMeta(SSqlObj* pSql, struct SSqlInfo* pInfo) { ...@@ -8427,7 +8425,8 @@ int32_t loadAllTableMeta(SSqlObj* pSql, struct SSqlInfo* pInfo) {
// avoid mem leak, may should update pTableMeta // avoid mem leak, may should update pTableMeta
void* pVgroupIdList = NULL; void* pVgroupIdList = NULL;
if (pTableMeta->tableType == TSDB_CHILD_TABLE) { if (pTableMeta->tableType == TSDB_CHILD_TABLE) {
code = tscCreateTableMetaFromSTableMeta((STableMeta **)(&pTableMeta), name, &tableMetaCapacity); code = tscCreateTableMetaFromSTableMeta((STableMeta **)(&pTableMeta), name, &tableMetaCapacity, (STableMeta **)(&pSTMeta));
pSql->pBuf = (void *)pSTMeta;
// create the child table meta from super table failed, try load it from mnode // create the child table meta from super table failed, try load it from mnode
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
......
...@@ -2872,18 +2872,19 @@ int32_t tscGetTableMetaImpl(SSqlObj* pSql, STableMetaInfo *pTableMetaInfo, bool ...@@ -2872,18 +2872,19 @@ int32_t tscGetTableMetaImpl(SSqlObj* pSql, STableMetaInfo *pTableMetaInfo, bool
tNameExtractFullName(&pTableMetaInfo->name, name); tNameExtractFullName(&pTableMetaInfo->name, name);
size_t len = strlen(name); size_t len = strlen(name);
if (pTableMetaInfo->tableMetaCapacity != 0) { // just make runtime happy
if (pTableMetaInfo->pTableMeta != NULL) { if (pTableMetaInfo->tableMetaCapacity != 0 && pTableMetaInfo->pTableMeta != NULL) {
memset(pTableMetaInfo->pTableMeta, 0, pTableMetaInfo->tableMetaCapacity); memset(pTableMetaInfo->pTableMeta, 0, pTableMetaInfo->tableMetaCapacity);
}
} }
taosHashGetCloneExt(tscTableMetaMap, name, len, NULL, (void **)&(pTableMetaInfo->pTableMeta), &pTableMetaInfo->tableMetaCapacity); taosHashGetCloneExt(tscTableMetaMap, name, len, NULL, (void **)&(pTableMetaInfo->pTableMeta), &pTableMetaInfo->tableMetaCapacity);
STableMeta* pMeta = pTableMetaInfo->pTableMeta; STableMeta* pMeta = pTableMetaInfo->pTableMeta;
STableMeta* pSTMeta = (STableMeta *)(pSql->pBuf);
if (pMeta && pMeta->id.uid > 0) { if (pMeta && pMeta->id.uid > 0) {
// in case of child table, here only get the // in case of child table, here only get the
if (pMeta->tableType == TSDB_CHILD_TABLE) { if (pMeta->tableType == TSDB_CHILD_TABLE) {
int32_t code = tscCreateTableMetaFromSTableMeta(&pTableMetaInfo->pTableMeta, name, &pTableMetaInfo->tableMetaCapacity); int32_t code = tscCreateTableMetaFromSTableMeta(&pTableMetaInfo->pTableMeta, name, &pTableMetaInfo->tableMetaCapacity, (STableMeta **)(&pSTMeta));
pSql->pBuf = (void *)(pSTMeta);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return getTableMetaFromMnode(pSql, pTableMetaInfo, autocreate); return getTableMetaFromMnode(pSql, pTableMetaInfo, autocreate);
} }
......
...@@ -2404,8 +2404,8 @@ int32_t tscHandleFirstRoundStableQuery(SSqlObj *pSql) { ...@@ -2404,8 +2404,8 @@ int32_t tscHandleFirstRoundStableQuery(SSqlObj *pSql) {
SColumn* x = taosArrayGetP(pNewQueryInfo->colList, index1); SColumn* x = taosArrayGetP(pNewQueryInfo->colList, index1);
tscColumnCopy(x, pCol); tscColumnCopy(x, pCol);
} else { } else {
SColumn *p = tscColumnClone(pCol); SSchema ss = {.type = (uint8_t)pCol->info.type, .bytes = pCol->info.bytes, .colId = (int16_t)pCol->columnIndex};
taosArrayPush(pNewQueryInfo->colList, &p); tscColumnListInsert(pNewQueryInfo->colList, pCol->columnIndex, pCol->tableUid, &ss);
} }
} }
} }
......
...@@ -1808,101 +1808,6 @@ int32_t tscGetDataBlockFromList(SHashObj* pHashList, int64_t id, int32_t size, i ...@@ -1808,101 +1808,6 @@ int32_t tscGetDataBlockFromList(SHashObj* pHashList, int64_t id, int32_t size, i
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static SMemRow tdGenMemRowFromBuilder(SMemRowBuilder* pBuilder) {
SSchema* pSchema = pBuilder->pSchema;
char* p = (char*)pBuilder->buf;
int toffset = 0;
uint16_t nCols = pBuilder->nCols;
uint8_t memRowType = payloadType(p);
uint16_t nColsBound = payloadNCols(p);
if (pBuilder->nCols <= 0 || nColsBound <= 0) {
return NULL;
}
char* pVals = POINTER_SHIFT(p, payloadValuesOffset(p));
SMemRow* memRow = (SMemRow)pBuilder->pDataBlock;
memRowSetType(memRow, memRowType);
// ----------------- Raw payload structure for row:
/* |<------------ Head ------------->|<----------- body of column data tuple ------------------->|
* | |<----------------- flen ------------->|<--- value part --->|
* |SMemRowType| dataTLen | nCols | colId | colType | offset | ... | value |...|...|... |
* +-----------+----------+----------+--------------------------------------|--------------------|
* | uint8_t | uint32_t | uint16_t | int16_t | uint8_t | uint16_t | ... |.......|...|...|... |
* +-----------+----------+----------+--------------------------------------+--------------------|
* 1. offset in column data tuple starts from the value part in case of uint16_t overflow.
* 2. dataTLen: total length including the header and body.
*/
if (memRowType == SMEM_ROW_DATA) {
SDataRow trow = (SDataRow)memRowDataBody(memRow);
dataRowSetLen(trow, (TDRowLenT)(TD_DATA_ROW_HEAD_SIZE + pBuilder->flen));
dataRowSetVersion(trow, pBuilder->sversion);
p = (char*)payloadBody(pBuilder->buf);
uint16_t i = 0, j = 0;
while (j < nCols) {
if (i >= nColsBound) {
break;
}
int16_t colId = payloadColId(p);
if (colId == pSchema[j].colId) {
// ASSERT(payloadColType(p) == pSchema[j].type);
tdAppendColVal(trow, POINTER_SHIFT(pVals, payloadColOffset(p)), pSchema[j].type, toffset);
toffset += TYPE_BYTES[pSchema[j].type];
p = payloadNextCol(p);
++i;
++j;
} else if (colId < pSchema[j].colId) {
p = payloadNextCol(p);
++i;
} else {
tdAppendColVal(trow, getNullValue(pSchema[j].type), pSchema[j].type, toffset);
toffset += TYPE_BYTES[pSchema[j].type];
++j;
}
}
while (j < nCols) {
tdAppendColVal(trow, getNullValue(pSchema[j].type), pSchema[j].type, toffset);
toffset += TYPE_BYTES[pSchema[j].type];
++j;
}
#if 0 // no need anymore
while (i < nColsBound) {
p = payloadNextCol(p);
++i;
}
#endif
} else if (memRowType == SMEM_ROW_KV) {
SKVRow kvRow = (SKVRow)memRowKvBody(memRow);
kvRowSetLen(kvRow, (TDRowLenT)(TD_KV_ROW_HEAD_SIZE + sizeof(SColIdx) * nColsBound));
kvRowSetNCols(kvRow, nColsBound);
memRowSetKvVersion(memRow, pBuilder->sversion);
p = (char*)payloadBody(pBuilder->buf);
int i = 0;
while (i < nColsBound) {
int16_t colId = payloadColId(p);
uint8_t colType = payloadColType(p);
tdAppendKvColVal(kvRow, POINTER_SHIFT(pVals,payloadColOffset(p)), colId, colType, &toffset);
//toffset += sizeof(SColIdx);
p = payloadNextCol(p);
++i;
}
} else {
ASSERT(0);
}
int32_t rowTLen = memRowTLen(memRow);
pBuilder->pDataBlock = (char*)pBuilder->pDataBlock + rowTLen; // next row
pBuilder->pSubmitBlk->dataLen += rowTLen;
return memRow;
}
// Erase the empty space reserved for binary data // Erase the empty space reserved for binary data
static int trimDataBlock(void* pDataBlock, STableDataBlocks* pTableDataBlock, SInsertStatementParam* insertParam, static int trimDataBlock(void* pDataBlock, STableDataBlocks* pTableDataBlock, SInsertStatementParam* insertParam,
SBlockKeyTuple* blkKeyTuple) { SBlockKeyTuple* blkKeyTuple) {
...@@ -1934,10 +1839,11 @@ static int trimDataBlock(void* pDataBlock, STableDataBlocks* pTableDataBlock, SI ...@@ -1934,10 +1839,11 @@ static int trimDataBlock(void* pDataBlock, STableDataBlocks* pTableDataBlock, SI
int32_t schemaSize = sizeof(STColumn) * numOfCols; int32_t schemaSize = sizeof(STColumn) * numOfCols;
pBlock->schemaLen = schemaSize; pBlock->schemaLen = schemaSize;
} else { } else {
for (int32_t j = 0; j < tinfo.numOfColumns; ++j) { if (IS_RAW_PAYLOAD(insertParam->payloadType)) {
flen += TYPE_BYTES[pSchema[j].type]; for (int32_t j = 0; j < tinfo.numOfColumns; ++j) {
flen += TYPE_BYTES[pSchema[j].type];
}
} }
pBlock->schemaLen = 0; pBlock->schemaLen = 0;
} }
...@@ -1964,18 +1870,19 @@ static int trimDataBlock(void* pDataBlock, STableDataBlocks* pTableDataBlock, SI ...@@ -1964,18 +1870,19 @@ static int trimDataBlock(void* pDataBlock, STableDataBlocks* pTableDataBlock, SI
pBlock->dataLen += memRowTLen(memRow); pBlock->dataLen += memRowTLen(memRow);
} }
} else { } else {
SMemRowBuilder rowBuilder;
rowBuilder.pSchema = pSchema;
rowBuilder.sversion = pTableMeta->sversion;
rowBuilder.flen = flen;
rowBuilder.nCols = tinfo.numOfColumns;
rowBuilder.pDataBlock = pDataBlock;
rowBuilder.pSubmitBlk = pBlock;
rowBuilder.buf = p;
for (int32_t i = 0; i < numOfRows; ++i) { for (int32_t i = 0; i < numOfRows; ++i) {
rowBuilder.buf = (blkKeyTuple + i)->payloadAddr; char* payload = (blkKeyTuple + i)->payloadAddr;
tdGenMemRowFromBuilder(&rowBuilder); if (isNeedConvertRow(payload)) {
convertSMemRow(pDataBlock, payload, pTableDataBlock);
TDRowTLenT rowTLen = memRowTLen(pDataBlock);
pDataBlock = POINTER_SHIFT(pDataBlock, rowTLen);
pBlock->dataLen += rowTLen;
} else {
TDRowTLenT rowTLen = memRowTLen(payload);
memcpy(pDataBlock, payload, rowTLen);
pDataBlock = POINTER_SHIFT(pDataBlock, rowTLen);
pBlock->dataLen += rowTLen;
}
} }
} }
...@@ -1988,9 +1895,9 @@ static int trimDataBlock(void* pDataBlock, STableDataBlocks* pTableDataBlock, SI ...@@ -1988,9 +1895,9 @@ static int trimDataBlock(void* pDataBlock, STableDataBlocks* pTableDataBlock, SI
static int32_t getRowExpandSize(STableMeta* pTableMeta) { static int32_t getRowExpandSize(STableMeta* pTableMeta) {
int32_t result = TD_MEM_ROW_DATA_HEAD_SIZE; int32_t result = TD_MEM_ROW_DATA_HEAD_SIZE;
int32_t columns = tscGetNumOfColumns(pTableMeta); int32_t columns = tscGetNumOfColumns(pTableMeta);
SSchema* pSchema = tscGetTableSchema(pTableMeta); SSchema* pSchema = tscGetTableSchema(pTableMeta);
for(int32_t i = 0; i < columns; i++) { for (int32_t i = 0; i < columns; i++) {
if (IS_VAR_DATA_TYPE((pSchema + i)->type)) { if (IS_VAR_DATA_TYPE((pSchema + i)->type)) {
result += TYPE_BYTES[TSDB_DATA_TYPE_BINARY]; result += TYPE_BYTES[TSDB_DATA_TYPE_BINARY];
} }
...@@ -2036,7 +1943,7 @@ int32_t tscMergeTableDataBlocks(SInsertStatementParam *pInsertParam, bool freeBl ...@@ -2036,7 +1943,7 @@ int32_t tscMergeTableDataBlocks(SInsertStatementParam *pInsertParam, bool freeBl
SSubmitBlk* pBlocks = (SSubmitBlk*) pOneTableBlock->pData; SSubmitBlk* pBlocks = (SSubmitBlk*) pOneTableBlock->pData;
if (pBlocks->numOfRows > 0) { if (pBlocks->numOfRows > 0) {
// the maximum expanded size in byte when a row-wise data is converted to SDataRow format // the maximum expanded size in byte when a row-wise data is converted to SDataRow format
int32_t expandSize = getRowExpandSize(pOneTableBlock->pTableMeta); int32_t expandSize = isRawPayload ? getRowExpandSize(pOneTableBlock->pTableMeta) : 0;
STableDataBlocks* dataBuf = NULL; STableDataBlocks* dataBuf = NULL;
int32_t ret = tscGetDataBlockFromList(pVnodeDataBlockHashList, pOneTableBlock->vgId, TSDB_PAYLOAD_SIZE, int32_t ret = tscGetDataBlockFromList(pVnodeDataBlockHashList, pOneTableBlock->vgId, TSDB_PAYLOAD_SIZE,
...@@ -2049,7 +1956,8 @@ int32_t tscMergeTableDataBlocks(SInsertStatementParam *pInsertParam, bool freeBl ...@@ -2049,7 +1956,8 @@ int32_t tscMergeTableDataBlocks(SInsertStatementParam *pInsertParam, bool freeBl
return ret; return ret;
} }
int64_t destSize = dataBuf->size + pOneTableBlock->size + pBlocks->numOfRows * expandSize + sizeof(STColumn) * tscGetNumOfColumns(pOneTableBlock->pTableMeta); int64_t destSize = dataBuf->size + pOneTableBlock->size + pBlocks->numOfRows * expandSize +
sizeof(STColumn) * tscGetNumOfColumns(pOneTableBlock->pTableMeta);
if (dataBuf->nAllocSize < destSize) { if (dataBuf->nAllocSize < destSize) {
dataBuf->nAllocSize = (uint32_t)(destSize * 1.5); dataBuf->nAllocSize = (uint32_t)(destSize * 1.5);
...@@ -2093,7 +2001,9 @@ int32_t tscMergeTableDataBlocks(SInsertStatementParam *pInsertParam, bool freeBl ...@@ -2093,7 +2001,9 @@ int32_t tscMergeTableDataBlocks(SInsertStatementParam *pInsertParam, bool freeBl
pBlocks->numOfRows, pBlocks->sversion, blkKeyInfo.pKeyTuple->skey, pLastKeyTuple->skey); pBlocks->numOfRows, pBlocks->sversion, blkKeyInfo.pKeyTuple->skey, pLastKeyTuple->skey);
} }
int32_t len = pBlocks->numOfRows * (pOneTableBlock->rowSize + expandSize) + sizeof(STColumn) * tscGetNumOfColumns(pOneTableBlock->pTableMeta); int32_t len = pBlocks->numOfRows *
(isRawPayload ? (pOneTableBlock->rowSize + expandSize) : getExtendedRowSize(pOneTableBlock)) +
sizeof(STColumn) * tscGetNumOfColumns(pOneTableBlock->pTableMeta);
pBlocks->tid = htonl(pBlocks->tid); pBlocks->tid = htonl(pBlocks->tid);
pBlocks->uid = htobe64(pBlocks->uid); pBlocks->uid = htobe64(pBlocks->uid);
...@@ -4554,14 +4464,16 @@ CChildTableMeta* tscCreateChildMeta(STableMeta* pTableMeta) { ...@@ -4554,14 +4464,16 @@ CChildTableMeta* tscCreateChildMeta(STableMeta* pTableMeta) {
return cMeta; return cMeta;
} }
int32_t tscCreateTableMetaFromSTableMeta(STableMeta** ppChild, const char* name, size_t *tableMetaCapacity) { int32_t tscCreateTableMetaFromSTableMeta(STableMeta** ppChild, const char* name, size_t *tableMetaCapacity, STableMeta**ppSTable) {
assert(*ppChild != NULL); assert(*ppChild != NULL);
STableMeta* p = *ppSTable;
STableMeta* p = NULL;
size_t sz = 0;
STableMeta* pChild = *ppChild; STableMeta* pChild = *ppChild;
size_t sz = (p != NULL) ? tscGetTableMetaSize(p) : 0; //ppSTableBuf actually capacity may larger than sz, dont care
if (p != NULL && sz != 0) {
memset((char *)p, 0, sz);
}
taosHashGetCloneExt(tscTableMetaMap, pChild->sTableName, strnlen(pChild->sTableName, TSDB_TABLE_FNAME_LEN), NULL, (void **)&p, &sz); taosHashGetCloneExt(tscTableMetaMap, pChild->sTableName, strnlen(pChild->sTableName, TSDB_TABLE_FNAME_LEN), NULL, (void **)&p, &sz);
*ppSTable = p;
// tableMeta exists, build child table meta according to the super table meta // tableMeta exists, build child table meta according to the super table meta
// the uid need to be checked in addition to the general name of the super table. // the uid need to be checked in addition to the general name of the super table.
...@@ -4580,10 +4492,8 @@ int32_t tscCreateTableMetaFromSTableMeta(STableMeta** ppChild, const char* name, ...@@ -4580,10 +4492,8 @@ int32_t tscCreateTableMetaFromSTableMeta(STableMeta** ppChild, const char* name,
memcpy(pChild->schema, p->schema, totalBytes); memcpy(pChild->schema, p->schema, totalBytes);
*ppChild = pChild; *ppChild = pChild;
tfree(p);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} else { // super table has been removed, current tableMeta is also expired. remove it here } else { // super table has been removed, current tableMeta is also expired. remove it here
tfree(p);
taosHashRemove(tscTableMetaMap, name, strnlen(name, TSDB_TABLE_FNAME_LEN)); taosHashRemove(tscTableMetaMap, name, strnlen(name, TSDB_TABLE_FNAME_LEN));
return -1; return -1;
} }
......
...@@ -186,6 +186,7 @@ typedef void *SDataRow; ...@@ -186,6 +186,7 @@ typedef void *SDataRow;
#define TD_DATA_ROW_HEAD_SIZE (sizeof(uint16_t) + sizeof(int16_t)) #define TD_DATA_ROW_HEAD_SIZE (sizeof(uint16_t) + sizeof(int16_t))
#define dataRowLen(r) (*(TDRowLenT *)(r)) // 0~65535 #define dataRowLen(r) (*(TDRowLenT *)(r)) // 0~65535
#define dataRowEnd(r) POINTER_SHIFT(r, dataRowLen(r))
#define dataRowVersion(r) (*(int16_t *)POINTER_SHIFT(r, sizeof(int16_t))) #define dataRowVersion(r) (*(int16_t *)POINTER_SHIFT(r, sizeof(int16_t)))
#define dataRowTuple(r) POINTER_SHIFT(r, TD_DATA_ROW_HEAD_SIZE) #define dataRowTuple(r) POINTER_SHIFT(r, TD_DATA_ROW_HEAD_SIZE)
#define dataRowTKey(r) (*(TKEY *)(dataRowTuple(r))) #define dataRowTKey(r) (*(TKEY *)(dataRowTuple(r)))
...@@ -201,14 +202,18 @@ void tdFreeDataRow(SDataRow row); ...@@ -201,14 +202,18 @@ void tdFreeDataRow(SDataRow row);
void tdInitDataRow(SDataRow row, STSchema *pSchema); void tdInitDataRow(SDataRow row, STSchema *pSchema);
SDataRow tdDataRowDup(SDataRow row); SDataRow tdDataRowDup(SDataRow row);
// offset here not include dataRow header length // offset here not include dataRow header length
static FORCE_INLINE int tdAppendColVal(SDataRow row, const void *value, int8_t type, int32_t offset) { static FORCE_INLINE int tdAppendDataColVal(SDataRow row, const void *value, bool isCopyVarData, int8_t type,
int32_t offset) {
ASSERT(value != NULL); ASSERT(value != NULL);
int32_t toffset = offset + TD_DATA_ROW_HEAD_SIZE; int32_t toffset = offset + TD_DATA_ROW_HEAD_SIZE;
if (IS_VAR_DATA_TYPE(type)) { if (IS_VAR_DATA_TYPE(type)) {
*(VarDataOffsetT *)POINTER_SHIFT(row, toffset) = dataRowLen(row); *(VarDataOffsetT *)POINTER_SHIFT(row, toffset) = dataRowLen(row);
memcpy(POINTER_SHIFT(row, dataRowLen(row)), value, varDataTLen(value)); if (isCopyVarData) {
memcpy(POINTER_SHIFT(row, dataRowLen(row)), value, varDataTLen(value));
}
dataRowLen(row) += varDataTLen(value); dataRowLen(row) += varDataTLen(value);
} else { } else {
if (offset == 0) { if (offset == 0) {
...@@ -223,6 +228,12 @@ static FORCE_INLINE int tdAppendColVal(SDataRow row, const void *value, int8_t t ...@@ -223,6 +228,12 @@ static FORCE_INLINE int tdAppendColVal(SDataRow row, const void *value, int8_t t
return 0; return 0;
} }
// offset here not include dataRow header length
static FORCE_INLINE int tdAppendColVal(SDataRow row, const void *value, int8_t type, int32_t offset) {
return tdAppendDataColVal(row, value, true, type, offset);
}
// NOTE: offset here including the header size // NOTE: offset here including the header size
static FORCE_INLINE void *tdGetRowDataOfCol(SDataRow row, int8_t type, int32_t offset) { static FORCE_INLINE void *tdGetRowDataOfCol(SDataRow row, int8_t type, int32_t offset) {
if (IS_VAR_DATA_TYPE(type)) { if (IS_VAR_DATA_TYPE(type)) {
...@@ -472,9 +483,10 @@ static FORCE_INLINE void *tdGetKVRowIdxOfCol(SKVRow row, int16_t colId) { ...@@ -472,9 +483,10 @@ static FORCE_INLINE void *tdGetKVRowIdxOfCol(SKVRow row, int16_t colId) {
} }
// offset here not include kvRow header length // offset here not include kvRow header length
static FORCE_INLINE int tdAppendKvColVal(SKVRow row, const void *value, int16_t colId, int8_t type, int32_t *offset) { static FORCE_INLINE int tdAppendKvColVal(SKVRow row, const void *value, bool isCopyValData, int16_t colId, int8_t type,
int32_t offset) {
ASSERT(value != NULL); ASSERT(value != NULL);
int32_t toffset = *offset + TD_KV_ROW_HEAD_SIZE; int32_t toffset = offset + TD_KV_ROW_HEAD_SIZE;
SColIdx *pColIdx = (SColIdx *)POINTER_SHIFT(row, toffset); SColIdx *pColIdx = (SColIdx *)POINTER_SHIFT(row, toffset);
char * ptr = (char *)POINTER_SHIFT(row, kvRowLen(row)); char * ptr = (char *)POINTER_SHIFT(row, kvRowLen(row));
...@@ -482,10 +494,12 @@ static FORCE_INLINE int tdAppendKvColVal(SKVRow row, const void *value, int16_t ...@@ -482,10 +494,12 @@ static FORCE_INLINE int tdAppendKvColVal(SKVRow row, const void *value, int16_t
pColIdx->offset = kvRowLen(row); // offset of pColIdx including the TD_KV_ROW_HEAD_SIZE pColIdx->offset = kvRowLen(row); // offset of pColIdx including the TD_KV_ROW_HEAD_SIZE
if (IS_VAR_DATA_TYPE(type)) { if (IS_VAR_DATA_TYPE(type)) {
memcpy(ptr, value, varDataTLen(value)); if (isCopyValData) {
memcpy(ptr, value, varDataTLen(value));
}
kvRowLen(row) += varDataTLen(value); kvRowLen(row) += varDataTLen(value);
} else { } else {
if (*offset == 0) { if (offset == 0) {
ASSERT(type == TSDB_DATA_TYPE_TIMESTAMP); ASSERT(type == TSDB_DATA_TYPE_TIMESTAMP);
TKEY tvalue = tdGetTKEY(*(TSKEY *)value); TKEY tvalue = tdGetTKEY(*(TSKEY *)value);
memcpy(ptr, (void *)(&tvalue), TYPE_BYTES[type]); memcpy(ptr, (void *)(&tvalue), TYPE_BYTES[type]);
...@@ -494,7 +508,6 @@ static FORCE_INLINE int tdAppendKvColVal(SKVRow row, const void *value, int16_t ...@@ -494,7 +508,6 @@ static FORCE_INLINE int tdAppendKvColVal(SKVRow row, const void *value, int16_t
} }
kvRowLen(row) += TYPE_BYTES[type]; kvRowLen(row) += TYPE_BYTES[type];
} }
*offset += sizeof(SColIdx);
return 0; return 0;
} }
...@@ -589,12 +602,24 @@ typedef void *SMemRow; ...@@ -589,12 +602,24 @@ typedef void *SMemRow;
#define TD_MEM_ROW_DATA_HEAD_SIZE (TD_MEM_ROW_TYPE_SIZE + TD_DATA_ROW_HEAD_SIZE) #define TD_MEM_ROW_DATA_HEAD_SIZE (TD_MEM_ROW_TYPE_SIZE + TD_DATA_ROW_HEAD_SIZE)
#define TD_MEM_ROW_KV_HEAD_SIZE (TD_MEM_ROW_TYPE_SIZE + TD_MEM_ROW_KV_VER_SIZE + TD_KV_ROW_HEAD_SIZE) #define TD_MEM_ROW_KV_HEAD_SIZE (TD_MEM_ROW_TYPE_SIZE + TD_MEM_ROW_KV_VER_SIZE + TD_KV_ROW_HEAD_SIZE)
#define SMEM_ROW_DATA 0U // SDataRow #define SMEM_ROW_DATA 0x0U // SDataRow
#define SMEM_ROW_KV 1U // SKVRow #define SMEM_ROW_KV 0x01U // SKVRow
#define SMEM_ROW_CONVERT 0x80U // SMemRow convert flag
#define KVRatioKV (0.2f) // all bool
#define KVRatioPredict (0.4f)
#define KVRatioData (0.75f) // all bigint
#define KVRatioConvert (0.9f)
#define memRowType(r) ((*(uint8_t *)(r)) & 0x01)
#define memRowType(r) (*(uint8_t *)(r)) #define memRowSetType(r, t) ((*(uint8_t *)(r)) = (t)) // set the total byte in case of dirty memory
#define memRowSetConvert(r) ((*(uint8_t *)(r)) = (((*(uint8_t *)(r)) & 0x7F) | SMEM_ROW_CONVERT)) // highest bit
#define isDataRowT(t) (SMEM_ROW_DATA == (((uint8_t)(t)) & 0x01))
#define isDataRow(r) (SMEM_ROW_DATA == memRowType(r)) #define isDataRow(r) (SMEM_ROW_DATA == memRowType(r))
#define isKvRowT(t) (SMEM_ROW_KV == (((uint8_t)(t)) & 0x01))
#define isKvRow(r) (SMEM_ROW_KV == memRowType(r)) #define isKvRow(r) (SMEM_ROW_KV == memRowType(r))
#define isNeedConvertRow(r) (((*(uint8_t *)(r)) & 0x80) == SMEM_ROW_CONVERT)
#define memRowDataBody(r) POINTER_SHIFT(r, TD_MEM_ROW_TYPE_SIZE) // section after flag #define memRowDataBody(r) POINTER_SHIFT(r, TD_MEM_ROW_TYPE_SIZE) // section after flag
#define memRowKvBody(r) \ #define memRowKvBody(r) \
...@@ -611,6 +636,14 @@ typedef void *SMemRow; ...@@ -611,6 +636,14 @@ typedef void *SMemRow;
#define memRowLen(r) (isDataRow(r) ? memRowDataLen(r) : memRowKvLen(r)) #define memRowLen(r) (isDataRow(r) ? memRowDataLen(r) : memRowKvLen(r))
#define memRowTLen(r) (isDataRow(r) ? memRowDataTLen(r) : memRowKvTLen(r)) // using uint32_t/int32_t to store the TLen #define memRowTLen(r) (isDataRow(r) ? memRowDataTLen(r) : memRowKvTLen(r)) // using uint32_t/int32_t to store the TLen
static FORCE_INLINE char *memRowEnd(SMemRow row) {
if (isDataRow(row)) {
return (char *)dataRowEnd(memRowDataBody(row));
} else {
return (char *)kvRowEnd(memRowKvBody(row));
}
}
#define memRowDataVersion(r) dataRowVersion(memRowDataBody(r)) #define memRowDataVersion(r) dataRowVersion(memRowDataBody(r))
#define memRowKvVersion(r) (*(int16_t *)POINTER_SHIFT(r, TD_MEM_ROW_TYPE_SIZE)) #define memRowKvVersion(r) (*(int16_t *)POINTER_SHIFT(r, TD_MEM_ROW_TYPE_SIZE))
#define memRowVersion(r) (isDataRow(r) ? memRowDataVersion(r) : memRowKvVersion(r)) // schema version #define memRowVersion(r) (isDataRow(r) ? memRowDataVersion(r) : memRowKvVersion(r)) // schema version
...@@ -628,7 +661,6 @@ typedef void *SMemRow; ...@@ -628,7 +661,6 @@ typedef void *SMemRow;
} \ } \
} while (0) } while (0)
#define memRowSetType(r, t) (memRowType(r) = (t))
#define memRowSetLen(r, l) (isDataRow(r) ? memRowDataLen(r) = (l) : memRowKvLen(r) = (l)) #define memRowSetLen(r, l) (isDataRow(r) ? memRowDataLen(r) = (l) : memRowKvLen(r) = (l))
#define memRowSetVersion(r, v) (isDataRow(r) ? dataRowSetVersion(memRowDataBody(r), v) : memRowSetKvVersion(r, v)) #define memRowSetVersion(r, v) (isDataRow(r) ? dataRowSetVersion(memRowDataBody(r), v) : memRowSetKvVersion(r, v))
#define memRowCpy(dst, r) memcpy((dst), (r), memRowTLen(r)) #define memRowCpy(dst, r) memcpy((dst), (r), memRowTLen(r))
...@@ -661,12 +693,12 @@ static FORCE_INLINE void *tdGetMemRowDataOfColEx(void *row, int16_t colId, int8_ ...@@ -661,12 +693,12 @@ static FORCE_INLINE void *tdGetMemRowDataOfColEx(void *row, int16_t colId, int8_
} }
} }
static FORCE_INLINE int tdAppendMemColVal(SMemRow row, const void *value, int16_t colId, int8_t type, int32_t offset, static FORCE_INLINE int tdAppendMemRowColVal(SMemRow row, const void *value, bool isCopyVarData, int16_t colId,
int32_t *kvOffset) { int8_t type, int32_t offset) {
if (isDataRow(row)) { if (isDataRow(row)) {
tdAppendColVal(memRowDataBody(row), value, type, offset); tdAppendDataColVal(memRowDataBody(row), value, isCopyVarData, type, offset);
} else { } else {
tdAppendKvColVal(memRowKvBody(row), value, colId, type, kvOffset); tdAppendKvColVal(memRowKvBody(row), value, isCopyVarData, colId, type, offset);
} }
return 0; return 0;
} }
...@@ -688,6 +720,30 @@ static FORCE_INLINE int32_t tdGetColAppendLen(uint8_t rowType, const void *value ...@@ -688,6 +720,30 @@ static FORCE_INLINE int32_t tdGetColAppendLen(uint8_t rowType, const void *value
return len; return len;
} }
/**
* 1. calculate the delta of AllNullLen for SDataRow.
* 2. calculate the real len for SKVRow.
*/
static FORCE_INLINE void tdGetColAppendDeltaLen(const void *value, int8_t colType, int32_t *dataLen, int32_t *kvLen) {
switch (colType) {
case TSDB_DATA_TYPE_BINARY: {
int32_t varLen = varDataLen(value);
*dataLen += (varLen - CHAR_BYTES);
*kvLen += (varLen + sizeof(SColIdx));
break;
}
case TSDB_DATA_TYPE_NCHAR: {
int32_t varLen = varDataLen(value);
*dataLen += (varLen - TSDB_NCHAR_SIZE);
*kvLen += (varLen + sizeof(SColIdx));
break;
}
default: {
*kvLen += (TYPE_BYTES[colType] + sizeof(SColIdx));
break;
}
}
}
typedef struct { typedef struct {
int16_t colId; int16_t colId;
...@@ -703,7 +759,7 @@ static FORCE_INLINE void setSColInfo(SColInfo* colInfo, int16_t colId, uint8_t c ...@@ -703,7 +759,7 @@ static FORCE_INLINE void setSColInfo(SColInfo* colInfo, int16_t colId, uint8_t c
SMemRow mergeTwoMemRows(void *buffer, SMemRow row1, SMemRow row2, STSchema *pSchema1, STSchema *pSchema2); SMemRow mergeTwoMemRows(void *buffer, SMemRow row1, SMemRow row2, STSchema *pSchema1, STSchema *pSchema2);
#if 0
// ----------------- Raw payload structure for row: // ----------------- Raw payload structure for row:
/* |<------------ Head ------------->|<----------- body of column data tuple ------------------->| /* |<------------ Head ------------->|<----------- body of column data tuple ------------------->|
* | |<----------------- flen ------------->|<--- value part --->| * | |<----------------- flen ------------->|<--- value part --->|
...@@ -749,6 +805,8 @@ SMemRow mergeTwoMemRows(void *buffer, SMemRow row1, SMemRow row2, STSchema *pSch ...@@ -749,6 +805,8 @@ SMemRow mergeTwoMemRows(void *buffer, SMemRow row1, SMemRow row2, STSchema *pSch
static FORCE_INLINE char *payloadNextCol(char *pCol) { return (char *)POINTER_SHIFT(pCol, PAYLOAD_COL_HEAD_LEN); } static FORCE_INLINE char *payloadNextCol(char *pCol) { return (char *)POINTER_SHIFT(pCol, PAYLOAD_COL_HEAD_LEN); }
#endif
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
...@@ -41,6 +41,7 @@ extern char tsArbitrator[]; ...@@ -41,6 +41,7 @@ extern char tsArbitrator[];
extern int8_t tsArbOnline; extern int8_t tsArbOnline;
extern int64_t tsArbOnlineTimestamp; extern int64_t tsArbOnlineTimestamp;
extern int32_t tsDnodeId; extern int32_t tsDnodeId;
extern int64_t tsDnodeStartTime;
// common // common
extern int tsRpcTimer; extern int tsRpcTimer;
......
...@@ -851,7 +851,8 @@ SMemRow mergeTwoMemRows(void *buffer, SMemRow row1, SMemRow row2, STSchema *pSch ...@@ -851,7 +851,8 @@ SMemRow mergeTwoMemRows(void *buffer, SMemRow row1, SMemRow row2, STSchema *pSch
int16_t k; int16_t k;
for (k = 0; k < nKvNCols; ++k) { for (k = 0; k < nKvNCols; ++k) {
SColInfo *pColInfo = taosArrayGet(stashRow, k); SColInfo *pColInfo = taosArrayGet(stashRow, k);
tdAppendKvColVal(kvRow, pColInfo->colVal, pColInfo->colId, pColInfo->colType, &toffset); tdAppendKvColVal(kvRow, pColInfo->colVal, true, pColInfo->colId, pColInfo->colType, toffset);
toffset += sizeof(SColIdx);
} }
ASSERT(kvLen == memRowTLen(tRow)); ASSERT(kvLen == memRowTLen(tRow));
} }
......
...@@ -46,6 +46,7 @@ int8_t tsArbOnline = 0; ...@@ -46,6 +46,7 @@ int8_t tsArbOnline = 0;
int64_t tsArbOnlineTimestamp = TSDB_ARB_DUMMY_TIME; int64_t tsArbOnlineTimestamp = TSDB_ARB_DUMMY_TIME;
char tsEmail[TSDB_FQDN_LEN] = {0}; char tsEmail[TSDB_FQDN_LEN] = {0};
int32_t tsDnodeId = 0; int32_t tsDnodeId = 0;
int64_t tsDnodeStartTime = 0;
// common // common
int32_t tsRpcTimer = 300; int32_t tsRpcTimer = 300;
...@@ -991,7 +992,7 @@ static void doInitGlobalConfig(void) { ...@@ -991,7 +992,7 @@ static void doInitGlobalConfig(void) {
cfg.valType = TAOS_CFG_VTYPE_INT32; cfg.valType = TAOS_CFG_VTYPE_INT32;
cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_CLIENT | TSDB_CFG_CTYPE_B_SHOW; cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_CLIENT | TSDB_CFG_CTYPE_B_SHOW;
cfg.minValue = 0; cfg.minValue = 0;
cfg.maxValue = TSDB_MAX_ALLOWED_SQL_LEN; cfg.maxValue = TSDB_MAX_FIELD_LEN;
cfg.ptrLength = 0; cfg.ptrLength = 0;
cfg.unitType = TAOS_CFG_UTYPE_BYTE; cfg.unitType = TAOS_CFG_UTYPE_BYTE;
taosInitConfigOption(cfg); taosInitConfigOption(cfg);
......
...@@ -47,7 +47,8 @@ class TaosTimestamp extends Date { ...@@ -47,7 +47,8 @@ class TaosTimestamp extends Date {
super(Math.floor(date / 1000)); super(Math.floor(date / 1000));
this.precisionExtras = date % 1000; this.precisionExtras = date % 1000;
} else if (precision === 2) { } else if (precision === 2) {
super(parseInt(date / 1000000)); // use BigInt to fix: 1623254400999999999 / 1000000 = 1623254401000 which not expected
super(parseInt(BigInt(date) / 1000000n));
// use BigInt to fix: 1625801548423914405 % 1000000 = 914496 which not expected (914405) // use BigInt to fix: 1625801548423914405 % 1000000 = 914496 which not expected (914405)
this.precisionExtras = parseInt(BigInt(date) % 1000000n); this.precisionExtras = parseInt(BigInt(date) % 1000000n);
} else { } else {
......
{ {
"name": "td2.0-connector", "name": "td2.0-connector",
"version": "2.0.9", "version": "2.0.10",
"description": "A Node.js connector for TDengine.", "description": "A Node.js connector for TDengine.",
"main": "tdengine.js", "main": "tdengine.js",
"directories": { "directories": {
......
...@@ -195,6 +195,7 @@ int32_t dnodeInitSystem() { ...@@ -195,6 +195,7 @@ int32_t dnodeInitSystem() {
dnodeSetRunStatus(TSDB_RUN_STATUS_RUNING); dnodeSetRunStatus(TSDB_RUN_STATUS_RUNING);
moduleStart(); moduleStart();
tsDnodeStartTime = taosGetTimestampMs();
dnodeReportStep("TDengine", "initialized successfully", 1); dnodeReportStep("TDengine", "initialized successfully", 1);
dInfo("TDengine is initialized successfully"); dInfo("TDengine is initialized successfully");
......
...@@ -42,6 +42,8 @@ int32_t main(int32_t argc, char *argv[]) { ...@@ -42,6 +42,8 @@ int32_t main(int32_t argc, char *argv[]) {
} }
} else if (strcmp(argv[i], "-C") == 0) { } else if (strcmp(argv[i], "-C") == 0) {
dump_config = 1; dump_config = 1;
} else if (strcmp(argv[i], "--force-keep-file") == 0) {
tsdbForceKeepFile = true;
} else if (strcmp(argv[i], "--compact-mnode-wal") == 0) { } else if (strcmp(argv[i], "--compact-mnode-wal") == 0) {
tsCompactMnodeWal = 1; tsCompactMnodeWal = 1;
} else if (strcmp(argv[i], "-V") == 0) { } else if (strcmp(argv[i], "-V") == 0) {
......
...@@ -471,6 +471,7 @@ typedef struct { ...@@ -471,6 +471,7 @@ typedef struct {
bool stableQuery; // super table query or not bool stableQuery; // super table query or not
bool topBotQuery; // TODO used bitwise flag bool topBotQuery; // TODO used bitwise flag
bool interpQuery; // interp query or not
bool groupbyColumn; // denote if this is a groupby normal column query bool groupbyColumn; // denote if this is a groupby normal column query
bool hasTagResults; // if there are tag values in final result or not bool hasTagResults; // if there are tag values in final result or not
bool timeWindowInterpo;// if the time window start/end required interpolation bool timeWindowInterpo;// if the time window start/end required interpolation
......
...@@ -41,9 +41,16 @@ typedef struct { ...@@ -41,9 +41,16 @@ typedef struct {
int64_t avail; int64_t avail;
} SFSMeta; } SFSMeta;
typedef struct {
int64_t size;
int64_t used;
int64_t free;
int16_t nAvailDisks; // # of Available disks
} STierMeta;
int tfsInit(SDiskCfg *pDiskCfg, int ndisk); int tfsInit(SDiskCfg *pDiskCfg, int ndisk);
void tfsDestroy(); void tfsDestroy();
void tfsUpdateInfo(SFSMeta *pFSMeta); void tfsUpdateInfo(SFSMeta *pFSMeta, STierMeta *tierMetas, int8_t numLevels);
void tfsGetMeta(SFSMeta *pMeta); void tfsGetMeta(SFSMeta *pMeta);
void tfsAllocDisk(int expLevel, int *level, int *id); void tfsAllocDisk(int expLevel, int *level, int *id);
......
...@@ -75,6 +75,7 @@ extern char configDir[]; ...@@ -75,6 +75,7 @@ extern char configDir[];
#define BUFFER_SIZE TSDB_MAX_ALLOWED_SQL_LEN #define BUFFER_SIZE TSDB_MAX_ALLOWED_SQL_LEN
#define COND_BUF_LEN (BUFFER_SIZE - 30) #define COND_BUF_LEN (BUFFER_SIZE - 30)
#define COL_BUFFER_LEN ((TSDB_COL_NAME_LEN + 15) * TSDB_MAX_COLUMNS) #define COL_BUFFER_LEN ((TSDB_COL_NAME_LEN + 15) * TSDB_MAX_COLUMNS)
#define MAX_USERNAME_SIZE 64 #define MAX_USERNAME_SIZE 64
#define MAX_PASSWORD_SIZE 16 #define MAX_PASSWORD_SIZE 16
#define MAX_HOSTNAME_SIZE 253 // https://man7.org/linux/man-pages/man7/hostname.7.html #define MAX_HOSTNAME_SIZE 253 // https://man7.org/linux/man-pages/man7/hostname.7.html
...@@ -245,7 +246,6 @@ typedef struct SArguments_S { ...@@ -245,7 +246,6 @@ typedef struct SArguments_S {
uint32_t disorderRatio; // 0: no disorder, >0: x% uint32_t disorderRatio; // 0: no disorder, >0: x%
int disorderRange; // ms, us or ns. accordig to database precision int disorderRange; // ms, us or ns. accordig to database precision
uint32_t method_of_delete; uint32_t method_of_delete;
char ** arg_list;
uint64_t totalInsertRows; uint64_t totalInsertRows;
uint64_t totalAffectedRows; uint64_t totalAffectedRows;
bool demo_mode; // use default column name and semi-random data bool demo_mode; // use default column name and semi-random data
...@@ -637,7 +637,6 @@ SArguments g_args = { ...@@ -637,7 +637,6 @@ SArguments g_args = {
0, // disorderRatio 0, // disorderRatio
1000, // disorderRange 1000, // disorderRange
1, // method_of_delete 1, // method_of_delete
NULL, // arg_list
0, // totalInsertRows; 0, // totalInsertRows;
0, // totalAffectedRows; 0, // totalAffectedRows;
true, // demo_mode; true, // demo_mode;
...@@ -1009,6 +1008,7 @@ static void parse_args(int argc, char *argv[], SArguments *arguments) { ...@@ -1009,6 +1008,7 @@ static void parse_args(int argc, char *argv[], SArguments *arguments) {
exit(EXIT_FAILURE); exit(EXIT_FAILURE);
} }
arguments->datatype[0] = argv[i]; arguments->datatype[0] = argv[i];
arguments->datatype[1] = NULL;
} else { } else {
// more than one col // more than one col
int index = 0; int index = 0;
...@@ -1413,6 +1413,7 @@ static char *rand_float_str() ...@@ -1413,6 +1413,7 @@ static char *rand_float_str()
return g_randfloat_buff + (cursor * FLOAT_BUFF_LEN); return g_randfloat_buff + (cursor * FLOAT_BUFF_LEN);
} }
static float rand_float() static float rand_float()
{ {
static int cursor; static int cursor;
...@@ -6407,6 +6408,9 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) { ...@@ -6407,6 +6408,9 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) {
bool flagSleep = true; bool flagSleep = true;
uint64_t sleepTimeTotal = 0; uint64_t sleepTimeTotal = 0;
int percentComplete = 0;
int64_t totalRows = insertRows * pThreadInfo->ntables;
while(pThreadInfo->totalInsertRows < pThreadInfo->ntables * insertRows) { while(pThreadInfo->totalInsertRows < pThreadInfo->ntables * insertRows) {
if ((flagSleep) && (insert_interval)) { if ((flagSleep) && (insert_interval)) {
st = taosGetTimestampMs(); st = taosGetTimestampMs();
...@@ -6583,6 +6587,11 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) { ...@@ -6583,6 +6587,11 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) {
pThreadInfo->totalAffectedRows += affectedRows; pThreadInfo->totalAffectedRows += affectedRows;
int currentPercent = pThreadInfo->totalAffectedRows * 100 / totalRows;
if (currentPercent > percentComplete ) {
printf("[%d]:%d%%\n", pThreadInfo->threadID, currentPercent);
percentComplete = currentPercent;
}
int64_t currentPrintTime = taosGetTimestampMs(); int64_t currentPrintTime = taosGetTimestampMs();
if (currentPrintTime - lastPrintTime > 30*1000) { if (currentPrintTime - lastPrintTime > 30*1000) {
printf("thread[%d] has currently inserted rows: %"PRIu64 ", affected rows: %"PRIu64 "\n", printf("thread[%d] has currently inserted rows: %"PRIu64 ", affected rows: %"PRIu64 "\n",
...@@ -6604,6 +6613,8 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) { ...@@ -6604,6 +6613,8 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) {
} }
} }
} }
if (percentComplete < 100)
printf("[%d]:%d%%\n", pThreadInfo->threadID, percentComplete);
free_of_interlace: free_of_interlace:
tmfree(pThreadInfo->buffer); tmfree(pThreadInfo->buffer);
...@@ -6641,6 +6652,9 @@ static void* syncWriteProgressive(threadInfo *pThreadInfo) { ...@@ -6641,6 +6652,9 @@ static void* syncWriteProgressive(threadInfo *pThreadInfo) {
pThreadInfo->samplePos = 0; pThreadInfo->samplePos = 0;
int percentComplete = 0;
int64_t totalRows = insertRows * pThreadInfo->ntables;
for (uint64_t tableSeq = pThreadInfo->start_table_from; for (uint64_t tableSeq = pThreadInfo->start_table_from;
tableSeq <= pThreadInfo->end_table_to; tableSeq <= pThreadInfo->end_table_to;
tableSeq ++) { tableSeq ++) {
...@@ -6746,6 +6760,11 @@ static void* syncWriteProgressive(threadInfo *pThreadInfo) { ...@@ -6746,6 +6760,11 @@ static void* syncWriteProgressive(threadInfo *pThreadInfo) {
pThreadInfo->totalAffectedRows += affectedRows; pThreadInfo->totalAffectedRows += affectedRows;
int currentPercent = pThreadInfo->totalAffectedRows * 100 / totalRows;
if (currentPercent > percentComplete ) {
printf("[%d]:%d%%\n", pThreadInfo->threadID, currentPercent);
percentComplete = currentPercent;
}
int64_t currentPrintTime = taosGetTimestampMs(); int64_t currentPrintTime = taosGetTimestampMs();
if (currentPrintTime - lastPrintTime > 30*1000) { if (currentPrintTime - lastPrintTime > 30*1000) {
printf("thread[%d] has currently inserted rows: %"PRId64 ", affected rows: %"PRId64 "\n", printf("thread[%d] has currently inserted rows: %"PRId64 ", affected rows: %"PRId64 "\n",
...@@ -6768,6 +6787,8 @@ static void* syncWriteProgressive(threadInfo *pThreadInfo) { ...@@ -6768,6 +6787,8 @@ static void* syncWriteProgressive(threadInfo *pThreadInfo) {
__func__, __LINE__, pThreadInfo->samplePos); __func__, __LINE__, pThreadInfo->samplePos);
} }
} // tableSeq } // tableSeq
if (percentComplete < 100)
printf("[%d]:%d%%\n", pThreadInfo->threadID, percentComplete);
free_of_progressive: free_of_progressive:
tmfree(pThreadInfo->buffer); tmfree(pThreadInfo->buffer);
......
...@@ -253,11 +253,15 @@ static int32_t mnodeProcessHeartBeatMsg(SMnodeMsg *pMsg) { ...@@ -253,11 +253,15 @@ static int32_t mnodeProcessHeartBeatMsg(SMnodeMsg *pMsg) {
int32_t connId = htonl(pHBMsg->connId); int32_t connId = htonl(pHBMsg->connId);
SConnObj *pConn = mnodeAccquireConn(connId, connInfo.user, connInfo.clientIp, connInfo.clientPort); SConnObj *pConn = mnodeAccquireConn(connId, connInfo.user, connInfo.clientIp, connInfo.clientPort);
if (pConn == NULL) {
pHBMsg->pid = htonl(pHBMsg->pid);
pConn = mnodeCreateConn(connInfo.user, connInfo.clientIp, connInfo.clientPort, pHBMsg->pid, pHBMsg->appName);
}
if (pConn == NULL) { if (pConn == NULL) {
// do not close existing links, otherwise // do not close existing links, otherwise
// mError("failed to create connId, close connect"); // mError("failed to create connId, close connect");
// pRsp->killConnection = 1; // pRsp->killConnection = 1;
} else { } else {
pRsp->connId = htonl(pConn->connId); pRsp->connId = htonl(pConn->connId);
mnodeSaveQueryStreamList(pConn, pHBMsg); mnodeSaveQueryStreamList(pConn, pHBMsg);
......
...@@ -65,7 +65,14 @@ int32_t mnodeProcessWrite(SMnodeMsg *pMsg) { ...@@ -65,7 +65,14 @@ int32_t mnodeProcessWrite(SMnodeMsg *pMsg) {
return TSDB_CODE_MND_MSG_NOT_PROCESSED; return TSDB_CODE_MND_MSG_NOT_PROCESSED;
} }
int32_t code = mnodeInitMsg(pMsg); int32_t code = grantCheck(TSDB_GRANT_TIME);
if (code != TSDB_CODE_SUCCESS) {
mError("msg:%p, app:%p type:%s not processed, reason:%s", pMsg, pMsg->rpcMsg.ahandle, taosMsg[pMsg->rpcMsg.msgType],
tstrerror(code));
return code;
}
code = mnodeInitMsg(pMsg);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
mError("msg:%p, app:%p type:%s not processed, reason:%s", pMsg, pMsg->rpcMsg.ahandle, taosMsg[pMsg->rpcMsg.msgType], mError("msg:%p, app:%p type:%s not processed, reason:%s", pMsg, pMsg->rpcMsg.ahandle, taosMsg[pMsg->rpcMsg.msgType],
tstrerror(code)); tstrerror(code));
......
...@@ -28,8 +28,11 @@ typedef struct { ...@@ -28,8 +28,11 @@ typedef struct {
int32_t taosGetDiskSize(char *dataDir, SysDiskSize *diskSize); int32_t taosGetDiskSize(char *dataDir, SysDiskSize *diskSize);
int32_t taosGetCpuCores();
void taosGetSystemInfo(); void taosGetSystemInfo();
bool taosReadProcIO(int64_t* rchars, int64_t* wchars);
bool taosGetProcIO(float *readKB, float *writeKB); bool taosGetProcIO(float *readKB, float *writeKB);
bool taosGetCardInfo(int64_t *bytes, int64_t *rbytes, int64_t *tbytes);
bool taosGetBandSpeed(float *bandSpeedKb); bool taosGetBandSpeed(float *bandSpeedKb);
void taosGetDisk(); void taosGetDisk();
bool taosGetCpuUsage(float *sysCpuUsage, float *procCpuUsage) ; bool taosGetCpuUsage(float *sysCpuUsage, float *procCpuUsage) ;
......
...@@ -164,6 +164,10 @@ void taosKillSystem() { ...@@ -164,6 +164,10 @@ void taosKillSystem() {
exit(0); exit(0);
} }
int32_t taosGetCpuCores() {
return sysconf(_SC_NPROCESSORS_ONLN);
}
void taosGetSystemInfo() { void taosGetSystemInfo() {
// taosGetProcInfos(); // taosGetProcInfos();
...@@ -185,12 +189,25 @@ void taosGetSystemInfo() { ...@@ -185,12 +189,25 @@ void taosGetSystemInfo() {
taosGetSystemLocale(); taosGetSystemLocale();
} }
bool taosReadProcIO(int64_t *rchars, int64_t *wchars) {
if (rchars) *rchars = 0;
if (wchars) *wchars = 0;
return true;
}
bool taosGetProcIO(float *readKB, float *writeKB) { bool taosGetProcIO(float *readKB, float *writeKB) {
*readKB = 0; *readKB = 0;
*writeKB = 0; *writeKB = 0;
return true; return true;
} }
bool taosGetCardInfo(int64_t *bytes, int64_t *rbytes, int64_t *tbytes) {
if (bytes) *bytes = 0;
if (rbytes) *rbytes = 0;
if (tbytes) *tbytes = 0;
return true;
}
bool taosGetBandSpeed(float *bandSpeedKb) { bool taosGetBandSpeed(float *bandSpeedKb) {
*bandSpeedKb = 0; *bandSpeedKb = 0;
return true; return true;
......
...@@ -277,7 +277,7 @@ static void taosGetSystemLocale() { // get and set default locale ...@@ -277,7 +277,7 @@ static void taosGetSystemLocale() { // get and set default locale
} }
} }
static int32_t taosGetCpuCores() { return (int32_t)sysconf(_SC_NPROCESSORS_ONLN); } int32_t taosGetCpuCores() { return (int32_t)sysconf(_SC_NPROCESSORS_ONLN); }
bool taosGetCpuUsage(float *sysCpuUsage, float *procCpuUsage) { bool taosGetCpuUsage(float *sysCpuUsage, float *procCpuUsage) {
static uint64_t lastSysUsed = 0; static uint64_t lastSysUsed = 0;
...@@ -332,7 +332,7 @@ int32_t taosGetDiskSize(char *dataDir, SysDiskSize *diskSize) { ...@@ -332,7 +332,7 @@ int32_t taosGetDiskSize(char *dataDir, SysDiskSize *diskSize) {
} }
} }
static bool taosGetCardInfo(int64_t *bytes) { bool taosGetCardInfo(int64_t *bytes, int64_t *rbytes, int64_t *tbytes) {
*bytes = 0; *bytes = 0;
FILE *fp = fopen(tsSysNetFile, "r"); FILE *fp = fopen(tsSysNetFile, "r");
if (fp == NULL) { if (fp == NULL) {
...@@ -347,9 +347,9 @@ static bool taosGetCardInfo(int64_t *bytes) { ...@@ -347,9 +347,9 @@ static bool taosGetCardInfo(int64_t *bytes) {
while (!feof(fp)) { while (!feof(fp)) {
memset(line, 0, len); memset(line, 0, len);
int64_t rbytes = 0; int64_t o_rbytes = 0;
int64_t rpackts = 0; int64_t rpackts = 0;
int64_t tbytes = 0; int64_t o_tbytes = 0;
int64_t tpackets = 0; int64_t tpackets = 0;
int64_t nouse1 = 0; int64_t nouse1 = 0;
int64_t nouse2 = 0; int64_t nouse2 = 0;
...@@ -374,8 +374,10 @@ static bool taosGetCardInfo(int64_t *bytes) { ...@@ -374,8 +374,10 @@ static bool taosGetCardInfo(int64_t *bytes) {
sscanf(line, sscanf(line,
"%s %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 "%s %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64
" %" PRId64, " %" PRId64,
nouse0, &rbytes, &rpackts, &nouse1, &nouse2, &nouse3, &nouse4, &nouse5, &nouse6, &tbytes, &tpackets); nouse0, &o_rbytes, &rpackts, &nouse1, &nouse2, &nouse3, &nouse4, &nouse5, &nouse6, &o_tbytes, &tpackets);
*bytes += (rbytes + tbytes); if (rbytes) *rbytes = o_rbytes;
if (tbytes) *tbytes = o_tbytes;
*bytes += (o_rbytes + o_tbytes);
} }
tfree(line); tfree(line);
...@@ -390,7 +392,7 @@ bool taosGetBandSpeed(float *bandSpeedKb) { ...@@ -390,7 +392,7 @@ bool taosGetBandSpeed(float *bandSpeedKb) {
int64_t curBytes = 0; int64_t curBytes = 0;
time_t curTime = time(NULL); time_t curTime = time(NULL);
if (!taosGetCardInfo(&curBytes)) { if (!taosGetCardInfo(&curBytes, NULL, NULL)) {
return false; return false;
} }
...@@ -420,7 +422,7 @@ bool taosGetBandSpeed(float *bandSpeedKb) { ...@@ -420,7 +422,7 @@ bool taosGetBandSpeed(float *bandSpeedKb) {
return true; return true;
} }
static bool taosReadProcIO(int64_t *readbyte, int64_t *writebyte) { bool taosReadProcIO(int64_t *rchars, int64_t *wchars) {
FILE *fp = fopen(tsProcIOFile, "r"); FILE *fp = fopen(tsProcIOFile, "r");
if (fp == NULL) { if (fp == NULL) {
uError("open file:%s failed", tsProcIOFile); uError("open file:%s failed", tsProcIOFile);
...@@ -441,10 +443,10 @@ static bool taosReadProcIO(int64_t *readbyte, int64_t *writebyte) { ...@@ -441,10 +443,10 @@ static bool taosReadProcIO(int64_t *readbyte, int64_t *writebyte) {
break; break;
} }
if (strstr(line, "rchar:") != NULL) { if (strstr(line, "rchar:") != NULL) {
sscanf(line, "%s %" PRId64, tmp, readbyte); sscanf(line, "%s %" PRId64, tmp, rchars);
readIndex++; readIndex++;
} else if (strstr(line, "wchar:") != NULL) { } else if (strstr(line, "wchar:") != NULL) {
sscanf(line, "%s %" PRId64, tmp, writebyte); sscanf(line, "%s %" PRId64, tmp, wchars);
readIndex++; readIndex++;
} else { } else {
} }
......
...@@ -115,7 +115,7 @@ static void taosGetSystemLocale() { ...@@ -115,7 +115,7 @@ static void taosGetSystemLocale() {
} }
} }
static int32_t taosGetCpuCores() { int32_t taosGetCpuCores() {
SYSTEM_INFO info; SYSTEM_INFO info;
GetSystemInfo(&info); GetSystemInfo(&info);
return (int32_t)info.dwNumberOfProcessors; return (int32_t)info.dwNumberOfProcessors;
...@@ -146,6 +146,13 @@ int32_t taosGetDiskSize(char *dataDir, SysDiskSize *diskSize) { ...@@ -146,6 +146,13 @@ int32_t taosGetDiskSize(char *dataDir, SysDiskSize *diskSize) {
} }
} }
bool taosGetCardInfo(int64_t *bytes, int64_t *rbytes, int64_t *tbytes) {
if (bytes) *bytes = 0;
if (rbytes) *rbytes = 0;
if (tbytes) *tbytes = 0;
return true;
}
bool taosGetBandSpeed(float *bandSpeedKb) { bool taosGetBandSpeed(float *bandSpeedKb) {
*bandSpeedKb = 0; *bandSpeedKb = 0;
return true; return true;
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_HTTPMETRICSHANDLE_H
#define TDENGINE_HTTPMETRICSHANDLE_H
#include "http.h"
#include "httpInt.h"
#include "httpUtil.h"
#include "httpResp.h"
void metricsInitHandle(HttpServer* httpServer);
bool metricsProcessRequest(struct HttpContext* httpContext);
#endif // TDENGINE_HTTPMETRICHANDLE_H
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#define _DEFAULT_SOURCE
#include "os.h"
#include "taoserror.h"
#include "tfs.h"
#include "httpMetricsHandle.h"
#include "dnode.h"
#include "httpLog.h"
static HttpDecodeMethod metricsDecodeMethod = {"metrics", metricsProcessRequest};
void metricsInitHandle(HttpServer* pServer) {
httpAddMethod(pServer, &metricsDecodeMethod);
}
bool metricsProcessRequest(HttpContext* pContext) {
httpDebug("context:%p, fd:%d, user:%s, process admin grant msg", pContext, pContext->fd, pContext->user);
JsonBuf* jsonBuf = httpMallocJsonBuf(pContext);
if (jsonBuf == NULL) {
httpError("failed to allocate memory for metrics");
httpSendErrorResp(pContext, TSDB_CODE_HTTP_NO_ENOUGH_MEMORY);
return false;
}
httpInitJsonBuf(jsonBuf, pContext);
httpWriteJsonBufHead(jsonBuf);
httpJsonToken(jsonBuf, JsonObjStt);
{
char* keyDisks = "tags";
httpJsonPairHead(jsonBuf, keyDisks, (int32_t)strlen(keyDisks));
httpJsonToken(jsonBuf, JsonArrStt);
{
httpJsonItemToken(jsonBuf);
httpJsonToken(jsonBuf, JsonObjStt);
char* keyTagName = "name";
char* keyTagValue = "value";
httpJsonPairOriginString(jsonBuf, keyTagName, (int32_t)strlen(keyTagName), "\"dnode_id\"",
(int32_t)strlen("\"dnode_id\""));
int32_t dnodeId = dnodeGetDnodeId();
httpJsonPairIntVal(jsonBuf, keyTagValue, (int32_t)strlen(keyTagValue), dnodeId);
httpJsonToken(jsonBuf, JsonObjEnd);
}
httpJsonToken(jsonBuf, JsonArrEnd);
}
{
if (tsDnodeStartTime != 0) {
int64_t now = taosGetTimestampMs();
int64_t upTime = now-tsDnodeStartTime;
char* keyUpTime = "up_time";
httpJsonPairInt64Val(jsonBuf, keyUpTime, (int32_t)strlen(keyUpTime), upTime);
}
}
{
int32_t cpuCores = taosGetCpuCores();
char* keyCpuCores = "cpu_cores";
httpJsonPairIntVal(jsonBuf, keyCpuCores, (int32_t)strlen(keyCpuCores), cpuCores);
float sysCpuUsage = 0;
float procCpuUsage = 0;
bool succeeded = taosGetCpuUsage(&sysCpuUsage, &procCpuUsage);
if (!succeeded) {
httpError("failed to get cpu usage");
} else {
if (sysCpuUsage <= procCpuUsage) {
sysCpuUsage = procCpuUsage + 0.1f;
}
char* keyCpuSystem = "cpu_system";
char* keyCpuEngine = "cpu_engine";
httpJsonPairFloatVal(jsonBuf, keyCpuSystem, (int32_t)strlen(keyCpuSystem), sysCpuUsage);
httpJsonPairFloatVal(jsonBuf, keyCpuEngine, (int32_t)strlen(keyCpuEngine), procCpuUsage);
}
}
{
float sysMemoryUsedMB = 0;
bool succeeded = taosGetSysMemory(&sysMemoryUsedMB);
if (!succeeded) {
httpError("failed to get sys memory info");
} else {
char* keyMemSystem = "mem_system";
httpJsonPairFloatVal(jsonBuf, keyMemSystem, (int32_t)strlen(keyMemSystem), sysMemoryUsedMB);
}
float procMemoryUsedMB = 0;
succeeded = taosGetProcMemory(&procMemoryUsedMB);
if (!succeeded) {
httpError("failed to get proc memory info");
} else {
char* keyMemEngine = "mem_engine";
httpJsonPairFloatVal(jsonBuf, keyMemEngine, (int32_t)strlen(keyMemEngine), procMemoryUsedMB);
}
}
{
int64_t bytes = 0, rbytes = 0, tbytes = 0;
bool succeeded = taosGetCardInfo(&bytes, &rbytes, &tbytes);
if (!succeeded) {
httpError("failed to get network info");
} else {
char* keyNetIn = "net_in";
char* keyNetOut = "net_out";
httpJsonPairInt64Val(jsonBuf, keyNetIn, (int32_t)strlen(keyNetIn), rbytes);
httpJsonPairInt64Val(jsonBuf, keyNetOut, (int32_t)strlen(keyNetOut), tbytes);
}
}
{
int64_t rchars = 0;
int64_t wchars = 0;
bool succeeded = taosReadProcIO(&rchars, &wchars);
if (!succeeded) {
httpError("failed to get io info");
} else {
char* keyIORead = "io_read";
char* keyIOWrite = "io_write";
httpJsonPairInt64Val(jsonBuf, keyIORead, (int32_t)strlen(keyIORead), rchars);
httpJsonPairInt64Val(jsonBuf, keyIOWrite, (int32_t)strlen(keyIOWrite), wchars);
}
}
{
const int8_t numTiers = 3;
SFSMeta fsMeta;
STierMeta* tierMetas = calloc(numTiers, sizeof(STierMeta));
tfsUpdateInfo(&fsMeta, tierMetas, numTiers);
{
char* keyDiskUsed = "disk_used";
char* keyDiskTotal = "disk_total";
httpJsonPairInt64Val(jsonBuf, keyDiskTotal, (int32_t)strlen(keyDiskTotal), fsMeta.tsize);
httpJsonPairInt64Val(jsonBuf, keyDiskUsed, (int32_t)strlen(keyDiskUsed), fsMeta.used);
char* keyDisks = "disks";
httpJsonPairHead(jsonBuf, keyDisks, (int32_t)strlen(keyDisks));
httpJsonToken(jsonBuf, JsonArrStt);
for (int i = 0; i < numTiers; ++i) {
httpJsonItemToken(jsonBuf);
httpJsonToken(jsonBuf, JsonObjStt);
char* keyDataDirLevelUsed = "datadir_used";
char* keyDataDirLevelTotal = "datadir_total";
httpJsonPairInt64Val(jsonBuf, keyDataDirLevelUsed, (int32_t)strlen(keyDataDirLevelUsed), tierMetas[i].used);
httpJsonPairInt64Val(jsonBuf, keyDataDirLevelTotal, (int32_t)strlen(keyDataDirLevelTotal), tierMetas[i].size);
httpJsonToken(jsonBuf, JsonObjEnd);
}
httpJsonToken(jsonBuf, JsonArrEnd);
}
free(tierMetas);
}
{
SStatisInfo info = dnodeGetStatisInfo();
{
char* keyReqHttp = "req_http";
char* keyReqSelect = "req_select";
char* keyReqInsert = "req_insert";
httpJsonPairInt64Val(jsonBuf, keyReqHttp, (int32_t)strlen(keyReqHttp), info.httpReqNum);
httpJsonPairInt64Val(jsonBuf, keyReqSelect, (int32_t)strlen(keyReqSelect), info.queryReqNum);
httpJsonPairInt64Val(jsonBuf, keyReqInsert, (int32_t)strlen(keyReqInsert), info.submitReqNum);
}
}
httpJsonToken(jsonBuf, JsonObjEnd);
httpWriteJsonBufEnd(jsonBuf);
pContext->reqType = HTTP_REQTYPE_OTHERS;
httpFreeJsonBuf(pContext);
return false;
}
\ No newline at end of file
...@@ -101,13 +101,17 @@ char *httpGetStatusDesc(int32_t statusCode) { ...@@ -101,13 +101,17 @@ char *httpGetStatusDesc(int32_t statusCode) {
} }
static void httpCleanupString(HttpString *str) { static void httpCleanupString(HttpString *str) {
free(str->str); if (str->str) {
str->str = NULL; free(str->str);
str->pos = 0; str->str = NULL;
str->size = 0; str->pos = 0;
str->size = 0;
}
} }
static int32_t httpAppendString(HttpString *str, const char *s, int32_t len) { static int32_t httpAppendString(HttpString *str, const char *s, int32_t len) {
char *new_str = NULL;
if (str->size == 0) { if (str->size == 0) {
str->pos = 0; str->pos = 0;
str->size = len + 1; str->size = len + 1;
...@@ -115,7 +119,16 @@ static int32_t httpAppendString(HttpString *str, const char *s, int32_t len) { ...@@ -115,7 +119,16 @@ static int32_t httpAppendString(HttpString *str, const char *s, int32_t len) {
} else if (str->pos + len + 1 >= str->size) { } else if (str->pos + len + 1 >= str->size) {
str->size += len; str->size += len;
str->size *= 4; str->size *= 4;
str->str = realloc(str->str, str->size);
new_str = realloc(str->str, str->size);
if (new_str == NULL && str->str) {
// if str->str was not NULL originally,
// the old allocated memory was left unchanged,
// see man 3 realloc
free(str->str);
}
str->str = new_str;
} else { } else {
} }
...@@ -317,7 +330,7 @@ static int32_t httpOnParseHeaderField(HttpParser *parser, const char *key, const ...@@ -317,7 +330,7 @@ static int32_t httpOnParseHeaderField(HttpParser *parser, const char *key, const
static int32_t httpOnBody(HttpParser *parser, const char *chunk, int32_t len) { static int32_t httpOnBody(HttpParser *parser, const char *chunk, int32_t len) {
HttpContext *pContext = parser->pContext; HttpContext *pContext = parser->pContext;
HttpString * buf = &parser->body; HttpString *buf = &parser->body;
if (parser->parseCode != TSDB_CODE_SUCCESS) return -1; if (parser->parseCode != TSDB_CODE_SUCCESS) return -1;
if (buf->size <= 0) { if (buf->size <= 0) {
...@@ -326,6 +339,7 @@ static int32_t httpOnBody(HttpParser *parser, const char *chunk, int32_t len) { ...@@ -326,6 +339,7 @@ static int32_t httpOnBody(HttpParser *parser, const char *chunk, int32_t len) {
} }
int32_t newSize = buf->pos + len + 1; int32_t newSize = buf->pos + len + 1;
char *newStr = NULL;
if (newSize >= buf->size) { if (newSize >= buf->size) {
if (buf->size >= HTTP_BUFFER_SIZE) { if (buf->size >= HTTP_BUFFER_SIZE) {
httpError("context:%p, fd:%d, failed parse body, exceeding buffer size %d", pContext, pContext->fd, buf->size); httpError("context:%p, fd:%d, failed parse body, exceeding buffer size %d", pContext, pContext->fd, buf->size);
...@@ -336,7 +350,12 @@ static int32_t httpOnBody(HttpParser *parser, const char *chunk, int32_t len) { ...@@ -336,7 +350,12 @@ static int32_t httpOnBody(HttpParser *parser, const char *chunk, int32_t len) {
newSize = MAX(newSize, HTTP_BUFFER_INIT); newSize = MAX(newSize, HTTP_BUFFER_INIT);
newSize *= 4; newSize *= 4;
newSize = MIN(newSize, HTTP_BUFFER_SIZE); newSize = MIN(newSize, HTTP_BUFFER_SIZE);
buf->str = realloc(buf->str, newSize); newStr = realloc(buf->str, newSize);
if (newStr == NULL && buf->str) {
free(buf->str);
}
buf->str = newStr;
buf->size = newSize; buf->size = newSize;
if (buf->str == NULL) { if (buf->str == NULL) {
...@@ -374,13 +393,20 @@ static HTTP_PARSER_STATE httpTopStack(HttpParser *parser) { ...@@ -374,13 +393,20 @@ static HTTP_PARSER_STATE httpTopStack(HttpParser *parser) {
static int32_t httpPushStack(HttpParser *parser, HTTP_PARSER_STATE state) { static int32_t httpPushStack(HttpParser *parser, HTTP_PARSER_STATE state) {
HttpStack *stack = &parser->stacks; HttpStack *stack = &parser->stacks;
int8_t *newStacks = NULL;
if (stack->size == 0) { if (stack->size == 0) {
stack->pos = 0; stack->pos = 0;
stack->size = 32; stack->size = 32;
stack->stacks = malloc(stack->size * sizeof(int8_t)); stack->stacks = malloc(stack->size * sizeof(int8_t));
} else if (stack->pos + 1 > stack->size) { } else if (stack->pos + 1 > stack->size) {
stack->size *= 2; stack->size *= 2;
stack->stacks = realloc(stack->stacks, stack->size * sizeof(int8_t));
newStacks = realloc(stack->stacks, stack->size * sizeof(int8_t));
if (newStacks == NULL && stack->stacks) {
free(stack->stacks);
}
stack->stacks = newStacks;
} else { } else {
} }
......
...@@ -30,6 +30,7 @@ ...@@ -30,6 +30,7 @@
#include "httpGcHandle.h" #include "httpGcHandle.h"
#include "httpRestHandle.h" #include "httpRestHandle.h"
#include "httpTgHandle.h" #include "httpTgHandle.h"
#include "httpMetricsHandle.h"
#ifndef _ADMIN #ifndef _ADMIN
void adminInitHandle(HttpServer* pServer) {} void adminInitHandle(HttpServer* pServer) {}
...@@ -52,7 +53,7 @@ int32_t httpInitSystem() { ...@@ -52,7 +53,7 @@ int32_t httpInitSystem() {
gcInitHandle(&tsHttpServer); gcInitHandle(&tsHttpServer);
tgInitHandle(&tsHttpServer); tgInitHandle(&tsHttpServer);
opInitHandle(&tsHttpServer); opInitHandle(&tsHttpServer);
metricsInitHandle(&tsHttpServer);
return 0; return 0;
} }
......
...@@ -188,13 +188,17 @@ bool httpMallocMultiCmds(HttpContext *pContext, int32_t cmdSize, int32_t bufferS ...@@ -188,13 +188,17 @@ bool httpMallocMultiCmds(HttpContext *pContext, int32_t cmdSize, int32_t bufferS
bool httpReMallocMultiCmdsSize(HttpContext *pContext, int32_t cmdSize) { bool httpReMallocMultiCmdsSize(HttpContext *pContext, int32_t cmdSize) {
HttpSqlCmds *multiCmds = pContext->multiCmds; HttpSqlCmds *multiCmds = pContext->multiCmds;
if (cmdSize > HTTP_MAX_CMD_SIZE) { if (cmdSize <= 0 || cmdSize > HTTP_MAX_CMD_SIZE) {
httpError("context:%p, fd:%d, user:%s, mulitcmd size:%d large then %d", pContext, pContext->fd, pContext->user, httpError("context:%p, fd:%d, user:%s, mulitcmd size:%d large then %d", pContext, pContext->fd, pContext->user,
cmdSize, HTTP_MAX_CMD_SIZE); cmdSize, HTTP_MAX_CMD_SIZE);
return false; return false;
} }
multiCmds->cmds = (HttpSqlCmd *)realloc(multiCmds->cmds, (size_t)cmdSize * sizeof(HttpSqlCmd)); HttpSqlCmd *new_cmds = (HttpSqlCmd *)realloc(multiCmds->cmds, (size_t)cmdSize * sizeof(HttpSqlCmd));
if (new_cmds == NULL && multiCmds->cmds) {
free(multiCmds->cmds);
}
multiCmds->cmds = new_cmds;
if (multiCmds->cmds == NULL) { if (multiCmds->cmds == NULL) {
httpError("context:%p, fd:%d, user:%s, malloc cmds:%d error", pContext, pContext->fd, pContext->user, cmdSize); httpError("context:%p, fd:%d, user:%s, malloc cmds:%d error", pContext, pContext->fd, pContext->user, cmdSize);
return false; return false;
...@@ -208,13 +212,17 @@ bool httpReMallocMultiCmdsSize(HttpContext *pContext, int32_t cmdSize) { ...@@ -208,13 +212,17 @@ bool httpReMallocMultiCmdsSize(HttpContext *pContext, int32_t cmdSize) {
bool httpReMallocMultiCmdsBuffer(HttpContext *pContext, int32_t bufferSize) { bool httpReMallocMultiCmdsBuffer(HttpContext *pContext, int32_t bufferSize) {
HttpSqlCmds *multiCmds = pContext->multiCmds; HttpSqlCmds *multiCmds = pContext->multiCmds;
if (bufferSize > HTTP_MAX_BUFFER_SIZE) { if (bufferSize <= 0 || bufferSize > HTTP_MAX_BUFFER_SIZE) {
httpError("context:%p, fd:%d, user:%s, mulitcmd buffer size:%d large then %d", pContext, pContext->fd, httpError("context:%p, fd:%d, user:%s, mulitcmd buffer size:%d large then %d", pContext, pContext->fd,
pContext->user, bufferSize, HTTP_MAX_BUFFER_SIZE); pContext->user, bufferSize, HTTP_MAX_BUFFER_SIZE);
return false; return false;
} }
multiCmds->buffer = (char *)realloc(multiCmds->buffer, (size_t)bufferSize); char *new_buffer = (char *)realloc(multiCmds->buffer, (size_t)bufferSize);
if (new_buffer == NULL && multiCmds->buffer) {
free(multiCmds->buffer);
}
multiCmds->buffer = new_buffer;
if (multiCmds->buffer == NULL) { if (multiCmds->buffer == NULL) {
httpError("context:%p, fd:%d, user:%s, malloc buffer:%d error", pContext, pContext->fd, pContext->user, bufferSize); httpError("context:%p, fd:%d, user:%s, malloc buffer:%d error", pContext, pContext->fd, pContext->user, bufferSize);
return false; return false;
......
...@@ -333,6 +333,8 @@ enum OPERATOR_TYPE_E { ...@@ -333,6 +333,8 @@ enum OPERATOR_TYPE_E {
OP_Distinct = 20, OP_Distinct = 20,
OP_Join = 21, OP_Join = 21,
OP_StateWindow = 22, OP_StateWindow = 22,
OP_AllTimeWindow = 23,
OP_AllMultiTableTimeInterval = 24,
}; };
typedef struct SOperatorInfo { typedef struct SOperatorInfo {
...@@ -554,11 +556,13 @@ SOperatorInfo* createAggregateOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOpera ...@@ -554,11 +556,13 @@ SOperatorInfo* createAggregateOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOpera
SOperatorInfo* createProjectOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput); SOperatorInfo* createProjectOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput);
SOperatorInfo* createLimitOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream); SOperatorInfo* createLimitOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream);
SOperatorInfo* createTimeIntervalOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput); SOperatorInfo* createTimeIntervalOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput);
SOperatorInfo* createAllTimeIntervalOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput);
SOperatorInfo* createSWindowOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput); SOperatorInfo* createSWindowOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput);
SOperatorInfo* createFillOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput); SOperatorInfo* createFillOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput);
SOperatorInfo* createGroupbyOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput); SOperatorInfo* createGroupbyOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput);
SOperatorInfo* createMultiTableAggOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput); SOperatorInfo* createMultiTableAggOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput);
SOperatorInfo* createMultiTableTimeIntervalOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput); SOperatorInfo* createMultiTableTimeIntervalOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput);
SOperatorInfo* createAllMultiTableTimeIntervalOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput);
SOperatorInfo* createTagScanOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SExprInfo* pExpr, int32_t numOfOutput); SOperatorInfo* createTagScanOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SExprInfo* pExpr, int32_t numOfOutput);
SOperatorInfo* createDistinctOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput); SOperatorInfo* createDistinctOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput);
SOperatorInfo* createTableBlockInfoScanOperator(void* pTsdbQueryHandle, SQueryRuntimeEnv* pRuntimeEnv); SOperatorInfo* createTableBlockInfoScanOperator(void* pTsdbQueryHandle, SQueryRuntimeEnv* pRuntimeEnv);
......
...@@ -39,7 +39,6 @@ ...@@ -39,7 +39,6 @@
#define GET_QID(_r) (((SQInfo*)((_r)->qinfo))->qId) #define GET_QID(_r) (((SQInfo*)((_r)->qinfo))->qId)
#define curTimeWindowIndex(_winres) ((_winres)->curIndex) #define curTimeWindowIndex(_winres) ((_winres)->curIndex)
#define GET_ROW_PARAM_FOR_MULTIOUTPUT(_q, tbq, sq) (((tbq) && (!(sq)))? (_q)->pExpr1[1].base.param[0].i64:1)
int32_t getOutputInterResultBufSize(SQueryAttr* pQueryAttr); int32_t getOutputInterResultBufSize(SQueryAttr* pQueryAttr);
...@@ -60,6 +59,7 @@ SResultRowCellInfo* getResultCell(const SResultRow* pRow, int32_t index, int32_t ...@@ -60,6 +59,7 @@ SResultRowCellInfo* getResultCell(const SResultRow* pRow, int32_t index, int32_t
void* destroyQueryFuncExpr(SExprInfo* pExprInfo, int32_t numOfExpr); void* destroyQueryFuncExpr(SExprInfo* pExprInfo, int32_t numOfExpr);
void* freeColumnInfo(SColumnInfo* pColumnInfo, int32_t numOfCols); void* freeColumnInfo(SColumnInfo* pColumnInfo, int32_t numOfCols);
int32_t getRowNumForMultioutput(SQueryAttr* pQueryAttr, bool topBottomQuery, bool stable);
static FORCE_INLINE SResultRow *getResultRow(SResultRowInfo *pResultRowInfo, int32_t slot) { static FORCE_INLINE SResultRow *getResultRow(SResultRowInfo *pResultRowInfo, int32_t slot) {
assert(pResultRowInfo != NULL && slot >= 0 && slot < pResultRowInfo->size); assert(pResultRowInfo != NULL && slot >= 0 && slot < pResultRowInfo->size);
...@@ -70,7 +70,7 @@ static FORCE_INLINE char* getPosInResultPage(SQueryAttr* pQueryAttr, tFilePage* ...@@ -70,7 +70,7 @@ static FORCE_INLINE char* getPosInResultPage(SQueryAttr* pQueryAttr, tFilePage*
int32_t offset) { int32_t offset) {
assert(rowOffset >= 0 && pQueryAttr != NULL); assert(rowOffset >= 0 && pQueryAttr != NULL);
int32_t numOfRows = (int32_t)GET_ROW_PARAM_FOR_MULTIOUTPUT(pQueryAttr, pQueryAttr->topBotQuery, pQueryAttr->stableQuery); int32_t numOfRows = (int32_t)getRowNumForMultioutput(pQueryAttr, pQueryAttr->topBotQuery, pQueryAttr->stableQuery);
return ((char *)page->data) + rowOffset + offset * numOfRows; return ((char *)page->data) + rowOffset + offset * numOfRows;
} }
......
...@@ -3708,27 +3708,59 @@ static void interp_function_impl(SQLFunctionCtx *pCtx) { ...@@ -3708,27 +3708,59 @@ static void interp_function_impl(SQLFunctionCtx *pCtx) {
} }
} else { } else {
// no data generated yet // no data generated yet
if (pCtx->size == 1) { if (pCtx->size < 1) {
return; return;
} }
// check the timestamp in input buffer // check the timestamp in input buffer
TSKEY skey = GET_TS_DATA(pCtx, 0); TSKEY skey = GET_TS_DATA(pCtx, 0);
TSKEY ekey = GET_TS_DATA(pCtx, 1);
// no data generated yet
if (!(skey < pCtx->startTs && ekey > pCtx->startTs)) {
return;
}
assert(pCtx->start.key == INT64_MIN && skey < pCtx->startTs && ekey > pCtx->startTs);
if (type == TSDB_FILL_PREV) { if (type == TSDB_FILL_PREV) {
if (skey > pCtx->startTs) {
return;
}
if (pCtx->size > 1) {
TSKEY ekey = GET_TS_DATA(pCtx, 1);
if (ekey > skey && ekey <= pCtx->startTs) {
skey = ekey;
}
}
assignVal(pCtx->pOutput, pCtx->pInput, pCtx->outputBytes, pCtx->inputType); assignVal(pCtx->pOutput, pCtx->pInput, pCtx->outputBytes, pCtx->inputType);
} else if (type == TSDB_FILL_NEXT) { } else if (type == TSDB_FILL_NEXT) {
char* val = ((char*)pCtx->pInput) + pCtx->inputBytes; TSKEY ekey = skey;
char* val = NULL;
if (ekey < pCtx->startTs) {
if (pCtx->size > 1) {
ekey = GET_TS_DATA(pCtx, 1);
if (ekey < pCtx->startTs) {
return;
}
val = ((char*)pCtx->pInput) + pCtx->inputBytes;
} else {
return;
}
} else {
val = (char*)pCtx->pInput;
}
assignVal(pCtx->pOutput, val, pCtx->outputBytes, pCtx->inputType); assignVal(pCtx->pOutput, val, pCtx->outputBytes, pCtx->inputType);
} else if (type == TSDB_FILL_LINEAR) { } else if (type == TSDB_FILL_LINEAR) {
if (pCtx->size <= 1) {
return;
}
TSKEY ekey = GET_TS_DATA(pCtx, 1);
// no data generated yet
if (!(skey < pCtx->startTs && ekey > pCtx->startTs)) {
return;
}
assert(pCtx->start.key == INT64_MIN && skey < pCtx->startTs && ekey > pCtx->startTs);
char *start = GET_INPUT_DATA(pCtx, 0); char *start = GET_INPUT_DATA(pCtx, 0);
char *end = GET_INPUT_DATA(pCtx, 1); char *end = GET_INPUT_DATA(pCtx, 1);
...@@ -4030,12 +4062,15 @@ static void mergeTableBlockDist(SResultRowCellInfo* pResInfo, const STableBlockD ...@@ -4030,12 +4062,15 @@ static void mergeTableBlockDist(SResultRowCellInfo* pResInfo, const STableBlockD
pDist->maxRows = pSrc->maxRows; pDist->maxRows = pSrc->maxRows;
pDist->minRows = pSrc->minRows; pDist->minRows = pSrc->minRows;
int32_t numSteps = tsMaxRowsInFileBlock/TSDB_BLOCK_DIST_STEP_ROWS; int32_t maxSteps = TSDB_MAX_MAX_ROW_FBLOCK/TSDB_BLOCK_DIST_STEP_ROWS;
pDist->dataBlockInfos = taosArrayInit(numSteps, sizeof(SFileBlockInfo)); if (TSDB_MAX_MAX_ROW_FBLOCK % TSDB_BLOCK_DIST_STEP_ROWS != 0) {
taosArraySetSize(pDist->dataBlockInfos, numSteps); ++maxSteps;
}
pDist->dataBlockInfos = taosArrayInit(maxSteps, sizeof(SFileBlockInfo));
taosArraySetSize(pDist->dataBlockInfos, maxSteps);
} }
size_t steps = taosArrayGetSize(pDist->dataBlockInfos); size_t steps = taosArrayGetSize(pSrc->dataBlockInfos);
for (int32_t i = 0; i < steps; ++i) { for (int32_t i = 0; i < steps; ++i) {
int32_t srcNumBlocks = ((SFileBlockInfo*)taosArrayGet(pSrc->dataBlockInfos, i))->numBlocksOfStep; int32_t srcNumBlocks = ((SFileBlockInfo*)taosArrayGet(pSrc->dataBlockInfos, i))->numBlocksOfStep;
SFileBlockInfo* blockInfo = (SFileBlockInfo*)taosArrayGet(pDist->dataBlockInfos, i); SFileBlockInfo* blockInfo = (SFileBlockInfo*)taosArrayGet(pDist->dataBlockInfos, i);
...@@ -4047,9 +4082,9 @@ void block_func_merge(SQLFunctionCtx* pCtx) { ...@@ -4047,9 +4082,9 @@ void block_func_merge(SQLFunctionCtx* pCtx) {
STableBlockDist info = {0}; STableBlockDist info = {0};
int32_t len = *(int32_t*) pCtx->pInput; int32_t len = *(int32_t*) pCtx->pInput;
blockDistInfoFromBinary(((char*)pCtx->pInput) + sizeof(int32_t), len, &info); blockDistInfoFromBinary(((char*)pCtx->pInput) + sizeof(int32_t), len, &info);
SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx); SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx);
mergeTableBlockDist(pResInfo, &info); mergeTableBlockDist(pResInfo, &info);
taosArrayDestroy(info.dataBlockInfos);
pResInfo->numOfRes = 1; pResInfo->numOfRes = 1;
pResInfo->hasResult = DATA_SET_FLAG; pResInfo->hasResult = DATA_SET_FLAG;
......
此差异已折叠。
...@@ -206,6 +206,12 @@ static int32_t fillResultImpl(SFillInfo* pFillInfo, void** data, int32_t outputR ...@@ -206,6 +206,12 @@ static int32_t fillResultImpl(SFillInfo* pFillInfo, void** data, int32_t outputR
} else { } else {
assert(pFillInfo->currentKey == ts); assert(pFillInfo->currentKey == ts);
initBeforeAfterDataBuf(pFillInfo, prev); initBeforeAfterDataBuf(pFillInfo, prev);
if (pFillInfo->type == TSDB_FILL_NEXT && (pFillInfo->index + 1) < pFillInfo->numOfRows) {
initBeforeAfterDataBuf(pFillInfo, next);
++pFillInfo->index;
copyCurrentRowIntoBuf(pFillInfo, srcData, *next);
--pFillInfo->index;
}
// assign rows to dst buffer // assign rows to dst buffer
for (int32_t i = 0; i < pFillInfo->numOfCols; ++i) { for (int32_t i = 0; i < pFillInfo->numOfCols; ++i) {
...@@ -227,6 +233,12 @@ static int32_t fillResultImpl(SFillInfo* pFillInfo, void** data, int32_t outputR ...@@ -227,6 +233,12 @@ static int32_t fillResultImpl(SFillInfo* pFillInfo, void** data, int32_t outputR
} else if (pFillInfo->type == TSDB_FILL_LINEAR) { } else if (pFillInfo->type == TSDB_FILL_LINEAR) {
assignVal(output, src, pCol->col.bytes, pCol->col.type); assignVal(output, src, pCol->col.bytes, pCol->col.type);
memcpy(*prev + pCol->col.offset, src, pCol->col.bytes); memcpy(*prev + pCol->col.offset, src, pCol->col.bytes);
} else if (pFillInfo->type == TSDB_FILL_NEXT) {
if (*next) {
assignVal(output, *next + pCol->col.offset, pCol->col.bytes, pCol->col.type);
} else {
setNull(output, pCol->col.type, pCol->col.bytes);
}
} else { } else {
assignVal(output, (char*)&pCol->fillVal.i, pCol->col.bytes, pCol->col.type); assignVal(output, (char*)&pCol->fillVal.i, pCol->col.bytes, pCol->col.type);
} }
......
...@@ -567,10 +567,18 @@ SArray* createExecOperatorPlan(SQueryAttr* pQueryAttr) { ...@@ -567,10 +567,18 @@ SArray* createExecOperatorPlan(SQueryAttr* pQueryAttr) {
} }
} else if (pQueryAttr->interval.interval > 0) { } else if (pQueryAttr->interval.interval > 0) {
if (pQueryAttr->stableQuery) { if (pQueryAttr->stableQuery) {
op = OP_MultiTableTimeInterval; if (pQueryAttr->pointInterpQuery) {
op = OP_AllMultiTableTimeInterval;
} else {
op = OP_MultiTableTimeInterval;
}
taosArrayPush(plan, &op); taosArrayPush(plan, &op);
} else { } else {
op = OP_TimeWindow; if (pQueryAttr->pointInterpQuery) {
op = OP_AllTimeWindow;
} else {
op = OP_TimeWindow;
}
taosArrayPush(plan, &op); taosArrayPush(plan, &op);
if (pQueryAttr->pExpr2 != NULL) { if (pQueryAttr->pExpr2 != NULL) {
...@@ -578,7 +586,7 @@ SArray* createExecOperatorPlan(SQueryAttr* pQueryAttr) { ...@@ -578,7 +586,7 @@ SArray* createExecOperatorPlan(SQueryAttr* pQueryAttr) {
taosArrayPush(plan, &op); taosArrayPush(plan, &op);
} }
if (pQueryAttr->fillType != TSDB_FILL_NONE && (!pQueryAttr->pointInterpQuery)) { if (pQueryAttr->fillType != TSDB_FILL_NONE) {
op = OP_Fill; op = OP_Fill;
taosArrayPush(plan, &op); taosArrayPush(plan, &op);
} }
......
...@@ -30,6 +30,18 @@ typedef struct SCompSupporter { ...@@ -30,6 +30,18 @@ typedef struct SCompSupporter {
int32_t order; int32_t order;
} SCompSupporter; } SCompSupporter;
int32_t getRowNumForMultioutput(SQueryAttr* pQueryAttr, bool topBottomQuery, bool stable) {
if (pQueryAttr && (!stable)) {
for (int16_t i = 0; i < pQueryAttr->numOfOutput; ++i) {
if (pQueryAttr->pExpr1[i].base.functionId == TSDB_FUNC_TOP || pQueryAttr->pExpr1[i].base.functionId == TSDB_FUNC_BOTTOM) {
return (int32_t)pQueryAttr->pExpr1[i].base.param[0].i64;
}
}
}
return 1;
}
int32_t getOutputInterResultBufSize(SQueryAttr* pQueryAttr) { int32_t getOutputInterResultBufSize(SQueryAttr* pQueryAttr) {
int32_t size = 0; int32_t size = 0;
......
...@@ -330,7 +330,7 @@ void intDataTest() { ...@@ -330,7 +330,7 @@ void intDataTest() {
filterAddRange(h, ra + i, TSDB_RELATION_AND); filterAddRange(h, ra + i, TSDB_RELATION_AND);
} }
filterGetRangeNum(h, &num); filterGetRangeNum(h, &num);
ASSERT_EQ(num, 0); ASSERT_EQ(num, 1);
filterFreeRangeCtx(h); filterFreeRangeCtx(h);
......
...@@ -1133,8 +1133,8 @@ static void rpcNotifyClient(SRpcReqContext *pContext, SRpcMsg *pMsg) { ...@@ -1133,8 +1133,8 @@ static void rpcNotifyClient(SRpcReqContext *pContext, SRpcMsg *pMsg) {
} else { } else {
// for asynchronous API // for asynchronous API
SRpcEpSet *pEpSet = NULL; SRpcEpSet *pEpSet = NULL;
if (pContext->epSet.inUse != pContext->oldInUse || pContext->redirect) //if (pContext->epSet.inUse != pContext->oldInUse || pContext->redirect)
pEpSet = &pContext->epSet; pEpSet = &pContext->epSet;
(*pRpc->cfp)(pMsg, pEpSet); (*pRpc->cfp)(pMsg, pEpSet);
} }
......
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册