提交 57bfff85 编写于 作者: D dapan1121

Merge branch 'feature/TD-4034' into feature/TD-3950

# Use the latest 2.1 version of CircleCI pipeline process engine. See: https://circleci.com/docs/2.0/configuration-reference
version: 2.1
# Use a package of configuration called an orb.
orbs:
# Declare a dependency on the welcome-orb
welcome: circleci/welcome-orb@0.4.1
# Orchestrate or schedule a set of jobs
workflows:
# Name the workflow "welcome"
welcome:
# Run the welcome/run job in its own container
jobs:
- welcome/run
[submodule "src/connector/go"] [submodule "src/connector/go"]
path = src/connector/go path = src/connector/go
url = https://github.com/taosdata/driver-go url = git@github.com:taosdata/driver-go.git
[submodule "src/connector/grafanaplugin"] [submodule "src/connector/grafanaplugin"]
path = src/connector/grafanaplugin path = src/connector/grafanaplugin
url = https://github.com/taosdata/grafanaplugin url = git@github.com:taosdata/grafanaplugin.git
[submodule "src/connector/hivemq-tdengine-extension"] [submodule "src/connector/hivemq-tdengine-extension"]
path = src/connector/hivemq-tdengine-extension path = src/connector/hivemq-tdengine-extension
url = https://github.com/huskar-t/hivemq-tdengine-extension.git url = git@github.com:taosdata/hivemq-tdengine-extension.git
[submodule "tests/examples/rust"] [submodule "tests/examples/rust"]
path = tests/examples/rust path = tests/examples/rust
url = https://github.com/songtianyi/tdengine-rust-bindings.git url = https://github.com/songtianyi/tdengine-rust-bindings.git
...@@ -116,7 +116,7 @@ mkdir debug && cd debug ...@@ -116,7 +116,7 @@ mkdir debug && cd debug
cmake .. && cmake --build . cmake .. && cmake --build .
``` ```
在X86-64、X86、arm64 和 arm32 平台上,TDengine 生成脚本可以自动检测机器架构。也可以手动配置 CPUTYPE 参数来指定 CPU 类型,如 aarch64 或 aarch32 等。 在X86-64、X86、arm64、arm32 和 mips64 平台上,TDengine 生成脚本可以自动检测机器架构。也可以手动配置 CPUTYPE 参数来指定 CPU 类型,如 aarch64 或 aarch32 等。
aarch64: aarch64:
...@@ -130,6 +130,12 @@ aarch32: ...@@ -130,6 +130,12 @@ aarch32:
cmake .. -DCPUTYPE=aarch32 && cmake --build . cmake .. -DCPUTYPE=aarch32 && cmake --build .
``` ```
mips64:
```bash
cmake .. -DCPUTYPE=mips64 && cmake --build .
```
### Windows 系统 ### Windows 系统
如果你使用的是 Visual Studio 2013 版本: 如果你使用的是 Visual Studio 2013 版本:
......
...@@ -110,7 +110,7 @@ mkdir debug && cd debug ...@@ -110,7 +110,7 @@ mkdir debug && cd debug
cmake .. && cmake --build . cmake .. && cmake --build .
``` ```
TDengine build script can detect the host machine's architecture on X86-64, X86, arm64 and arm32 platform. TDengine build script can detect the host machine's architecture on X86-64, X86, arm64, arm32 and mips64 platform.
You can also specify CPUTYPE option like aarch64 or aarch32 too if the detection result is not correct: You can also specify CPUTYPE option like aarch64 or aarch32 too if the detection result is not correct:
aarch64: aarch64:
...@@ -123,6 +123,11 @@ aarch32: ...@@ -123,6 +123,11 @@ aarch32:
cmake .. -DCPUTYPE=aarch32 && cmake --build . cmake .. -DCPUTYPE=aarch32 && cmake --build .
``` ```
mips64:
```bash
cmake .. -DCPUTYPE=mips64 && cmake --build .
```
### On Windows platform ### On Windows platform
If you use the Visual Studio 2013, please open a command window by executing "cmd.exe". If you use the Visual Studio 2013, please open a command window by executing "cmd.exe".
......
...@@ -4,7 +4,7 @@ PROJECT(TDengine) ...@@ -4,7 +4,7 @@ PROJECT(TDengine)
IF (DEFINED VERNUMBER) IF (DEFINED VERNUMBER)
SET(TD_VER_NUMBER ${VERNUMBER}) SET(TD_VER_NUMBER ${VERNUMBER})
ELSE () ELSE ()
SET(TD_VER_NUMBER "2.1.0.0") SET(TD_VER_NUMBER "2.1.1.0")
ENDIF () ENDIF ()
IF (DEFINED VERCOMPATIBLE) IF (DEFINED VERCOMPATIBLE)
......
...@@ -26,17 +26,17 @@ ...@@ -26,17 +26,17 @@
## 2. Windows平台下JDBCDriver找不到动态链接库,怎么办? ## 2. Windows平台下JDBCDriver找不到动态链接库,怎么办?
请看为此问题撰写的[技术博客](https://www.taosdata.com/blog/2019/12/03/jdbcdriver找不到动态链接库/) 请看为此问题撰写的[技术博客](https://www.taosdata.com/blog/2019/12/03/950.html)
## 3. 创建数据表时提示more dnodes are needed ## 3. 创建数据表时提示more dnodes are needed
请看为此问题撰写的[技术博客](https://www.taosdata.com/blog/2019/12/03/创建数据表时提示more-dnodes-are-needed/) 请看为此问题撰写的[技术博客](https://www.taosdata.com/blog/2019/12/03/965.html)
## 4. 如何让TDengine crash时生成core文件? ## 4. 如何让TDengine crash时生成core文件?
请看为此问题撰写的[技术博客](https://www.taosdata.com/blog/2019/12/06/tdengine-crash时生成core文件的方法/) 请看为此问题撰写的[技术博客](https://www.taosdata.com/blog/2019/12/06/974.html)
## 5. 遇到错误"Unable to establish connection", 我怎么办? ## 5. 遇到错误“Unable to establish connection”, 我怎么办?
客户端遇到连接故障,请按照下面的步骤进行检查: 客户端遇到连接故障,请按照下面的步骤进行检查:
...@@ -51,13 +51,13 @@ ...@@ -51,13 +51,13 @@
4. 确认客户端连接时指定了正确的服务器FQDN (Fully Qualified Domain Name(可在服务器上执行Linux命令hostname -f获得)),FQDN配置参考:[一篇文章说清楚TDengine的FQDN](https://www.taosdata.com/blog/2020/09/11/1824.html) 4. 确认客户端连接时指定了正确的服务器FQDN (Fully Qualified Domain Name(可在服务器上执行Linux命令hostname -f获得)),FQDN配置参考:[一篇文章说清楚TDengine的FQDN](https://www.taosdata.com/blog/2020/09/11/1824.html)
5. ping服务器FQDN,如果没有反应,请检查你的网络,DNS设置,或客户端所在计算机的系统hosts文件 5. ping服务器FQDN,如果没有反应,请检查你的网络,DNS设置,或客户端所在计算机的系统hosts文件。如果部署的是TDengine集群,客户端需要能ping通所有集群节点的FQDN。
6. 检查防火墙设置(Ubuntu 使用 ufw status,CentOS 使用 firewall-cmd --list-port),确认TCP/UDP 端口6030-6042 是打开的 6. 检查防火墙设置(Ubuntu 使用 ufw status,CentOS 使用 firewall-cmd --list-port),确认TCP/UDP 端口6030-6042 是打开的
7. 对于Linux上的JDBC(ODBC, Python, Go等接口类似)连接, 确保*libtaos.so*在目录*/usr/local/taos/driver*里, 并且*/usr/local/taos/driver*在系统库函数搜索路径*LD_LIBRARY_PATH* 7. 对于Linux上的JDBC(ODBC, Python, Go等接口类似)连接, 确保*libtaos.so*在目录*/usr/local/taos/driver*里, 并且*/usr/local/taos/driver*在系统库函数搜索路径*LD_LIBRARY_PATH*
8. 对于windows上的JDBC, ODBC, Python, Go等连接,确保*C:\TDengine\driver\taos.dll*在你的系统库函数搜索目录里 (建议*taos.dll*放在目录 *C:\Windows\System32*) 8. 对于Windows上的JDBC, ODBC, Python, Go等连接,确保*C:\TDengine\driver\taos.dll*在你的系统库函数搜索目录里 (建议*taos.dll*放在目录 *C:\Windows\System32*)
9. 如果仍不能排除连接故障 9. 如果仍不能排除连接故障
...@@ -70,7 +70,8 @@ ...@@ -70,7 +70,8 @@
10. 也可以使用taos程序内嵌的网络连通检测功能,来验证服务器和客户端之间指定的端口连接是否通畅(包括TCP和UDP):[TDengine 内嵌网络检测工具使用指南](https://www.taosdata.com/blog/2020/09/08/1816.html) 10. 也可以使用taos程序内嵌的网络连通检测功能,来验证服务器和客户端之间指定的端口连接是否通畅(包括TCP和UDP):[TDengine 内嵌网络检测工具使用指南](https://www.taosdata.com/blog/2020/09/08/1816.html)
## 6. 遇到错误“Unexpected generic error in RPC”或者"TDengine Error: Unable to resolve FQDN", 我怎么办? ## 6. 遇到错误“Unexpected generic error in RPC”或者“Unable to resolve FQDN”,我怎么办?
产生这个错误,是由于客户端或数据节点无法解析FQDN(Fully Qualified Domain Name)导致。对于TAOS Shell或客户端应用,请做如下检查: 产生这个错误,是由于客户端或数据节点无法解析FQDN(Fully Qualified Domain Name)导致。对于TAOS Shell或客户端应用,请做如下检查:
1. 请检查连接的服务器的FQDN是否正确,FQDN配置参考:[一篇文章说清楚TDengine的FQDN](https://www.taosdata.com/blog/2020/09/11/1824.html) 1. 请检查连接的服务器的FQDN是否正确,FQDN配置参考:[一篇文章说清楚TDengine的FQDN](https://www.taosdata.com/blog/2020/09/11/1824.html)
...@@ -102,7 +103,7 @@ TDengine 目前尚不支持删除功能,未来根据用户需求可能会支 ...@@ -102,7 +103,7 @@ TDengine 目前尚不支持删除功能,未来根据用户需求可能会支
批量插入。每条写入语句可以一张表同时插入多条记录,也可以同时插入多张表的多条记录。 批量插入。每条写入语句可以一张表同时插入多条记录,也可以同时插入多张表的多条记录。
## 12. windows系统下插入的nchar类数据中的汉字被解析成了乱码如何解决? ## 12. Windows系统下插入的nchar类数据中的汉字被解析成了乱码如何解决?
Windows下插入nchar类的数据中如果有中文,请先确认系统的地区设置成了中国(在Control Panel里可以设置),这时cmd中的`taos`客户端应该已经可以正常工作了;如果是在IDE里开发Java应用,比如Eclipse, Intellij,请确认IDE里的文件编码为GBK(这是Java默认的编码类型),然后在生成Connection时,初始化客户端的配置,具体语句如下: Windows下插入nchar类的数据中如果有中文,请先确认系统的地区设置成了中国(在Control Panel里可以设置),这时cmd中的`taos`客户端应该已经可以正常工作了;如果是在IDE里开发Java应用,比如Eclipse, Intellij,请确认IDE里的文件编码为GBK(这是Java默认的编码类型),然后在生成Connection时,初始化客户端的配置,具体语句如下:
```JAVA ```JAVA
...@@ -115,15 +116,15 @@ Connection = DriverManager.getConnection(url, properties); ...@@ -115,15 +116,15 @@ Connection = DriverManager.getConnection(url, properties);
## 13.JDBC报错: the excuted SQL is not a DML or a DDL? ## 13.JDBC报错: the excuted SQL is not a DML or a DDL?
请更新至最新的JDBC驱动 请更新至最新的JDBC驱动
```JAVA ```xml
<dependency> <dependency>
<groupId>com.taosdata.jdbc</groupId> <groupId>com.taosdata.jdbc</groupId>
<artifactId>taos-jdbcdriver</artifactId> <artifactId>taos-jdbcdriver</artifactId>
<version>2.0.4</version> <version>2.0.27</version>
</dependency> </dependency>
``` ```
## 14. taos connect failed, reason: invalid timestamp ## 14. taos connect failed, reason&#58; invalid timestamp
常见原因是服务器和客户端时间没有校准,可以通过和时间服务器同步的方式(Linux 下使用 ntpdate 命令,Windows 在系统时间设置中选择自动同步)校准。 常见原因是服务器和客户端时间没有校准,可以通过和时间服务器同步的方式(Linux 下使用 ntpdate 命令,Windows 在系统时间设置中选择自动同步)校准。
...@@ -157,7 +158,8 @@ ALTER LOCAL RESETLOG; ...@@ -157,7 +158,8 @@ ALTER LOCAL RESETLOG;
其含义是,清空本机所有由客户端生成的日志文件。 其含义是,清空本机所有由客户端生成的日志文件。
## <a class="anchor" id="timezone"></a>18. 时间戳的时区信息是怎样处理的? <a class="anchor" id="timezone"></a>
## 18. 时间戳的时区信息是怎样处理的?
TDengine 中时间戳的时区总是由客户端进行处理,而与服务端无关。具体来说,客户端会对 SQL 语句中的时间戳进行时区转换,转为 UTC 时区(即 Unix 时间戳——Unix Timestamp)再交由服务端进行写入和查询;在读取数据时,服务端也是采用 UTC 时区提供原始数据,客户端收到后再根据本地设置,把时间戳转换为本地系统所要求的时区进行显示。 TDengine 中时间戳的时区总是由客户端进行处理,而与服务端无关。具体来说,客户端会对 SQL 语句中的时间戳进行时区转换,转为 UTC 时区(即 Unix 时间戳——Unix Timestamp)再交由服务端进行写入和查询;在读取数据时,服务端也是采用 UTC 时区提供原始数据,客户端收到后再根据本地设置,把时间戳转换为本地系统所要求的时区进行显示。
...@@ -167,12 +169,13 @@ TDengine 中时间戳的时区总是由客户端进行处理,而与服务端 ...@@ -167,12 +169,13 @@ TDengine 中时间戳的时区总是由客户端进行处理,而与服务端
3. 如果在 C/C++/Java/Python 等各种编程语言的 Connector Driver 中,在建立数据库连接时显式指定了 timezone,那么会以这个指定的时区设置为准。例如 Java Connector 的 JDBC URL 中就有 timezone 参数。 3. 如果在 C/C++/Java/Python 等各种编程语言的 Connector Driver 中,在建立数据库连接时显式指定了 timezone,那么会以这个指定的时区设置为准。例如 Java Connector 的 JDBC URL 中就有 timezone 参数。
4. 在书写 SQL 语句时,也可以直接使用 Unix 时间戳(例如 `1554984068000`)或带有时区的时间戳字符串,也即以 RFC 3339 格式(例如 `2013-04-12T15:52:01.123+08:00`)或 ISO-8601 格式(例如 `2013-04-12T15:52:01.123+0800`)来书写时间戳,此时这些时间戳的取值将不再受其他时区设置的影响。 4. 在书写 SQL 语句时,也可以直接使用 Unix 时间戳(例如 `1554984068000`)或带有时区的时间戳字符串,也即以 RFC 3339 格式(例如 `2013-04-12T15:52:01.123+08:00`)或 ISO-8601 格式(例如 `2013-04-12T15:52:01.123+0800`)来书写时间戳,此时这些时间戳的取值将不再受其他时区设置的影响。
## <a class="anchor" id="port"></a>19. TDengine 都会用到哪些网络端口? <a class="anchor" id="port"></a>
## 19. TDengine 都会用到哪些网络端口?
在 TDengine 2.0 版本中,会用到以下这些网络端口(以默认端口 6030 为前提进行说明,如果修改了配置文件中的设置,那么这里列举的端口都会出现变化),管理员可以参考这里的信息调整防火墙设置: 在 TDengine 2.0 版本中,会用到以下这些网络端口(以默认端口 6030 为前提进行说明,如果修改了配置文件中的设置,那么这里列举的端口都会出现变化),管理员可以参考这里的信息调整防火墙设置:
| 协议 | 默认端口 | 用途说明 | 修改方法 | | 协议 | 默认端口 | 用途说明 | 修改方法 |
| --- | --------- | ------------------------------- | ------------------------------ | | :--- | :-------- | :---------------------------------- | :------------------------------- |
| TCP | 6030 | 客户端与服务端之间通讯。 | 由配置文件设置 serverPort 决定。 | | TCP | 6030 | 客户端与服务端之间通讯。 | 由配置文件设置 serverPort 决定。 |
| TCP | 6035 | 多节点集群的节点间通讯。 | 随 serverPort 端口变化。 | | TCP | 6035 | 多节点集群的节点间通讯。 | 随 serverPort 端口变化。 |
| TCP | 6040 | 多节点集群的节点间数据同步。 | 随 serverPort 端口变化。 | | TCP | 6040 | 多节点集群的节点间数据同步。 | 随 serverPort 端口变化。 |
......
...@@ -114,6 +114,25 @@ mkdir -p ${install_dir}/examples ...@@ -114,6 +114,25 @@ mkdir -p ${install_dir}/examples
examples_dir="${top_dir}/tests/examples" examples_dir="${top_dir}/tests/examples"
cp -r ${examples_dir}/c ${install_dir}/examples cp -r ${examples_dir}/c ${install_dir}/examples
if [[ "$pagMode" != "lite" ]] && [[ "$cpuType" != "aarch32" ]]; then if [[ "$pagMode" != "lite" ]] && [[ "$cpuType" != "aarch32" ]]; then
if [ -d ${examples_dir}/JDBC/connectionPools/target ]; then
rm -rf ${examples_dir}/JDBC/connectionPools/target
fi
if [ -d ${examples_dir}/JDBC/JDBCDemo/target ]; then
rm -rf ${examples_dir}/JDBC/JDBCDemo/target
fi
if [ -d ${examples_dir}/JDBC/mybatisplus-demo/target ]; then
rm -rf ${examples_dir}/JDBC/mybatisplus-demo/target
fi
if [ -d ${examples_dir}/JDBC/springbootdemo/target ]; then
rm -rf ${examples_dir}/JDBC/springbootdemo/target
fi
if [ -d ${examples_dir}/JDBC/SpringJdbcTemplate/target ]; then
rm -rf ${examples_dir}/JDBC/SpringJdbcTemplate/target
fi
if [ -d ${examples_dir}/JDBC/taosdemo/target ]; then
rm -rf ${examples_dir}/JDBC/taosdemo/target
fi
cp -r ${examples_dir}/JDBC ${install_dir}/examples cp -r ${examples_dir}/JDBC ${install_dir}/examples
cp -r ${examples_dir}/matlab ${install_dir}/examples cp -r ${examples_dir}/matlab ${install_dir}/examples
cp -r ${examples_dir}/python ${install_dir}/examples cp -r ${examples_dir}/python ${install_dir}/examples
......
name: tdengine name: tdengine
base: core18 base: core18
version: '2.1.1.0'
version: '2.1.0.0'
icon: snap/gui/t-dengine.svg icon: snap/gui/t-dengine.svg
summary: an open-source big data platform designed and optimized for IoT. summary: an open-source big data platform designed and optimized for IoT.
description: | description: |
...@@ -73,7 +72,7 @@ parts: ...@@ -73,7 +72,7 @@ parts:
- usr/bin/taosd - usr/bin/taosd
- usr/bin/taos - usr/bin/taos
- usr/bin/taosdemo - usr/bin/taosdemo
- usr/lib/libtaos.so.2.1.0.0 - usr/lib/libtaos.so.2.1.1.0
- usr/lib/libtaos.so.1 - usr/lib/libtaos.so.1
- usr/lib/libtaos.so - usr/lib/libtaos.so
......
...@@ -7187,6 +7187,11 @@ int32_t validateSqlNode(SSqlObj* pSql, SSqlNode* pSqlNode, int32_t index) { ...@@ -7187,6 +7187,11 @@ int32_t validateSqlNode(SSqlObj* pSql, SSqlNode* pSqlNode, int32_t index) {
const char* msg1 = "point interpolation query needs timestamp"; const char* msg1 = "point interpolation query needs timestamp";
const char* msg2 = "too many tables in from clause"; const char* msg2 = "too many tables in from clause";
const char* msg3 = "start(end) time of query range required or time range too large"; const char* msg3 = "start(end) time of query range required or time range too large";
// const char* msg5 = "too many columns in selection clause";
// const char* msg6 = "too many tables in from clause";
// const char* msg7 = "invalid table alias name";
// const char* msg8 = "alias name too long";
const char* msg9 = "only tag query not compatible with normal column filter";
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
...@@ -7326,6 +7331,20 @@ int32_t validateSqlNode(SSqlObj* pSql, SSqlNode* pSqlNode, int32_t index) { ...@@ -7326,6 +7331,20 @@ int32_t validateSqlNode(SSqlObj* pSql, SSqlNode* pSqlNode, int32_t index) {
} }
} }
if (tscQueryTags(pQueryInfo)) {
SExprInfo* pExpr1 = tscSqlExprGet(pQueryInfo, 0);
if (pExpr1->base.functionId != TSDB_FUNC_TID_TAG) {
int32_t numOfCols = (int32_t)taosArrayGetSize(pQueryInfo->colList);
for (int32_t i = 0; i < numOfCols; ++i) {
SColumn* pCols = taosArrayGetP(pQueryInfo->colList, i);
if (pCols->info.flist.numOfFilters > 0) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg9);
}
}
}
}
// parse the having clause in the first place // parse the having clause in the first place
if (validateHavingClause(pQueryInfo, pSqlNode->pHaving, pCmd, pSqlNode->pSelNodeList, joinQuery, timeWindowQuery) != if (validateHavingClause(pQueryInfo, pSqlNode->pHaving, pCmd, pSqlNode->pSelNodeList, joinQuery, timeWindowQuery) !=
TSDB_CODE_SUCCESS) { TSDB_CODE_SUCCESS) {
......
...@@ -1928,8 +1928,9 @@ int tscProcessTableMetaRsp(SSqlObj *pSql) { ...@@ -1928,8 +1928,9 @@ int tscProcessTableMetaRsp(SSqlObj *pSql) {
} }
} }
tscDebug("0x%"PRIx64" recv table meta, uid:%" PRIu64 ", tid:%d, name:%s", pSql->self, pTableMeta->id.uid, pTableMeta->id.tid, tscDebug("0x%"PRIx64" recv table meta, uid:%" PRIu64 ", tid:%d, name:%s, numOfCols:%d, numOfTags:%d", pSql->self,
tNameGetTableName(&pTableMetaInfo->name)); pTableMeta->id.uid, pTableMeta->id.tid, tNameGetTableName(&pTableMetaInfo->name), pTableMeta->tableInfo.numOfColumns,
pTableMeta->tableInfo.numOfTags);
free(pTableMeta); free(pTableMeta);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
...@@ -2072,7 +2073,7 @@ int tscProcessSTableVgroupRsp(SSqlObj *pSql) { ...@@ -2072,7 +2073,7 @@ int tscProcessSTableVgroupRsp(SSqlObj *pSql) {
pInfo->vgroupList->numOfVgroups = pVgroupMsg->numOfVgroups; pInfo->vgroupList->numOfVgroups = pVgroupMsg->numOfVgroups;
if (pInfo->vgroupList->numOfVgroups <= 0) { if (pInfo->vgroupList->numOfVgroups <= 0) {
tscDebug("0x%"PRIx64" empty vgroup info, no corresponding tables for stable", pSql->self); tscDebug("0x%" PRIx64 " empty vgroup info, no corresponding tables for stable", pSql->self);
} else { } else {
for (int32_t j = 0; j < pInfo->vgroupList->numOfVgroups; ++j) { for (int32_t j = 0; j < pInfo->vgroupList->numOfVgroups; ++j) {
// just init, no need to lock // just init, no need to lock
......
...@@ -627,6 +627,7 @@ TAOS_STREAM *taos_open_stream(TAOS *taos, const char *sqlstr, void (*fp)(void *p ...@@ -627,6 +627,7 @@ TAOS_STREAM *taos_open_stream(TAOS *taos, const char *sqlstr, void (*fp)(void *p
if (pSql->sqlstr == NULL) { if (pSql->sqlstr == NULL) {
tscError("0x%"PRIx64" failed to malloc sql string buffer", pSql->self); tscError("0x%"PRIx64" failed to malloc sql string buffer", pSql->self);
tscFreeSqlObj(pSql); tscFreeSqlObj(pSql);
free(pStream);
return NULL; return NULL;
} }
......
...@@ -215,7 +215,7 @@ static void tscProcessSubscriptionTimer(void *handle, void *tmrId) { ...@@ -215,7 +215,7 @@ static void tscProcessSubscriptionTimer(void *handle, void *tmrId) {
taosTmrReset(tscProcessSubscriptionTimer, pSub->interval, pSub, tscTmr, &pSub->pTimer); taosTmrReset(tscProcessSubscriptionTimer, pSub->interval, pSub, tscTmr, &pSub->pTimer);
} }
//TODO refactor: extract table list name not simply from the sql
static SArray* getTableList( SSqlObj* pSql ) { static SArray* getTableList( SSqlObj* pSql ) {
const char* p = strstr( pSql->sqlstr, " from " ); const char* p = strstr( pSql->sqlstr, " from " );
assert(p != NULL); // we are sure this is a 'select' statement assert(p != NULL); // we are sure this is a 'select' statement
...@@ -224,11 +224,11 @@ static SArray* getTableList( SSqlObj* pSql ) { ...@@ -224,11 +224,11 @@ static SArray* getTableList( SSqlObj* pSql ) {
SSqlObj* pNew = taos_query(pSql->pTscObj, sql); SSqlObj* pNew = taos_query(pSql->pTscObj, sql);
if (pNew == NULL) { if (pNew == NULL) {
tscError("0x%"PRIx64"failed to retrieve table id: cannot create new sql object.", pSql->self); tscError("0x%"PRIx64" failed to retrieve table id: cannot create new sql object.", pSql->self);
return NULL; return NULL;
} else if (taos_errno(pNew) != TSDB_CODE_SUCCESS) { } else if (taos_errno(pNew) != TSDB_CODE_SUCCESS) {
tscError("0x%"PRIx64"failed to retrieve table id,error: %s", pSql->self, tstrerror(taos_errno(pNew))); tscError("0x%"PRIx64" failed to retrieve table id,error: %s", pSql->self, tstrerror(taos_errno(pNew)));
return NULL; return NULL;
} }
......
...@@ -417,18 +417,6 @@ void setVardataNull(char* val, int32_t type) { ...@@ -417,18 +417,6 @@ void setVardataNull(char* val, int32_t type) {
} }
} }
bool isVardataNull(char* val, int32_t type) {
if (type == TSDB_DATA_TYPE_BINARY) {
return *(uint8_t*) varDataVal(val) == TSDB_DATA_BINARY_NULL;
} else if (type == TSDB_DATA_TYPE_NCHAR) {
return *(uint32_t*) varDataVal(val) == TSDB_DATA_NCHAR_NULL;
} else {
assert(0);
}
return false;
}
void setNull(char *val, int32_t type, int32_t bytes) { setNullN(val, type, bytes, 1); } void setNull(char *val, int32_t type, int32_t bytes) { setNullN(val, type, bytes, 1); }
void setNullN(char *val, int32_t type, int32_t bytes, int32_t numOfElems) { void setNullN(char *val, int32_t type, int32_t bytes, int32_t numOfElems) {
...@@ -504,55 +492,6 @@ void setNullN(char *val, int32_t type, int32_t bytes, int32_t numOfElems) { ...@@ -504,55 +492,6 @@ void setNullN(char *val, int32_t type, int32_t bytes, int32_t numOfElems) {
} }
} }
bool isNullN(char *val, int32_t type) {
switch (type) {
case TSDB_DATA_TYPE_BOOL:
return *(uint8_t *)(val) == TSDB_DATA_BOOL_NULL;
break;
case TSDB_DATA_TYPE_TINYINT:
return *(uint8_t *)(val) == TSDB_DATA_TINYINT_NULL;
break;
case TSDB_DATA_TYPE_SMALLINT:
return *(uint16_t *)(val) == TSDB_DATA_SMALLINT_NULL;
break;
case TSDB_DATA_TYPE_INT:
return *(uint32_t *)(val) == TSDB_DATA_INT_NULL;
break;
case TSDB_DATA_TYPE_BIGINT:
case TSDB_DATA_TYPE_TIMESTAMP:
return *(uint64_t *)(val) == TSDB_DATA_BIGINT_NULL;
break;
case TSDB_DATA_TYPE_UTINYINT:
return *(uint8_t *)(val) == TSDB_DATA_UTINYINT_NULL;
break;
case TSDB_DATA_TYPE_USMALLINT:
return *(uint16_t *)(val) == TSDB_DATA_USMALLINT_NULL;
break;
case TSDB_DATA_TYPE_UINT:
return *(uint32_t *)(val) == TSDB_DATA_UINT_NULL;
break;
case TSDB_DATA_TYPE_UBIGINT:
return *(uint64_t *)(val) == TSDB_DATA_UBIGINT_NULL;
break;
case TSDB_DATA_TYPE_FLOAT:
return *(uint32_t *)(val) == TSDB_DATA_FLOAT_NULL;
break;
case TSDB_DATA_TYPE_DOUBLE:
return *(uint64_t *)(val) == TSDB_DATA_DOUBLE_NULL;
break;
case TSDB_DATA_TYPE_NCHAR:
case TSDB_DATA_TYPE_BINARY:
return isVardataNull(val, type);
break;
default: {
return *(uint32_t *)(val) == TSDB_DATA_INT_NULL;
break;
}
}
return false;
}
static uint8_t nullBool = TSDB_DATA_BOOL_NULL; static uint8_t nullBool = TSDB_DATA_BOOL_NULL;
static uint8_t nullTinyInt = TSDB_DATA_TINYINT_NULL; static uint8_t nullTinyInt = TSDB_DATA_TINYINT_NULL;
static uint16_t nullSmallInt = TSDB_DATA_SMALLINT_NULL; static uint16_t nullSmallInt = TSDB_DATA_SMALLINT_NULL;
......
Subproject commit 7a26c432f8b4203e42344ff3290b9b9b01b983d5 Subproject commit 8ce6d86558afc8c0b50c10f990fd2b4270cf06fc
Subproject commit 32e2c97a4cf7bedaa99f5d6dd8cb036e7f4470df Subproject commit 3530c6df097134a410bacec6b3cd013ef38a61aa
...@@ -312,11 +312,7 @@ static int test_sqls_in_stmt(SQLHENV env, SQLHDBC conn, SQLHSTMT stmt, const cha ...@@ -312,11 +312,7 @@ static int test_sqls_in_stmt(SQLHENV env, SQLHDBC conn, SQLHSTMT stmt, const cha
size_t len = 0; size_t len = 0;
ssize_t n = 0; ssize_t n = 0;
#ifdef _MSC_VER n = tgetline(&line, &len, f);
n = taosGetlineImp(&line, &len, f);
#else
n = getline(&line, &len, f);
#endif
if (n==-1) break; if (n==-1) break;
const char *p = NULL; const char *p = NULL;
......
...@@ -219,6 +219,7 @@ int32_t* taosGetErrno(); ...@@ -219,6 +219,7 @@ int32_t* taosGetErrno();
#define TSDB_CODE_VND_NO_WRITE_AUTH TAOS_DEF_ERROR_CODE(0, 0x0512) //"Database write operation denied") #define TSDB_CODE_VND_NO_WRITE_AUTH TAOS_DEF_ERROR_CODE(0, 0x0512) //"Database write operation denied")
#define TSDB_CODE_VND_IS_SYNCING TAOS_DEF_ERROR_CODE(0, 0x0513) //"Database is syncing") #define TSDB_CODE_VND_IS_SYNCING TAOS_DEF_ERROR_CODE(0, 0x0513) //"Database is syncing")
#define TSDB_CODE_VND_INVALID_TSDB_STATE TAOS_DEF_ERROR_CODE(0, 0x0514) //"Invalid tsdb state") #define TSDB_CODE_VND_INVALID_TSDB_STATE TAOS_DEF_ERROR_CODE(0, 0x0514) //"Invalid tsdb state")
#define TSDB_CODE_VND_IS_CLOSING TAOS_DEF_ERROR_CODE(0, 0x0515) //"Database is closing")
// tsdb // tsdb
#define TSDB_CODE_TDB_INVALID_TABLE_ID TAOS_DEF_ERROR_CODE(0, 0x0600) //"Invalid table ID") #define TSDB_CODE_TDB_INVALID_TABLE_ID TAOS_DEF_ERROR_CODE(0, 0x0600) //"Invalid table ID")
......
...@@ -69,7 +69,7 @@ typedef struct { ...@@ -69,7 +69,7 @@ typedef struct {
int8_t precision; int8_t precision;
int8_t compression; int8_t compression;
int8_t update; int8_t update;
int8_t cacheLastRow; // 0:no cache, 1: cache last row, 2: cache last NULL column int8_t cacheLastRow; // 0:no cache, 1: cache last row, 2: cache last NULL column 3: 1&2
} STsdbCfg; } STsdbCfg;
#define CACHE_NO_LAST(c) ((c)->cacheLastRow == 0) #define CACHE_NO_LAST(c) ((c)->cacheLastRow == 0)
......
...@@ -178,9 +178,6 @@ void setNull(char *val, int32_t type, int32_t bytes); ...@@ -178,9 +178,6 @@ void setNull(char *val, int32_t type, int32_t bytes);
void setNullN(char *val, int32_t type, int32_t bytes, int32_t numOfElems); void setNullN(char *val, int32_t type, int32_t bytes, int32_t numOfElems);
void *getNullValue(int32_t type); void *getNullValue(int32_t type);
bool isVardataNull(char* val, int32_t type);
bool isNullN(char *val, int32_t type);
void assignVal(char *val, const char *src, int32_t len, int32_t type); void assignVal(char *val, const char *src, int32_t len, int32_t type);
void tsDataSwap(void *pLeft, void *pRight, int32_t type, int32_t size, void* buf); void tsDataSwap(void *pLeft, void *pRight, int32_t type, int32_t size, void* buf);
......
此差异已折叠。
...@@ -719,13 +719,13 @@ static int32_t sdbProcessWrite(void *wparam, void *hparam, int32_t qtype, void * ...@@ -719,13 +719,13 @@ static int32_t sdbProcessWrite(void *wparam, void *hparam, int32_t qtype, void *
if (action == SDB_ACTION_INSERT) { if (action == SDB_ACTION_INSERT) {
return sdbPerformInsertAction(pHead, pTable); return sdbPerformInsertAction(pHead, pTable);
} else if (action == SDB_ACTION_DELETE) { } else if (action == SDB_ACTION_DELETE) {
if (qtype == TAOS_QTYPE_FWD) { //if (qtype == TAOS_QTYPE_FWD) {
// Drop database/stable may take a long time and cause a timeout, so we confirm first then reput it into queue // Drop database/stable may take a long time and cause a timeout, so we confirm first then reput it into queue
sdbWriteFwdToQueue(1, hparam, TAOS_QTYPE_QUERY, unused); // sdbWriteFwdToQueue(1, hparam, TAOS_QTYPE_QUERY, unused);
return TSDB_CODE_SUCCESS; // return TSDB_CODE_SUCCESS;
} else { //} else {
return sdbPerformDeleteAction(pHead, pTable); return sdbPerformDeleteAction(pHead, pTable);
} //}
} else if (action == SDB_ACTION_UPDATE) { } else if (action == SDB_ACTION_UPDATE) {
return sdbPerformUpdateAction(pHead, pTable); return sdbPerformUpdateAction(pHead, pTable);
} else { } else {
......
...@@ -1189,8 +1189,8 @@ static int32_t mnodeFindSuperTableTagIndex(SSTableObj *pStable, const char *tagN ...@@ -1189,8 +1189,8 @@ static int32_t mnodeFindSuperTableTagIndex(SSTableObj *pStable, const char *tagN
static int32_t mnodeAddSuperTableTagCb(SMnodeMsg *pMsg, int32_t code) { static int32_t mnodeAddSuperTableTagCb(SMnodeMsg *pMsg, int32_t code) {
SSTableObj *pStable = (SSTableObj *)pMsg->pTable; SSTableObj *pStable = (SSTableObj *)pMsg->pTable;
mLInfo("msg:%p, app:%p stable %s, add tag result:%s", pMsg, pMsg->rpcMsg.ahandle, pStable->info.tableId, mLInfo("msg:%p, app:%p stable %s, add tag result:%s, numOfTags:%d", pMsg, pMsg->rpcMsg.ahandle, pStable->info.tableId,
tstrerror(code)); tstrerror(code), pStable->numOfTags);
return code; return code;
} }
......
...@@ -121,7 +121,7 @@ static int32_t mnodeVgroupActionDelete(SSdbRow *pRow) { ...@@ -121,7 +121,7 @@ static int32_t mnodeVgroupActionDelete(SSdbRow *pRow) {
SVgObj *pVgroup = pRow->pObj; SVgObj *pVgroup = pRow->pObj;
if (pVgroup->pDb == NULL) { if (pVgroup->pDb == NULL) {
mError("vgId:%d, db:%s is not exist while insert into hash", pVgroup->vgId, pVgroup->dbName); mError("vgId:%d, db:%s is not exist while delete from hash", pVgroup->vgId, pVgroup->dbName);
return TSDB_CODE_MND_VGROUP_NOT_EXIST; return TSDB_CODE_MND_VGROUP_NOT_EXIST;
} }
......
...@@ -62,7 +62,7 @@ static void* taosRandomRealloc(void* ptr, size_t size, const char* file, uint32_ ...@@ -62,7 +62,7 @@ static void* taosRandomRealloc(void* ptr, size_t size, const char* file, uint32_
static char* taosRandomStrdup(const char* str, const char* file, uint32_t line) { static char* taosRandomStrdup(const char* str, const char* file, uint32_t line) {
size_t len = strlen(str); size_t len = strlen(str);
return taosRandomAllocFail(len + 1, file, line) ? NULL : taosStrdupImp(str); return taosRandomAllocFail(len + 1, file, line) ? NULL : tstrdup(str);
} }
static char* taosRandomStrndup(const char* str, size_t size, const char* file, uint32_t line) { static char* taosRandomStrndup(const char* str, size_t size, const char* file, uint32_t line) {
...@@ -70,11 +70,11 @@ static char* taosRandomStrndup(const char* str, size_t size, const char* file, u ...@@ -70,11 +70,11 @@ static char* taosRandomStrndup(const char* str, size_t size, const char* file, u
if (len > size) { if (len > size) {
len = size; len = size;
} }
return taosRandomAllocFail(len + 1, file, line) ? NULL : taosStrndupImp(str, len); return taosRandomAllocFail(len + 1, file, line) ? NULL : tstrndup(str, len);
} }
static ssize_t taosRandomGetline(char **lineptr, size_t *n, FILE *stream, const char* file, uint32_t line) { static ssize_t taosRandomGetline(char **lineptr, size_t *n, FILE *stream, const char* file, uint32_t line) {
return taosRandomAllocFail(*n, file, line) ? -1 : taosGetlineImp(lineptr, n, stream); return taosRandomAllocFail(*n, file, line) ? -1 : tgetline(lineptr, n, stream);
} }
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
...@@ -242,7 +242,7 @@ static char* taosStrndupDetectLeak(const char* str, size_t size, const char* fil ...@@ -242,7 +242,7 @@ static char* taosStrndupDetectLeak(const char* str, size_t size, const char* fil
static ssize_t taosGetlineDetectLeak(char **lineptr, size_t *n, FILE *stream, const char* file, uint32_t line) { static ssize_t taosGetlineDetectLeak(char **lineptr, size_t *n, FILE *stream, const char* file, uint32_t line) {
char* buf = NULL; char* buf = NULL;
size_t bufSize = 0; size_t bufSize = 0;
ssize_t size = taosGetlineImp(&buf, &bufSize, stream); ssize_t size = tgetline(&buf, &bufSize, stream);
if (size != -1) { if (size != -1) {
if (*n < size + 1) { if (*n < size + 1) {
void* p = taosReallocDetectLeak(*lineptr, size + 1, file, line); void* p = taosReallocDetectLeak(*lineptr, size + 1, file, line);
...@@ -372,7 +372,7 @@ void taosFreeMem(void* ptr, const char* file, uint32_t line) { ...@@ -372,7 +372,7 @@ void taosFreeMem(void* ptr, const char* file, uint32_t line) {
char* taosStrdupMem(const char* str, const char* file, uint32_t line) { char* taosStrdupMem(const char* str, const char* file, uint32_t line) {
switch (allocMode) { switch (allocMode) {
case TAOS_ALLOC_MODE_DEFAULT: case TAOS_ALLOC_MODE_DEFAULT:
return taosStrdupImp(str); return tstrdup(str);
case TAOS_ALLOC_MODE_RANDOM_FAIL: case TAOS_ALLOC_MODE_RANDOM_FAIL:
return taosRandomStrdup(str, file, line); return taosRandomStrdup(str, file, line);
...@@ -380,13 +380,13 @@ char* taosStrdupMem(const char* str, const char* file, uint32_t line) { ...@@ -380,13 +380,13 @@ char* taosStrdupMem(const char* str, const char* file, uint32_t line) {
case TAOS_ALLOC_MODE_DETECT_LEAK: case TAOS_ALLOC_MODE_DETECT_LEAK:
return taosStrdupDetectLeak(str, file, line); return taosStrdupDetectLeak(str, file, line);
} }
return taosStrdupImp(str); return tstrdup(str);
} }
char* taosStrndupMem(const char* str, size_t size, const char* file, uint32_t line) { char* taosStrndupMem(const char* str, size_t size, const char* file, uint32_t line) {
switch (allocMode) { switch (allocMode) {
case TAOS_ALLOC_MODE_DEFAULT: case TAOS_ALLOC_MODE_DEFAULT:
return taosStrndupImp(str, size); return tstrndup(str, size);
case TAOS_ALLOC_MODE_RANDOM_FAIL: case TAOS_ALLOC_MODE_RANDOM_FAIL:
return taosRandomStrndup(str, size, file, line); return taosRandomStrndup(str, size, file, line);
...@@ -394,13 +394,13 @@ char* taosStrndupMem(const char* str, size_t size, const char* file, uint32_t li ...@@ -394,13 +394,13 @@ char* taosStrndupMem(const char* str, size_t size, const char* file, uint32_t li
case TAOS_ALLOC_MODE_DETECT_LEAK: case TAOS_ALLOC_MODE_DETECT_LEAK:
return taosStrndupDetectLeak(str, size, file, line); return taosStrndupDetectLeak(str, size, file, line);
} }
return taosStrndupImp(str, size); return tstrndup(str, size);
} }
ssize_t taosGetlineMem(char **lineptr, size_t *n, FILE *stream, const char* file, uint32_t line) { ssize_t taosGetlineMem(char **lineptr, size_t *n, FILE *stream, const char* file, uint32_t line) {
switch (allocMode) { switch (allocMode) {
case TAOS_ALLOC_MODE_DEFAULT: case TAOS_ALLOC_MODE_DEFAULT:
return taosGetlineImp(lineptr, n, stream); return tgetline(lineptr, n, stream);
case TAOS_ALLOC_MODE_RANDOM_FAIL: case TAOS_ALLOC_MODE_RANDOM_FAIL:
return taosRandomGetline(lineptr, n, stream, file, line); return taosRandomGetline(lineptr, n, stream, file, line);
...@@ -408,7 +408,7 @@ ssize_t taosGetlineMem(char **lineptr, size_t *n, FILE *stream, const char* file ...@@ -408,7 +408,7 @@ ssize_t taosGetlineMem(char **lineptr, size_t *n, FILE *stream, const char* file
case TAOS_ALLOC_MODE_DETECT_LEAK: case TAOS_ALLOC_MODE_DETECT_LEAK:
return taosGetlineDetectLeak(lineptr, n, stream, file, line); return taosGetlineDetectLeak(lineptr, n, stream, file, line);
} }
return taosGetlineImp(lineptr, n, stream); return tgetline(lineptr, n, stream);
} }
static void taosCloseAllocLog() { static void taosCloseAllocLog() {
...@@ -517,4 +517,4 @@ void* taosTZfree(void* ptr) { ...@@ -517,4 +517,4 @@ void* taosTZfree(void* ptr) {
free((void*)((char*)ptr - sizeof(size_t))); free((void*)((char*)ptr - sizeof(size_t)));
} }
return NULL; return NULL;
} }
\ No newline at end of file
...@@ -14,6 +14,7 @@ ...@@ -14,6 +14,7 @@
*/ */
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "os.h" #include "os.h"
#include "taosdef.h" #include "taosdef.h"
#include "tglobal.h" #include "tglobal.h"
...@@ -24,7 +25,7 @@ ...@@ -24,7 +25,7 @@
bool taosCheckPthreadValid(pthread_t thread) { return thread.p != NULL; } bool taosCheckPthreadValid(pthread_t thread) { return thread.p != NULL; }
void taosResetPthread(pthread_t *thread) { thread->p = 0; } void taosResetPthread(pthread_t* thread) { thread->p = 0; }
int64_t taosGetPthreadId(pthread_t thread) { int64_t taosGetPthreadId(pthread_t thread) {
#ifdef PTW32_VERSION #ifdef PTW32_VERSION
...@@ -34,27 +35,24 @@ int64_t taosGetPthreadId(pthread_t thread) { ...@@ -34,27 +35,24 @@ int64_t taosGetPthreadId(pthread_t thread) {
#endif #endif
} }
int64_t taosGetSelfPthreadId() { int64_t taosGetSelfPthreadId() { return GetCurrentThreadId(); }
return GetCurrentThreadId();
}
bool taosComparePthread(pthread_t first, pthread_t second) { bool taosComparePthread(pthread_t first, pthread_t second) { return first.p == second.p; }
return first.p == second.p;
}
int32_t taosGetPId() { int32_t taosGetPId() { return GetCurrentProcessId(); }
return GetCurrentProcessId();
}
int32_t taosGetCurrentAPPName(char *name, int32_t* len) { int32_t taosGetCurrentAPPName(char* name, int32_t* len) {
char filepath[1024] = {0}; char filepath[1024] = {0};
GetModuleFileName(NULL, filepath, MAX_PATH); GetModuleFileName(NULL, filepath, MAX_PATH);
*strrchr(filepath,'.') = '\0'; char* sub = strrchr(filepath, '.');
if (sub != NULL) {
*sub = '\0';
}
strcpy(name, filepath); strcpy(name, filepath);
if (len != NULL) { if (len != NULL) {
*len = (int32_t) strlen(filepath); *len = (int32_t)strlen(filepath);
} }
return 0; return 0;
......
...@@ -709,7 +709,7 @@ static void syncChooseMaster(SSyncNode *pNode) { ...@@ -709,7 +709,7 @@ static void syncChooseMaster(SSyncNode *pNode) {
} }
static SSyncPeer *syncCheckMaster(SSyncNode *pNode) { static SSyncPeer *syncCheckMaster(SSyncNode *pNode) {
int32_t onlineNum = 0; int32_t onlineNum = 0, arbOnlineNum = 0;
int32_t masterIndex = -1; int32_t masterIndex = -1;
int32_t replica = pNode->replica; int32_t replica = pNode->replica;
...@@ -723,13 +723,15 @@ static SSyncPeer *syncCheckMaster(SSyncNode *pNode) { ...@@ -723,13 +723,15 @@ static SSyncPeer *syncCheckMaster(SSyncNode *pNode) {
SSyncPeer *pArb = pNode->peerInfo[TAOS_SYNC_MAX_REPLICA]; SSyncPeer *pArb = pNode->peerInfo[TAOS_SYNC_MAX_REPLICA];
if (pArb && pArb->role != TAOS_SYNC_ROLE_OFFLINE) { if (pArb && pArb->role != TAOS_SYNC_ROLE_OFFLINE) {
onlineNum++; onlineNum++;
++arbOnlineNum;
replica = pNode->replica + 1; replica = pNode->replica + 1;
} }
if (onlineNum <= replica * 0.5) { if (onlineNum <= replica * 0.5) {
if (nodeRole != TAOS_SYNC_ROLE_UNSYNCED) { if (nodeRole != TAOS_SYNC_ROLE_UNSYNCED) {
if (nodeRole == TAOS_SYNC_ROLE_MASTER && onlineNum == replica * 0.5 && onlineNum >= 1) { if (nodeRole == TAOS_SYNC_ROLE_MASTER && onlineNum == replica * 0.5 && ((replica > 2 && onlineNum - arbOnlineNum > 1) || pNode->replica < 3)) {
sInfo("vgId:%d, self keep work as master, online:%d replica:%d", pNode->vgId, onlineNum, replica); sInfo("vgId:%d, self keep work as master, online:%d replica:%d", pNode->vgId, onlineNum, replica);
masterIndex = pNode->selfIndex;
} else { } else {
nodeRole = TAOS_SYNC_ROLE_UNSYNCED; nodeRole = TAOS_SYNC_ROLE_UNSYNCED;
sInfo("vgId:%d, self change to unsynced state, online:%d replica:%d", pNode->vgId, onlineNum, replica); sInfo("vgId:%d, self change to unsynced state, online:%d replica:%d", pNode->vgId, onlineNum, replica);
...@@ -1002,6 +1004,7 @@ static void syncProcessForwardFromPeer(char *cont, SSyncPeer *pPeer) { ...@@ -1002,6 +1004,7 @@ static void syncProcessForwardFromPeer(char *cont, SSyncPeer *pPeer) {
if (nodeRole == TAOS_SYNC_ROLE_SLAVE) { if (nodeRole == TAOS_SYNC_ROLE_SLAVE) {
// nodeVersion = pHead->version; // nodeVersion = pHead->version;
code = (*pNode->writeToCacheFp)(pNode->vgId, pHead, TAOS_QTYPE_FWD, NULL); code = (*pNode->writeToCacheFp)(pNode->vgId, pHead, TAOS_QTYPE_FWD, NULL);
syncConfirmForward(pNode->rid, pHead->version, code, false);
} else { } else {
if (nodeSStatus != TAOS_SYNC_STATUS_INIT) { if (nodeSStatus != TAOS_SYNC_STATUS_INIT) {
code = syncSaveIntoBuffer(pPeer, pHead); code = syncSaveIntoBuffer(pPeer, pHead);
...@@ -1404,7 +1407,7 @@ static void syncMonitorFwdInfos(void *param, void *tmrId) { ...@@ -1404,7 +1407,7 @@ static void syncMonitorFwdInfos(void *param, void *tmrId) {
pthread_mutex_lock(&pNode->mutex); pthread_mutex_lock(&pNode->mutex);
for (int32_t i = 0; i < pSyncFwds->fwds; ++i) { for (int32_t i = 0; i < pSyncFwds->fwds; ++i) {
SFwdInfo *pFwdInfo = pSyncFwds->fwdInfo + (pSyncFwds->first + i) % SYNC_MAX_FWDS; SFwdInfo *pFwdInfo = pSyncFwds->fwdInfo + (pSyncFwds->first + i) % SYNC_MAX_FWDS;
if (ABS(time - pFwdInfo->time) < 2000) break; if (ABS(time - pFwdInfo->time) < 10000) break;
sDebug("vgId:%d, forward info expired, hver:%" PRIu64 " curtime:%" PRIu64 " savetime:%" PRIu64, pNode->vgId, sDebug("vgId:%d, forward info expired, hver:%" PRIu64 " curtime:%" PRIu64 " savetime:%" PRIu64, pNode->vgId,
pFwdInfo->version, time, pFwdInfo->time); pFwdInfo->version, time, pFwdInfo->time);
......
...@@ -45,9 +45,6 @@ typedef struct STable { ...@@ -45,9 +45,6 @@ typedef struct STable {
T_REF_DECLARE() T_REF_DECLARE()
} STable; } STable;
#define TSDB_LATEST_COLUMN_ARRAY_SIZE 20
#define TSDB_LATEST_COLUMN_ARRAY_ADD_SIZE 5
typedef struct { typedef struct {
pthread_rwlock_t rwLock; pthread_rwlock_t rwLock;
......
...@@ -27,6 +27,7 @@ static void tsdbFreeRepo(STsdbRepo *pRepo); ...@@ -27,6 +27,7 @@ static void tsdbFreeRepo(STsdbRepo *pRepo);
static void tsdbStartStream(STsdbRepo *pRepo); static void tsdbStartStream(STsdbRepo *pRepo);
static void tsdbStopStream(STsdbRepo *pRepo); static void tsdbStopStream(STsdbRepo *pRepo);
static int tsdbRestoreLastColumns(STsdbRepo *pRepo, STable *pTable, SReadH* pReadh); static int tsdbRestoreLastColumns(STsdbRepo *pRepo, STable *pTable, SReadH* pReadh);
static int tsdbRestoreLastRow(STsdbRepo *pRepo, STable *pTable, SReadH* pReadh, SBlockIdx *pIdx);
// Function declaration // Function declaration
int32_t tsdbCreateRepo(int repoid) { int32_t tsdbCreateRepo(int repoid) {
...@@ -270,9 +271,7 @@ int32_t tsdbConfigRepo(STsdbRepo *repo, STsdbCfg *pCfg) { ...@@ -270,9 +271,7 @@ int32_t tsdbConfigRepo(STsdbRepo *repo, STsdbCfg *pCfg) {
pthread_mutex_unlock(&repo->save_mutex); pthread_mutex_unlock(&repo->save_mutex);
// schedule a commit msg then the new config will be applied immediatly // schedule a commit msg then the new config will be applied immediatly
if (tsdbLockRepo(repo) < 0) return -1; tsdbAsyncCommit(repo);
tsdbScheduleCommit(repo);
if (tsdbUnlockRepo(repo) < 0) return -1;
return 0; return 0;
#if 0 #if 0
...@@ -645,6 +644,7 @@ static int tsdbRestoreLastColumns(STsdbRepo *pRepo, STable *pTable, SReadH* pRea ...@@ -645,6 +644,7 @@ static int tsdbRestoreLastColumns(STsdbRepo *pRepo, STable *pTable, SReadH* pRea
numColumns = schemaNCols(pSchema); numColumns = schemaNCols(pSchema);
if (numColumns <= pTable->restoreColumnNum) { if (numColumns <= pTable->restoreColumnNum) {
pTable->hasRestoreLastColumn = true;
return 0; return 0;
} }
if (pTable->lastColSVersion != schemaVersion(pSchema)) { if (pTable->lastColSVersion != schemaVersion(pSchema)) {
...@@ -719,7 +719,7 @@ static int tsdbRestoreLastColumns(STsdbRepo *pRepo, STable *pTable, SReadH* pRea ...@@ -719,7 +719,7 @@ static int tsdbRestoreLastColumns(STsdbRepo *pRepo, STable *pTable, SReadH* pRea
tdAppendColVal(row, tdGetColDataOfRow(pDataCol, rowId), pCol->type, pCol->bytes, pCol->offset); tdAppendColVal(row, tdGetColDataOfRow(pDataCol, rowId), pCol->type, pCol->bytes, pCol->offset);
//SDataCol *pDataCol = readh.pDCols[0]->cols + j; //SDataCol *pDataCol = readh.pDCols[0]->cols + j;
void* value = tdGetRowDataOfCol(row, (int8_t)pCol->type, TD_DATA_ROW_HEAD_SIZE + pCol->offset); void* value = tdGetRowDataOfCol(row, (int8_t)pCol->type, TD_DATA_ROW_HEAD_SIZE + pCol->offset);
if (isNullN(value, pCol->type)) { if (isNull(value, pCol->type)) {
continue; continue;
} }
...@@ -753,16 +753,51 @@ out: ...@@ -753,16 +753,51 @@ out:
taosTZfree(row); taosTZfree(row);
tfree(pBlockStatis); tfree(pBlockStatis);
if (err == 0 && numColumns <= pTable->restoreColumnNum) {
pTable->hasRestoreLastColumn = true;
}
return err; return err;
} }
static int tsdbRestoreLastRow(STsdbRepo *pRepo, STable *pTable, SReadH* pReadh, SBlockIdx *pIdx) {
ASSERT(pTable->lastRow == NULL);
if (tsdbLoadBlockInfo(pReadh, NULL) < 0) {
return -1;
}
SBlock* pBlock = pReadh->pBlkInfo->blocks + pIdx->numOfBlocks - 1;
if (tsdbLoadBlockData(pReadh, pBlock, NULL) < 0) {
return -1;
}
// Get the data in row
STSchema *pSchema = tsdbGetTableSchema(pTable);
pTable->lastRow = taosTMalloc(dataRowMaxBytesFromSchema(pSchema));
if (pTable->lastRow == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
return -1;
}
tdInitDataRow(pTable->lastRow, pSchema);
for (int icol = 0; icol < schemaNCols(pSchema); icol++) {
STColumn *pCol = schemaColAt(pSchema, icol);
SDataCol *pDataCol = pReadh->pDCols[0]->cols + icol;
tdAppendColVal(pTable->lastRow, tdGetColDataOfRow(pDataCol, pBlock->numOfRows - 1), pCol->type, pCol->bytes,
pCol->offset);
}
return 0;
}
int tsdbRestoreInfo(STsdbRepo *pRepo) { int tsdbRestoreInfo(STsdbRepo *pRepo) {
SFSIter fsiter; SFSIter fsiter;
SReadH readh; SReadH readh;
SDFileSet *pSet; SDFileSet *pSet;
STsdbMeta *pMeta = pRepo->tsdbMeta; STsdbMeta *pMeta = pRepo->tsdbMeta;
STsdbCfg * pCfg = REPO_CFG(pRepo); STsdbCfg * pCfg = REPO_CFG(pRepo);
SBlock * pBlock;
if (tsdbInitReadH(&readh, pRepo) < 0) { if (tsdbInitReadH(&readh, pRepo) < 0) {
return -1; return -1;
...@@ -805,41 +840,14 @@ int tsdbRestoreInfo(STsdbRepo *pRepo) { ...@@ -805,41 +840,14 @@ int tsdbRestoreInfo(STsdbRepo *pRepo) {
if (pIdx && lastKey < pIdx->maxKey) { if (pIdx && lastKey < pIdx->maxKey) {
pTable->lastKey = pIdx->maxKey; pTable->lastKey = pIdx->maxKey;
if (CACHE_LAST_ROW(pCfg)) { if (CACHE_LAST_ROW(pCfg) && tsdbRestoreLastRow(pRepo, pTable, &readh, pIdx) != 0) {
if (tsdbLoadBlockInfo(&readh, NULL) < 0) { tsdbDestroyReadH(&readh);
tsdbDestroyReadH(&readh); return -1;
return -1;
}
pBlock = readh.pBlkInfo->blocks + pIdx->numOfBlocks - 1;
if (tsdbLoadBlockData(&readh, pBlock, NULL) < 0) {
tsdbDestroyReadH(&readh);
return -1;
}
// Get the data in row
ASSERT(pTable->lastRow == NULL);
STSchema *pSchema = tsdbGetTableSchema(pTable);
pTable->lastRow = taosTMalloc(dataRowMaxBytesFromSchema(pSchema));
if (pTable->lastRow == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
tsdbDestroyReadH(&readh);
return -1;
}
tdInitDataRow(pTable->lastRow, pSchema);
for (int icol = 0; icol < schemaNCols(pSchema); icol++) {
STColumn *pCol = schemaColAt(pSchema, icol);
SDataCol *pDataCol = readh.pDCols[0]->cols + icol;
tdAppendColVal(pTable->lastRow, tdGetColDataOfRow(pDataCol, pBlock->numOfRows - 1), pCol->type, pCol->bytes,
pCol->offset);
}
} }
} }
// restore NULL columns // restore NULL columns
if (pIdx && CACHE_LAST_NULL_COLUMN(pCfg)) { if (pIdx && CACHE_LAST_NULL_COLUMN(pCfg) && !pTable->hasRestoreLastColumn) {
if (tsdbRestoreLastColumns(pRepo, pTable, &readh) != 0) { if (tsdbRestoreLastColumns(pRepo, pTable, &readh) != 0) {
tsdbDestroyReadH(&readh); tsdbDestroyReadH(&readh);
return -1; return -1;
...@@ -865,8 +873,6 @@ int tsdbCacheLastData(STsdbRepo *pRepo, STsdbCfg* oldCfg) { ...@@ -865,8 +873,6 @@ int tsdbCacheLastData(STsdbRepo *pRepo, STsdbCfg* oldCfg) {
SReadH readh; SReadH readh;
SDFileSet *pSet; SDFileSet *pSet;
STsdbMeta *pMeta = pRepo->tsdbMeta; STsdbMeta *pMeta = pRepo->tsdbMeta;
//STsdbCfg * pCfg = REPO_CFG(pRepo);
SBlock * pBlock;
int tableNum = 0; int tableNum = 0;
int maxTableIdx = 0; int maxTableIdx = 0;
int cacheLastRowTableNum = 0; int cacheLastRowTableNum = 0;
...@@ -955,35 +961,10 @@ int tsdbCacheLastData(STsdbRepo *pRepo, STsdbCfg* oldCfg) { ...@@ -955,35 +961,10 @@ int tsdbCacheLastData(STsdbRepo *pRepo, STsdbCfg* oldCfg) {
if (pIdx && cacheLastRowTableNum > 0 && pTable->lastRow == NULL) { if (pIdx && cacheLastRowTableNum > 0 && pTable->lastRow == NULL) {
pTable->lastKey = pIdx->maxKey; pTable->lastKey = pIdx->maxKey;
if (tsdbLoadBlockInfo(&readh, NULL) < 0) { if (tsdbRestoreLastRow(pRepo, pTable, &readh, pIdx) != 0) {
tsdbDestroyReadH(&readh);
return -1;
}
pBlock = readh.pBlkInfo->blocks + pIdx->numOfBlocks - 1;
if (tsdbLoadBlockData(&readh, pBlock, NULL) < 0) {
tsdbDestroyReadH(&readh);
return -1;
}
// Get the data in row
ASSERT(pTable->lastRow == NULL);
STSchema *pSchema = tsdbGetTableSchema(pTable);
pTable->lastRow = taosTMalloc(dataRowMaxBytesFromSchema(pSchema));
if (pTable->lastRow == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
tsdbDestroyReadH(&readh); tsdbDestroyReadH(&readh);
return -1; return -1;
} }
tdInitDataRow(pTable->lastRow, pSchema);
for (int icol = 0; icol < schemaNCols(pSchema); icol++) {
STColumn *pCol = schemaColAt(pSchema, icol);
SDataCol *pDataCol = readh.pDCols[0]->cols + icol;
tdAppendColVal(pTable->lastRow, tdGetColDataOfRow(pDataCol, pBlock->numOfRows - 1), pCol->type, pCol->bytes,
pCol->offset);
}
cacheLastRowTableNum -= 1; cacheLastRowTableNum -= 1;
} }
......
...@@ -274,7 +274,7 @@ void *tsdbAllocBytes(STsdbRepo *pRepo, int bytes) { ...@@ -274,7 +274,7 @@ void *tsdbAllocBytes(STsdbRepo *pRepo, int bytes) {
int tsdbAsyncCommit(STsdbRepo *pRepo) { int tsdbAsyncCommit(STsdbRepo *pRepo) {
tsem_wait(&(pRepo->readyToCommit)); tsem_wait(&(pRepo->readyToCommit));
ASSERT(pRepo->imem == NULL); //ASSERT(pRepo->imem == NULL);
if (pRepo->mem == NULL) { if (pRepo->mem == NULL) {
tsem_post(&(pRepo->readyToCommit)); tsem_post(&(pRepo->readyToCommit));
return 0; return 0;
...@@ -965,7 +965,7 @@ static void tsdbFreeRows(STsdbRepo *pRepo, void **rows, int rowCounter) { ...@@ -965,7 +965,7 @@ static void tsdbFreeRows(STsdbRepo *pRepo, void **rows, int rowCounter) {
} }
static void updateTableLatestColumn(STsdbRepo *pRepo, STable *pTable, SDataRow row) { static void updateTableLatestColumn(STsdbRepo *pRepo, STable *pTable, SDataRow row) {
tsdbInfo("vgId:%d updateTableLatestColumn, %s row version:%d", REPO_ID(pRepo), pTable->name->data, dataRowVersion(row)); tsdbDebug("vgId:%d updateTableLatestColumn, %s row version:%d", REPO_ID(pRepo), pTable->name->data, dataRowVersion(row));
STSchema* pSchema = tsdbGetTableLatestSchema(pTable); STSchema* pSchema = tsdbGetTableLatestSchema(pTable);
if (tsdbUpdateLastColSchema(pTable, pSchema) < 0) { if (tsdbUpdateLastColSchema(pTable, pSchema) < 0) {
...@@ -988,7 +988,7 @@ static void updateTableLatestColumn(STsdbRepo *pRepo, STable *pTable, SDataRow r ...@@ -988,7 +988,7 @@ static void updateTableLatestColumn(STsdbRepo *pRepo, STable *pTable, SDataRow r
} }
void* value = tdGetRowDataOfCol(row, (int8_t)pTCol->type, TD_DATA_ROW_HEAD_SIZE + pSchema->columns[j].offset); void* value = tdGetRowDataOfCol(row, (int8_t)pTCol->type, TD_DATA_ROW_HEAD_SIZE + pSchema->columns[j].offset);
if (isNullN(value, pTCol->type)) { if (isNull(value, pTCol->type)) {
continue; continue;
} }
......
...@@ -613,7 +613,7 @@ void doCleanupDataCache(SCacheObj *pCacheObj) { ...@@ -613,7 +613,7 @@ void doCleanupDataCache(SCacheObj *pCacheObj) {
// todo memory leak if there are object with refcount greater than 0 in hash table? // todo memory leak if there are object with refcount greater than 0 in hash table?
taosHashCleanup(pCacheObj->pHashTable); taosHashCleanup(pCacheObj->pHashTable);
taosTrashcanEmpty(pCacheObj, true); taosTrashcanEmpty(pCacheObj, false);
__cache_lock_destroy(pCacheObj); __cache_lock_destroy(pCacheObj);
......
...@@ -454,7 +454,11 @@ void vnodeDestroy(SVnodeObj *pVnode) { ...@@ -454,7 +454,11 @@ void vnodeDestroy(SVnodeObj *pVnode) {
} }
if (pVnode->tsdb) { if (pVnode->tsdb) {
code = tsdbCloseRepo(pVnode->tsdb, 1); // the deleted vnode does not need to commit, so as to speed up the deletion
int toCommit = 1;
if (pVnode->dropped) toCommit = 0;
code = tsdbCloseRepo(pVnode->tsdb, toCommit);
pVnode->tsdb = NULL; pVnode->tsdb = NULL;
} }
......
...@@ -126,11 +126,16 @@ void vnodeStopSyncFile(int32_t vgId, uint64_t fversion) { ...@@ -126,11 +126,16 @@ void vnodeStopSyncFile(int32_t vgId, uint64_t fversion) {
} }
void vnodeConfirmForard(int32_t vgId, void *wparam, int32_t code) { void vnodeConfirmForard(int32_t vgId, void *wparam, int32_t code) {
void *pVnode = vnodeAcquire(vgId); SVnodeObj *pVnode = vnodeAcquire(vgId);
if (pVnode == NULL) { if (pVnode == NULL) {
vError("vgId:%d, vnode not found while confirm forward", vgId); vError("vgId:%d, vnode not found while confirm forward", vgId);
} }
if (code == TSDB_CODE_SYN_CONFIRM_EXPIRED && pVnode->status == TAOS_VN_STATUS_CLOSING) {
vDebug("vgId:%d, db:%s, vnode is closing while confirm forward", vgId, pVnode->db);
code = TSDB_CODE_VND_IS_CLOSING;
}
dnodeSendRpcVWriteRsp(pVnode, wparam, code); dnodeSendRpcVWriteRsp(pVnode, wparam, code);
vnodeRelease(pVnode); vnodeRelease(pVnode);
} }
......
...@@ -37,7 +37,7 @@ pipeline { ...@@ -37,7 +37,7 @@ pipeline {
stage('Parallel test stage') { stage('Parallel test stage') {
parallel { parallel {
stage('pytest') { stage('pytest') {
agent{label '184'} agent{label 'slad1'}
steps { steps {
pre_test() pre_test()
sh ''' sh '''
...@@ -62,7 +62,7 @@ pipeline { ...@@ -62,7 +62,7 @@ pipeline {
} }
stage('test_crash_gen') { stage('test_crash_gen') {
agent{label "185"} agent{label "slad2"}
steps { steps {
pre_test() pre_test()
sh ''' sh '''
...@@ -149,7 +149,7 @@ pipeline { ...@@ -149,7 +149,7 @@ pipeline {
} }
stage('test_valgrind') { stage('test_valgrind') {
agent{label "186"} agent{label "slad3"}
steps { steps {
pre_test() pre_test()
......
def pre_test(){
sh '''
sudo rmtaos||echo 'no taosd installed'
'''
sh '''
cd ${WKC}
git reset --hard
git checkout $BRANCH_NAME
git pull
git submodule update
cd ${WK}
git reset --hard
git checkout $BRANCH_NAME
git pull
export TZ=Asia/Harbin
date
rm -rf ${WK}/debug
mkdir debug
cd debug
cmake .. > /dev/null
make > /dev/null
make install > /dev/null
pip3 install ${WKC}/src/connector/python/linux/python3/
'''
return 1
}
pipeline {
agent none
environment{
WK = '/var/lib/jenkins/workspace/TDinternal'
WKC= '/var/lib/jenkins/workspace/TDinternal/community'
}
stages {
stage('Parallel test stage') {
parallel {
stage('pytest') {
agent{label 'slam1'}
steps {
pre_test()
sh '''
cd ${WKC}/tests
find pytest -name '*'sql|xargs rm -rf
./test-all.sh pytest
date'''
}
}
stage('test_b1') {
agent{label 'slam2'}
steps {
pre_test()
sh '''
cd ${WKC}/tests
./test-all.sh b1
date'''
}
}
stage('test_crash_gen') {
agent{label "slam3"}
steps {
pre_test()
sh '''
cd ${WKC}/tests/pytest
'''
catchError(buildResult: 'SUCCESS', stageResult: 'FAILURE') {
sh '''
cd ${WKC}/tests/pytest
./crash_gen.sh -a -p -t 4 -s 2000
'''
}
catchError(buildResult: 'SUCCESS', stageResult: 'FAILURE') {
sh '''
cd ${WKC}/tests/pytest
rm -rf /var/lib/taos/*
rm -rf /var/log/taos/*
./handle_crash_gen_val_log.sh
'''
}
catchError(buildResult: 'SUCCESS', stageResult: 'FAILURE') {
sh '''
cd ${WKC}/tests/pytest
rm -rf /var/lib/taos/*
rm -rf /var/log/taos/*
./handle_taosd_val_log.sh
'''
}
sh'''
systemctl start taosd
sleep 10
'''
catchError(buildResult: 'SUCCESS', stageResult: 'FAILURE') {
sh '''
cd ${WKC}/tests/gotest
bash batchtest.sh
'''
}
catchError(buildResult: 'SUCCESS', stageResult: 'FAILURE') {
sh '''
cd ${WKC}/tests/examples/python/PYTHONConnectorChecker
python3 PythonChecker.py
'''
}
catchError(buildResult: 'SUCCESS', stageResult: 'FAILURE') {
sh '''
cd ${WKC}/tests/examples/JDBC/JDBCDemo/
mvn clean package assembly:single -DskipTests >/dev/null
java -jar target/JDBCDemo-SNAPSHOT-jar-with-dependencies.jar -host 127.0.0.1
'''
}
catchError(buildResult: 'SUCCESS', stageResult: 'FAILURE') {
sh '''
cd ${WKC}/src/connector/jdbc
mvn clean package -Dmaven.test.skip=true >/dev/null
cd ${WKC}/tests/examples/JDBC/JDBCDemo/
java --class-path=../../../../src/connector/jdbc/target:$JAVA_HOME/jre/lib/ext -jar target/JDBCDemo-SNAPSHOT-jar-with-dependencies.jar -host 127.0.0.1
'''
}
catchError(buildResult: 'SUCCESS', stageResult: 'FAILURE') {
sh '''
cp -rf ${WKC}/tests/examples/nodejs ${JENKINS_HOME}/workspace/
cd ${JENKINS_HOME}/workspace/nodejs
node nodejsChecker.js host=localhost
'''
}
catchError(buildResult: 'SUCCESS', stageResult: 'FAILURE') {
sh '''
cd ${JENKINS_HOME}/workspace/C#NET/src/CheckC#
dotnet run
'''
}
sh '''
systemctl stop taosd
cd ${WKC}/tests
./test-all.sh b2
date
'''
sh '''
cd ${WKC}/tests
./test-all.sh full unit
date'''
}
}
stage('test_valgrind') {
agent{label "slam4"}
steps {
pre_test()
catchError(buildResult: 'SUCCESS', stageResult: 'FAILURE') {
sh '''
cd ${WKC}/tests/pytest
nohup taosd >/dev/null &
sleep 10
python3 concurrent_inquiry.py -c 1
'''
}
sh '''
cd ${WKC}/tests
./test-all.sh full jdbc
date'''
sh '''
cd ${WKC}/tests/pytest
./valgrind-test.sh 2>&1 > mem-error-out.log
./handle_val_log.sh
date
cd ${WKC}/tests
./test-all.sh b3
date'''
sh '''
date
cd ${WKC}/tests
./test-all.sh full example
date'''
}
}
stage('arm64_build'){
agent{label 'arm64'}
steps{
sh '''
cd ${WK}
git fetch
git checkout develop
git pull
cd ${WKC}
git fetch
git checkout develop
git pull
git submodule update
cd ${WKC}/packaging
./release.sh -v cluster -c aarch64 -n 2.0.0.0 -m 2.0.0.0
'''
}
}
stage('arm32_build'){
agent{label 'arm32'}
steps{
catchError(buildResult: 'SUCCESS', stageResult: 'FAILURE') {
sh '''
cd ${WK}
git fetch
git checkout develop
git pull
cd ${WKC}
git fetch
git checkout develop
git pull
git submodule update
cd ${WKC}/packaging
./release.sh -v cluster -c aarch32 -n 2.0.0.0 -m 2.0.0.0
'''
}
}
}
}
}
}
post {
success {
emailext (
subject: "SUCCESSFUL: Job '${env.JOB_NAME} [${env.BUILD_NUMBER}]'",
body: '''<!DOCTYPE html>
<html>
<head>
<meta charset="UTF-8">
</head>
<body leftmargin="8" marginwidth="0" topmargin="8" marginheight="4" offset="0">
<table width="95%" cellpadding="0" cellspacing="0" style="font-size: 16pt; font-family: Tahoma, Arial, Helvetica, sans-serif">
<tr>
<td><br />
<b><font color="#0B610B"><font size="6">构建信息</font></font></b>
<hr size="2" width="100%" align="center" /></td>
</tr>
<tr>
<td>
<ul>
<div style="font-size:18px">
<li>构建名称>>分支:${PROJECT_NAME}</li>
<li>构建结果:<span style="color:green"> Successful </span></li>
<li>构建编号:${BUILD_NUMBER}</li>
<li>触发用户:${CAUSE}</li>
<li>变更概要:${CHANGES}</li>
<li>构建地址:<a href=${BUILD_URL}>${BUILD_URL}</a></li>
<li>构建日志:<a href=${BUILD_URL}console>${BUILD_URL}console</a></li>
<li>变更集:${JELLY_SCRIPT}</li>
</div>
</ul>
</td>
</tr>
</table></font>
</body>
</html>''',
to: "yqliu@taosdata.com,pxiao@taosdata.com",
from: "support@taosdata.com"
)
}
failure {
emailext (
subject: "FAILED: Job '${env.JOB_NAME} [${env.BUILD_NUMBER}]'",
body: '''<!DOCTYPE html>
<html>
<head>
<meta charset="UTF-8">
</head>
<body leftmargin="8" marginwidth="0" topmargin="8" marginheight="4" offset="0">
<table width="95%" cellpadding="0" cellspacing="0" style="font-size: 16pt; font-family: Tahoma, Arial, Helvetica, sans-serif">
<tr>
<td><br />
<b><font color="#0B610B"><font size="6">构建信息</font></font></b>
<hr size="2" width="100%" align="center" /></td>
</tr>
<tr>
<td>
<ul>
<div style="font-size:18px">
<li>构建名称>>分支:${PROJECT_NAME}</li>
<li>构建结果:<span style="color:green"> Successful </span></li>
<li>构建编号:${BUILD_NUMBER}</li>
<li>触发用户:${CAUSE}</li>
<li>变更概要:${CHANGES}</li>
<li>构建地址:<a href=${BUILD_URL}>${BUILD_URL}</a></li>
<li>构建日志:<a href=${BUILD_URL}console>${BUILD_URL}console</a></li>
<li>变更集:${JELLY_SCRIPT}</li>
</div>
</ul>
</td>
</tr>
</table></font>
</body>
</html>''',
to: "yqliu@taosdata.com,pxiao@taosdata.com",
from: "support@taosdata.com"
)
}
}
}
\ No newline at end of file
...@@ -64,18 +64,25 @@ function runQueryPerfTest { ...@@ -64,18 +64,25 @@ function runQueryPerfTest {
[ -f $PERFORMANCE_TEST_REPORT ] && rm $PERFORMANCE_TEST_REPORT [ -f $PERFORMANCE_TEST_REPORT ] && rm $PERFORMANCE_TEST_REPORT
nohup $WORK_DIR/TDengine/debug/build/bin/taosd -c /etc/taosperf/ > /dev/null 2>&1 & nohup $WORK_DIR/TDengine/debug/build/bin/taosd -c /etc/taosperf/ > /dev/null 2>&1 &
echoInfo "Wait TDengine to start" echoInfo "Wait TDengine to start"
sleep 300 sleep 60
echoInfo "Run Performance Test" echoInfo "Run Performance Test"
cd $WORK_DIR/TDengine/tests/pytest cd $WORK_DIR/TDengine/tests/pytest
python3 query/queryPerformance.py -c $LOCAL_COMMIT | tee -a $PERFORMANCE_TEST_REPORT python3 query/queryPerformance.py -c $LOCAL_COMMIT | tee -a $PERFORMANCE_TEST_REPORT
mkdir -p /var/lib/perf/
mkdir -p /var/log/perf/
rm -rf /var/lib/perf/*
rm -rf /var/log/perf/*
nohup $WORK_DIR/TDengine/debug/build/bin/taosd -c /etc/perf/ > /dev/null 2>&1 &
echoInfo "Wait TDengine to start"
sleep 10
echoInfo "Run Performance Test"
cd $WORK_DIR/TDengine/tests/pytest
python3 insert/insertFromCSVPerformance.py -c $LOCAL_COMMIT | tee -a $PERFORMANCE_TEST_REPORT python3 insert/insertFromCSVPerformance.py -c $LOCAL_COMMIT | tee -a $PERFORMANCE_TEST_REPORT
python3 tools/taosdemoPerformance.py -c $LOCAL_COMMIT | tee -a $PERFORMANCE_TEST_REPORT python3 tools/taosdemoPerformance.py -c $LOCAL_COMMIT | tee -a $PERFORMANCE_TEST_REPORT
#python3 perfbenchmark/joinPerformance.py | tee -a $PERFORMANCE_TEST_REPORT
} }
......
...@@ -22,7 +22,7 @@ from queue import Queue, Empty ...@@ -22,7 +22,7 @@ from queue import Queue, Empty
from .shared.config import Config from .shared.config import Config
from .shared.db import DbTarget, DbConn from .shared.db import DbTarget, DbConn
from .shared.misc import Logging, Helper, CrashGenError, Status, Progress, Dice from .shared.misc import Logging, Helper, CrashGenError, Status, Progress, Dice
from .shared.types import DirPath from .shared.types import DirPath, IpcStream
# from crash_gen.misc import CrashGenError, Dice, Helper, Logging, Progress, Status # from crash_gen.misc import CrashGenError, Dice, Helper, Logging, Progress, Status
# from crash_gen.db import DbConn, DbTarget # from crash_gen.db import DbConn, DbTarget
...@@ -177,13 +177,12 @@ quorum 2 ...@@ -177,13 +177,12 @@ quorum 2
return "127.0.0.1" return "127.0.0.1"
def getServiceCmdLine(self): # to start the instance def getServiceCmdLine(self): # to start the instance
cmdLine = []
if Config.getConfig().track_memory_leaks: if Config.getConfig().track_memory_leaks:
Logging.info("Invoking VALGRIND on service...") Logging.info("Invoking VALGRIND on service...")
cmdLine = ['valgrind', '--leak-check=yes'] return ['exec /usr/bin/valgrind', '--leak-check=yes', self.getExecFile(), '-c', self.getCfgDir()]
# TODO: move "exec -c" into Popen(), we can both "use shell" and NOT fork so ask to lose kill control else:
cmdLine += ["exec " + self.getExecFile(), '-c', self.getCfgDir()] # used in subproce.Popen() # TODO: move "exec -c" into Popen(), we can both "use shell" and NOT fork so ask to lose kill control
return cmdLine return ["exec " + self.getExecFile(), '-c', self.getCfgDir()] # used in subproce.Popen()
def _getDnodes(self, dbc): def _getDnodes(self, dbc):
dbc.query("show dnodes") dbc.query("show dnodes")
...@@ -281,16 +280,16 @@ class TdeSubProcess: ...@@ -281,16 +280,16 @@ class TdeSubProcess:
return '[TdeSubProc: pid = {}, status = {}]'.format( return '[TdeSubProc: pid = {}, status = {}]'.format(
self.getPid(), self.getStatus() ) self.getPid(), self.getStatus() )
def getStdOut(self) -> BinaryIO : def getIpcStdOut(self) -> IpcStream :
if self._popen.universal_newlines : # alias of text_mode if self._popen.universal_newlines : # alias of text_mode
raise CrashGenError("We need binary mode for STDOUT IPC") raise CrashGenError("We need binary mode for STDOUT IPC")
# Logging.info("Type of stdout is: {}".format(type(self._popen.stdout))) # Logging.info("Type of stdout is: {}".format(type(self._popen.stdout)))
return typing.cast(BinaryIO, self._popen.stdout) return typing.cast(IpcStream, self._popen.stdout)
def getStdErr(self) -> BinaryIO : def getIpcStdErr(self) -> IpcStream :
if self._popen.universal_newlines : # alias of text_mode if self._popen.universal_newlines : # alias of text_mode
raise CrashGenError("We need binary mode for STDERR IPC") raise CrashGenError("We need binary mode for STDERR IPC")
return typing.cast(BinaryIO, self._popen.stderr) return typing.cast(IpcStream, self._popen.stderr)
# Now it's always running, since we matched the life cycle # Now it's always running, since we matched the life cycle
# def isRunning(self): # def isRunning(self):
...@@ -301,11 +300,6 @@ class TdeSubProcess: ...@@ -301,11 +300,6 @@ class TdeSubProcess:
def _start(self, cmdLine) -> Popen : def _start(self, cmdLine) -> Popen :
ON_POSIX = 'posix' in sys.builtin_module_names ON_POSIX = 'posix' in sys.builtin_module_names
# Sanity check
# if self.subProcess: # already there
# raise RuntimeError("Corrupt process state")
# Prepare environment variables for coverage information # Prepare environment variables for coverage information
# Ref: https://stackoverflow.com/questions/2231227/python-subprocess-popen-with-a-modified-environment # Ref: https://stackoverflow.com/questions/2231227/python-subprocess-popen-with-a-modified-environment
...@@ -314,9 +308,8 @@ class TdeSubProcess: ...@@ -314,9 +308,8 @@ class TdeSubProcess:
# print(myEnv) # print(myEnv)
# print("Starting TDengine with env: ", myEnv.items()) # print("Starting TDengine with env: ", myEnv.items())
# print("Starting TDengine via Shell: {}".format(cmdLineStr)) print("Starting TDengine: {}".format(cmdLine))
# useShell = True # Needed to pass environments into it
return Popen( return Popen(
' '.join(cmdLine), # ' '.join(cmdLine) if useShell else cmdLine, ' '.join(cmdLine), # ' '.join(cmdLine) if useShell else cmdLine,
shell=True, # Always use shell, since we need to pass ENV vars shell=True, # Always use shell, since we need to pass ENV vars
...@@ -732,19 +725,19 @@ class ServiceManagerThread: ...@@ -732,19 +725,19 @@ class ServiceManagerThread:
self._ipcQueue = Queue() # type: Queue self._ipcQueue = Queue() # type: Queue
self._thread = threading.Thread( # First thread captures server OUTPUT self._thread = threading.Thread( # First thread captures server OUTPUT
target=self.svcOutputReader, target=self.svcOutputReader,
args=(subProc.getStdOut(), self._ipcQueue, logDir)) args=(subProc.getIpcStdOut(), self._ipcQueue, logDir))
self._thread.daemon = True # thread dies with the program self._thread.daemon = True # thread dies with the program
self._thread.start() self._thread.start()
time.sleep(0.01) time.sleep(0.01)
if not self._thread.is_alive(): # What happened? if not self._thread.is_alive(): # What happened?
Logging.info("Failed to started process to monitor STDOUT") Logging.info("Failed to start process to monitor STDOUT")
self.stop() self.stop()
raise CrashGenError("Failed to start thread to monitor STDOUT") raise CrashGenError("Failed to start thread to monitor STDOUT")
Logging.info("Successfully started process to monitor STDOUT") Logging.info("Successfully started process to monitor STDOUT")
self._thread2 = threading.Thread( # 2nd thread captures server ERRORs self._thread2 = threading.Thread( # 2nd thread captures server ERRORs
target=self.svcErrorReader, target=self.svcErrorReader,
args=(subProc.getStdErr(), self._ipcQueue, logDir)) args=(subProc.getIpcStdErr(), self._ipcQueue, logDir))
self._thread2.daemon = True # thread dies with the program self._thread2.daemon = True # thread dies with the program
self._thread2.start() self._thread2.start()
time.sleep(0.01) time.sleep(0.01)
...@@ -887,14 +880,19 @@ class ServiceManagerThread: ...@@ -887,14 +880,19 @@ class ServiceManagerThread:
print("\nNon-UTF8 server output: {}\n".format(bChunk.decode('cp437'))) print("\nNon-UTF8 server output: {}\n".format(bChunk.decode('cp437')))
return None return None
def _textChunkGenerator(self, streamIn: BinaryIO, logDir: str, logFile: str def _textChunkGenerator(self, streamIn: IpcStream, logDir: str, logFile: str
) -> Generator[TextChunk, None, None]: ) -> Generator[TextChunk, None, None]:
''' '''
Take an input stream with binary data, produced a generator of decoded Take an input stream with binary data (likely from Popen), produced a generator of decoded
"text chunks", and also save the original binary data in a log file. "text chunks".
Side effect: it also save the original binary data in a log file.
''' '''
os.makedirs(logDir, exist_ok=True) os.makedirs(logDir, exist_ok=True)
logF = open(os.path.join(logDir, logFile), 'wb') logF = open(os.path.join(logDir, logFile), 'wb')
if logF is None:
Logging.error("Failed to open log file (binary write): {}/{}".format(logDir, logFile))
return
for bChunk in iter(streamIn.readline, b''): for bChunk in iter(streamIn.readline, b''):
logF.write(bChunk) # Write to log file immediately logF.write(bChunk) # Write to log file immediately
tChunk = self._decodeBinaryChunk(bChunk) # decode tChunk = self._decodeBinaryChunk(bChunk) # decode
...@@ -902,14 +900,14 @@ class ServiceManagerThread: ...@@ -902,14 +900,14 @@ class ServiceManagerThread:
yield tChunk # TODO: split into actual text lines yield tChunk # TODO: split into actual text lines
# At the end... # At the end...
streamIn.close() # Close the stream streamIn.close() # Close the incoming stream
logF.close() # Close the output file logF.close() # Close the log file
def svcOutputReader(self, stdOut: BinaryIO, queue, logDir: str): def svcOutputReader(self, ipcStdOut: IpcStream, queue, logDir: str):
''' '''
The infinite routine that processes the STDOUT stream for the sub process being managed. The infinite routine that processes the STDOUT stream for the sub process being managed.
:param stdOut: the IO stream object used to fetch the data from :param ipcStdOut: the IO stream object used to fetch the data from
:param queue: the queue where we dump the roughly parsed chunk-by-chunk text data :param queue: the queue where we dump the roughly parsed chunk-by-chunk text data
:param logDir: where we should dump a verbatim output file :param logDir: where we should dump a verbatim output file
''' '''
...@@ -917,7 +915,7 @@ class ServiceManagerThread: ...@@ -917,7 +915,7 @@ class ServiceManagerThread:
# Important Reference: https://stackoverflow.com/questions/375427/non-blocking-read-on-a-subprocess-pipe-in-python # Important Reference: https://stackoverflow.com/questions/375427/non-blocking-read-on-a-subprocess-pipe-in-python
# print("This is the svcOutput Reader...") # print("This is the svcOutput Reader...")
# stdOut.readline() # Skip the first output? TODO: remove? # stdOut.readline() # Skip the first output? TODO: remove?
for tChunk in self._textChunkGenerator(stdOut, logDir, 'stdout.log') : for tChunk in self._textChunkGenerator(ipcStdOut, logDir, 'stdout.log') :
queue.put(tChunk) # tChunk garanteed not to be None queue.put(tChunk) # tChunk garanteed not to be None
self._printProgress("_i") self._printProgress("_i")
...@@ -940,12 +938,12 @@ class ServiceManagerThread: ...@@ -940,12 +938,12 @@ class ServiceManagerThread:
Logging.info("EOF found TDengine STDOUT, marking the process as terminated") Logging.info("EOF found TDengine STDOUT, marking the process as terminated")
self.setStatus(Status.STATUS_STOPPED) self.setStatus(Status.STATUS_STOPPED)
def svcErrorReader(self, stdErr: BinaryIO, queue, logDir: str): def svcErrorReader(self, ipcStdErr: IpcStream, queue, logDir: str):
# os.makedirs(logDir, exist_ok=True) # os.makedirs(logDir, exist_ok=True)
# logFile = os.path.join(logDir,'stderr.log') # logFile = os.path.join(logDir,'stderr.log')
# fErr = open(logFile, 'wb') # fErr = open(logFile, 'wb')
# for line in iter(err.readline, b''): # for line in iter(err.readline, b''):
for tChunk in self._textChunkGenerator(stdErr, logDir, 'stderr.log') : for tChunk in self._textChunkGenerator(ipcStdErr, logDir, 'stderr.log') :
queue.put(tChunk) # tChunk garanteed not to be None queue.put(tChunk) # tChunk garanteed not to be None
# fErr.write(line) # fErr.write(line)
Logging.info("TDengine STDERR: {}".format(tChunk)) Logging.info("TDengine STDERR: {}".format(tChunk))
......
from typing import Any, List, Dict, NewType from typing import Any, BinaryIO, List, Dict, NewType
from enum import Enum from enum import Enum
DirPath = NewType('DirPath', str) DirPath = NewType('DirPath', str)
...@@ -26,3 +26,5 @@ class TdDataType(Enum): ...@@ -26,3 +26,5 @@ class TdDataType(Enum):
TdColumns = Dict[str, TdDataType] TdColumns = Dict[str, TdDataType]
TdTags = Dict[str, TdDataType] TdTags = Dict[str, TdDataType]
IpcStream = NewType('IpcStream', BinaryIO)
\ No newline at end of file
...@@ -183,7 +183,7 @@ python3 ./test.py -f stable/query_after_reset.py ...@@ -183,7 +183,7 @@ python3 ./test.py -f stable/query_after_reset.py
# perfbenchmark # perfbenchmark
python3 ./test.py -f perfbenchmark/bug3433.py python3 ./test.py -f perfbenchmark/bug3433.py
#python3 ./test.py -f perfbenchmark/bug3589.py #python3 ./test.py -f perfbenchmark/bug3589.py
python3 ./test.py -f perfbenchmark/taosdemoInsert.py
#query #query
python3 ./test.py -f query/filter.py python3 ./test.py -f query/filter.py
......
...@@ -31,7 +31,7 @@ class insertFromCSVPerformace: ...@@ -31,7 +31,7 @@ class insertFromCSVPerformace:
self.host = "127.0.0.1" self.host = "127.0.0.1"
self.user = "root" self.user = "root"
self.password = "taosdata" self.password = "taosdata"
self.config = "/etc/taosperf" self.config = "/etc/perf"
self.conn = taos.connect( self.conn = taos.connect(
self.host, self.host,
self.user, self.user,
......
###################################################################
# Copyright (c) 2016 by TAOS Technologies, Inc.
# All rights reserved.
#
# This file is proprietary and confidential to TAOS Technologies.
# No part of this file may be reproduced, stored, transmitted,
# disclosed or used in any form or by any means other than as
# expressly provided by the written permission from Jianhui Tao
#
###################################################################
# -*- coding: utf-8 -*-
import taos
import sys
import os
import json
import argparse
import subprocess
import datetime
import re
from multiprocessing import cpu_count
from util.log import *
from util.sql import *
from util.cases import *
from util.dnodes import *
from util.dnodes import TDDnode
class Taosdemo:
def __init__(self, clearCache, dbName, keep):
self.clearCache = clearCache
self.dbname = dbName
self.drop = "yes"
self.keep = keep
self.host = "127.0.0.1"
self.user = "root"
self.password = "taosdata"
# self.config = "/etc/taosperf"
# self.conn = taos.connect(
# self.host,
# self.user,
# self.password,
# self.config)
# env config
def getBuildPath(self) -> str:
selfPath = os.path.dirname(os.path.realpath(__file__))
if ("community" in selfPath):
projPath = selfPath[:selfPath.find("community")]
else:
projPath = selfPath[:selfPath.find("tests")]
for root, dirs, files in os.walk(projPath):
if ("taosd" in files):
rootRealPath = os.path.dirname(os.path.realpath(root))
if ("packaging" not in rootRealPath):
buildPath = root[:len(root) - len("/debug/build/bin")]
break
return buildPath
def getExeToolsDir(self) -> str:
self.debugdir = self.getBuildPath() + "/debug/build/bin"
return self.debugdir
def getCfgDir(self) -> str:
self.config = self.getBuildPath() + "/sim/dnode1/cfg"
return self.config
# taodemo insert file config
def dbinfocfg(self) -> dict:
return {
"name": self.dbname,
"drop": self.drop,
"replica": 1,
"days": 10,
"cache": 16,
"blocks": 8,
"precision": "ms",
"keep": self.keep,
"minRows": 100,
"maxRows": 4096,
"comp": 2,
"walLevel": 1,
"cachelast": 0,
"quorum": 1,
"fsync": 3000,
"update": 0
}
def type_check(func):
def wrapper(self, **kwargs):
num_types = ["int", "float", "bigint", "tinyint", "smallint", "double"]
str_types = ["binary", "nchar"]
for k ,v in kwargs.items():
if k.lower() not in num_types and k.lower() not in str_types:
return f"args {k} type error, not allowed"
elif not isinstance(v, (int, list, tuple)):
return f"value {v} type error, not allowed"
elif k.lower() in num_types and not isinstance(v, int):
return f"arg {v} takes 1 positional argument must be type int "
elif isinstance(v, (list,tuple)) and len(v) > 2:
return f"arg {v} takes from 1 to 2 positional arguments but more than 2 were given "
elif isinstance(v,(list,tuple)) and [ False for _ in v if not isinstance(_, int) ]:
return f"arg {v} takes from 1 to 2 positional arguments must be type int "
else:
pass
return func(self, **kwargs)
return wrapper
@type_check
def column_tag_count(self, **column_tag) -> list :
init_column_tag = []
for k, v in column_tag.items():
if re.search(k, "int, float, bigint, tinyint, smallint, double", re.IGNORECASE):
init_column_tag.append({"type": k, "count": v})
elif re.search(k, "binary, nchar", re.IGNORECASE):
if isinstance(v, int):
init_column_tag.append({"type": k, "count": v, "len":8})
elif len(v) == 1:
init_column_tag.append({"type": k, "count": v[0], "len": 8})
else:
init_column_tag.append({"type": k, "count": v[0], "len": v[1]})
return init_column_tag
def stbcfg(self, stb: str, child_tab_count: int, rows: int, prechildtab: str, columns: dict, tags: dict) -> dict:
return {
"name": stb,
"child_table_exists": "no",
"childtable_count": child_tab_count,
"childtable_prefix": prechildtab,
"auto_create_table": "no",
"batch_create_tbl_num": 10,
"data_source": "rand",
"insert_mode": "taosc",
"insert_rows": rows,
"childtable_limit": 0,
"childtable_offset": 0,
"rows_per_tbl": 1,
"max_sql_len": 65480,
"disorder_ratio": 0,
"disorder_range": 1000,
"timestamp_step": 10,
"start_timestamp": f"{datetime.datetime.now():%F %X}",
"sample_format": "csv",
"sample_file": "./sample.csv",
"tags_file": "",
"columns": self.column_tag_count(**columns),
"tags": self.column_tag_count(**tags)
}
def schemecfg(self,intcount=1,floatcount=0,bcount=0,tcount=0,scount=0,doublecount=0,binarycount=0,ncharcount=0):
return {
"INT": intcount,
"FLOAT": floatcount,
"BIGINT": bcount,
"TINYINT": tcount,
"SMALLINT": scount,
"DOUBLE": doublecount,
"BINARY": binarycount,
"NCHAR": ncharcount
}
def insertcfg(self,db: dict, stbs: list) -> dict:
return {
"filetype": "insert",
"cfgdir": self.config,
"host": self.host,
"port": 6030,
"user": self.user,
"password": self.password,
"thread_count": cpu_count(),
"thread_count_create_tbl": cpu_count(),
"result_file": "/tmp/insert_res.txt",
"confirm_parameter_prompt": "no",
"insert_interval": 0,
"num_of_records_per_req": 100,
"max_sql_len": 1024000,
"databases": [{
"dbinfo": db,
"super_tables": stbs
}]
}
def createinsertfile(self,db: dict, stbs: list) -> str:
date = datetime.datetime.now()
file_create_table = f"/tmp/insert_{date:%F-%H%M}.json"
with open(file_create_table, 'w') as f:
json.dump(self.insertcfg(db, stbs), f)
return file_create_table
# taosdemo query file config
def querysqls(self, sql: str) -> list:
return [{"sql":sql,"result":""}]
def querycfg(self, sql: str) -> dict:
return {
"filetype": "query",
"cfgdir": self.config,
"host": self.host,
"port": 6030,
"user": self.user,
"password": self.password,
"confirm_parameter_prompt": "yes",
"query_times": 10,
"query_mode": "taosc",
"databases": self.dbname,
"specified_table_query": {
"query_interval": 0,
"concurrent": cpu_count(),
"sqls": self.querysqls(sql)
}
}
def createqueryfile(self, sql: str):
date = datetime.datetime.now()
file_query_table = f"/tmp/query_{date:%F-%H%M}.json"
with open(file_query_table,"w") as f:
json.dump(self.querycfg(sql), f)
return file_query_table
# Execute taosdemo, and delete temporary files when finished
def taosdemotable(self, filepath: str, resultfile="/dev/null"):
taosdemopath = self.getBuildPath() + "/debug/build/bin"
with open(filepath,"r") as f:
filetype = json.load(f)["filetype"]
if filetype == "insert":
taosdemo_table_cmd = f"{taosdemopath}/taosdemo -f {filepath} > {resultfile} 2>&1"
else:
taosdemo_table_cmd = f"yes | {taosdemopath}/taosdemo -f {filepath} > {resultfile} 2>&1"
try:
_ = subprocess.check_output(taosdemo_table_cmd, shell=True).decode("utf-8")
except subprocess.CalledProcessError as e:
_ = e.output
def droptmpfile(self, filepath: str):
drop_file_cmd = f"[ -f {filepath} ] && rm -f {filepath}"
try:
_ = subprocess.check_output(drop_file_cmd, shell=True).decode("utf-8")
except subprocess.CalledProcessError as e:
_ = e.output
# TODO:需要完成TD-4153的数据插入和客户端请求的性能查询。
def td4153insert(self):
tdLog.printNoPrefix("========== start to create table and insert data ==========")
self.dbname = "td4153"
db = self.dbinfocfg()
stblist = []
columntype = self.schemecfg(intcount=1, ncharcount=100)
tagtype = self.schemecfg(intcount=1)
stbname = "stb1"
prechild = "t1"
stable = self.stbcfg(
stb=stbname,
prechildtab=prechild,
child_tab_count=2,
rows=10000,
columns=columntype,
tags=tagtype
)
stblist.append(stable)
insertfile = self.createinsertfile(db=db, stbs=stblist)
nmon_file = f"/tmp/insert_{datetime.datetime.now():%F-%H%M}.nmon"
cmd = f"nmon -s5 -F {nmon_file} -m /tmp/"
try:
_ = subprocess.check_output(cmd, shell=True).decode("utf-8")
except subprocess.CalledProcessError as e:
_ = e.output
self.taosdemotable(insertfile)
self.droptmpfile(insertfile)
self.droptmpfile("/tmp/insert_res.txt")
# In order to prevent too many performance files from being generated, the nmon file is deleted.
# and the delete statement can be cancelled during the actual test.
self.droptmpfile(nmon_file)
cmd = f"ps -ef|grep -w nmon| grep -v grep | awk '{{print $2}}'"
try:
time.sleep(10)
_ = subprocess.check_output(cmd,shell=True).decode("utf-8")
except BaseException as e:
raise e
def td4153query(self):
tdLog.printNoPrefix("========== start to query operation ==========")
sqls = {
"select_all": "select * from stb1",
"select_join": "select * from t10, t11 where t10.ts=t11.ts"
}
for type, sql in sqls.items():
result_file = f"/tmp/queryResult_{type}.log"
query_file = self.createqueryfile(sql)
try:
self.taosdemotable(query_file, resultfile=result_file)
except subprocess.CalledProcessError as e:
out_put = e.output
if result_file:
print(f"execute rows {type.split('_')[1]} sql, the sql is: {sql}")
max_sql_time_cmd = f'''
grep -o Spent.*s {result_file} |awk 'NR==1{{max=$2;next}}{{max=max>$2?max:$2}}END{{print "Max=",max,"s"}}'
'''
max_sql_time = subprocess.check_output(max_sql_time_cmd, shell=True).decode("UTF-8")
print(f"{type.split('_')[1]} rows sql time : {max_sql_time}")
min_sql_time_cmd = f'''
grep -o Spent.*s {result_file} |awk 'NR==1{{min=$2;next}}{{min=min<$2?min:$2}}END{{print "Min=",min,"s"}}'
'''
min_sql_time = subprocess.check_output(min_sql_time_cmd, shell=True).decode("UTF-8")
print(f"{type.split('_')[1]} rows sql time : {min_sql_time}")
avg_sql_time_cmd = f'''
grep -o Spent.*s {result_file} |awk '{{sum+=$2}}END{{print "Average=",sum/NR,"s"}}'
'''
avg_sql_time = subprocess.check_output(avg_sql_time_cmd, shell=True).decode("UTF-8")
print(f"{type.split('_')[1]} rows sql time : {avg_sql_time}")
self.droptmpfile(query_file)
self.droptmpfile(result_file)
drop_query_tmt_file_cmd = " find ./ -name 'querySystemInfo-*' -type f -exec rm {} \; "
try:
_ = subprocess.check_output(drop_query_tmt_file_cmd, shell=True).decode("utf-8")
except subprocess.CalledProcessError as e:
_ = e.output
pass
def td4153(self):
self.td4153insert()
self.td4153query()
if __name__ == '__main__':
parser = argparse.ArgumentParser()
parser.add_argument(
'-r',
'--remove-cache',
action='store_true',
default=False,
help='clear cache before query (default: False)')
parser.add_argument(
'-d',
'--database-name',
action='store',
default='db',
type=str,
help='Database name to be created (default: db)')
parser.add_argument(
'-k',
'--keep-time',
action='store',
default=3650,
type=int,
help='Database keep parameters (default: 3650)')
args = parser.parse_args()
taosdemo = Taosdemo(args.remove_cache, args.database_name, args.keep_time)
# taosdemo.conn = taos.connect(
# taosdemo.host,
# taosdemo.user,
# taosdemo.password,
# taosdemo.config
# )
debugdir = taosdemo.getExeToolsDir()
cfgdir = taosdemo.getCfgDir()
cmd = f"{debugdir}/taosd -c {cfgdir} >/dev/null 2>&1 &"
try:
_ = subprocess.check_output(cmd, shell=True).decode("utf-8")
except subprocess.CalledProcessError as e:
_ = e.output
if taosdemo.clearCache:
# must be root permission
subprocess.check_output("echo 3 > /proc/sys/vm/drop_caches", shell=True).decode("utf-8")
taosdemo.td4153()
...@@ -24,7 +24,7 @@ class taosdemoPerformace: ...@@ -24,7 +24,7 @@ class taosdemoPerformace:
self.host = "127.0.0.1" self.host = "127.0.0.1"
self.user = "root" self.user = "root"
self.password = "taosdata" self.password = "taosdata"
self.config = "/etc/taosperf" self.config = "/etc/perf"
self.conn = taos.connect( self.conn = taos.connect(
self.host, self.host,
self.user, self.user,
...@@ -77,7 +77,7 @@ class taosdemoPerformace: ...@@ -77,7 +77,7 @@ class taosdemoPerformace:
insert_data = { insert_data = {
"filetype": "insert", "filetype": "insert",
"cfgdir": "/etc/taosperf", "cfgdir": "/etc/perf",
"host": "127.0.0.1", "host": "127.0.0.1",
"port": 6030, "port": 6030,
"user": "root", "user": "root",
......
...@@ -887,10 +887,16 @@ sql_error select tbname, t1 from select_tags_mt0 interval(1y); ...@@ -887,10 +887,16 @@ sql_error select tbname, t1 from select_tags_mt0 interval(1y);
#valid sql: select first(c1), last(c2), count(*) from select_tags_mt0 group by tbname, t1; #valid sql: select first(c1), last(c2), count(*) from select_tags_mt0 group by tbname, t1;
#valid sql: select first(c1), tbname, t1 from select_tags_mt0 group by t2; #valid sql: select first(c1), tbname, t1 from select_tags_mt0 group by t2;
print ==================================>TD-4231
sql_error select t1,tbname from select_tags_mt0 where c1<0
sql_error select t1,tbname from select_tags_mt0 where c1<0 and tbname in ('select_tags_tb12')
sql select tbname from select_tags_mt0 where tbname in ('select_tags_tb12');
sql_error select first(c1), last(c2), t1 from select_tags_mt0 group by tbname; sql_error select first(c1), last(c2), t1 from select_tags_mt0 group by tbname;
sql_error select first(c1), last(c2), tbname, t2 from select_tags_mt0 group by tbname; sql_error select first(c1), last(c2), tbname, t2 from select_tags_mt0 group by tbname;
sql_error select first(c1), count(*), t2, t1, tbname from select_tags_mt0 group by tbname; sql_error select first(c1), count(*), t2, t1, tbname from select_tags_mt0 group by tbname;
# this sql is valid: select first(c1), t2 from select_tags_mt0 group by tbname; #valid sql: select first(c1), t2 from select_tags_mt0 group by tbname;
#sql select first(ts), tbname from select_tags_mt0 group by tbname; #sql select first(ts), tbname from select_tags_mt0 group by tbname;
#sql select count(c1) from select_tags_mt0 where c1=99 group by tbname; #sql select count(c1) from select_tags_mt0 where c1=99 group by tbname;
......
...@@ -158,7 +158,7 @@ if $dnode4Vtatus != offline then ...@@ -158,7 +158,7 @@ if $dnode4Vtatus != offline then
sleep 2000 sleep 2000
goto wait_dnode4_vgroup_offline goto wait_dnode4_vgroup_offline
endi endi
if $dnode3Vtatus != master then if $dnode3Vtatus != unsynced then
sleep 2000 sleep 2000
goto wait_dnode4_vgroup_offline goto wait_dnode4_vgroup_offline
endi endi
......
...@@ -41,7 +41,7 @@ sql create dnode $hostname2 ...@@ -41,7 +41,7 @@ sql create dnode $hostname2
sleep 10000 sleep 10000
sql show log.tables; sql show log.tables;
if $rows != 5 then if $rows > 6 then
return -1 return -1
endi endi
......
...@@ -56,7 +56,7 @@ print $data30 ...@@ -56,7 +56,7 @@ print $data30
print $data40 print $data40
print $data50 print $data50
if $rows != 5 then if $rows > 6 then
return -1 return -1
endi endi
......
...@@ -19,7 +19,7 @@ sleep 3000 ...@@ -19,7 +19,7 @@ sleep 3000
sql show dnodes sql show dnodes
print dnode1 openVnodes $data2_1 print dnode1 openVnodes $data2_1
if $data2_1 != 1 then if $data2_1 > 2 then
return -1 return -1
endi endi
...@@ -41,7 +41,7 @@ print dnode2 openVnodes $data2_2 ...@@ -41,7 +41,7 @@ print dnode2 openVnodes $data2_2
if $data2_1 != 0 then if $data2_1 != 0 then
goto show2 goto show2
endi endi
if $data2_2 != 1 then if $data2_2 > 2 then
goto show2 goto show2
endi endi
...@@ -55,7 +55,7 @@ print $data30 ...@@ -55,7 +55,7 @@ print $data30
print $data40 print $data40
print $data50 print $data50
if $rows != 4 then if $rows > 5 then
return -1 return -1
endi endi
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册