提交 338ef1b8 编写于 作者: Y yihaoDeng

Merge branch 'develop' of github.com:taosdata/TDengine into order

......@@ -23,7 +23,7 @@ TDengine是涛思数据专为物联网、车联网、工业互联网、IT运维
TDengine是一个高效的存储、查询、分析时序大数据的平台,专为物联网、车联网、工业互联网、运维监测等优化而设计。您可以像使用关系型数据库MySQL一样来使用它,但建议您在使用前仔细阅读一遍下面的文档,特别是 [数据模型](https://www.taosdata.com/cn/documentation/architecture)[数据建模](https://www.taosdata.com/cn/documentation/model)。除本文档之外,欢迎 [下载产品白皮书](https://www.taosdata.com/downloads/TDengine%20White%20Paper.pdf)
# 生成
# 构建
TDengine目前2.0版服务器仅能在Linux系统上安装和运行,后续会支持Windows、macOS等系统。客户端可以在Windows或Linux上安装和运行。任何OS的应用也可以选择RESTful接口连接服务器taosd。CPU支持X64/ARM64/MIPS64/Alpha64,后续会支持ARM32、RISC-V等CPU架构。用户可根据需求选择通过[源码](https://www.taosdata.com/cn/getting-started/#通过源码安装)或者[安装包](https://www.taosdata.com/cn/getting-started/#通过安装包安装)来安装。本快速指南仅适用于通过源码安装。
......@@ -107,7 +107,7 @@ Go 连接器和 Grafana 插件在其他独立仓库,如果安装它们的话
git submodule update --init --recursive
```
## 生成 TDengine
## 构建 TDengine
### Linux 系统
......@@ -116,6 +116,12 @@ mkdir debug && cd debug
cmake .. && cmake --build .
```
您可以选择使用 Jemalloc 作为内存分配器,替代默认的 glibc:
```bash
apt install autoconf
cmake .. -DJEMALLOC_ENABLED=true
```
在X86-64、X86、arm64、arm32 和 mips64 平台上,TDengine 生成脚本可以自动检测机器架构。也可以手动配置 CPUTYPE 参数来指定 CPU 类型,如 aarch64 或 aarch32 等。
aarch64:
......
......@@ -110,6 +110,12 @@ mkdir debug && cd debug
cmake .. && cmake --build .
```
You can use Jemalloc as memory allocator instead of glibc:
```
apt install autoconf
cmake .. -DJEMALLOC_ENABLED=true
```
TDengine build script can detect the host machine's architecture on X86-64, X86, arm64, arm32 and mips64 platform.
You can also specify CPUTYPE option like aarch64 or aarch32 too if the detection result is not correct:
......
......@@ -39,7 +39,7 @@ SET(CMAKE_C_FLAGS_DEBUG "${CMAKE_C_FLAGS_DEBUG} ${COMMON_C_FLAGS} ${DEBUG_FLAGS}
SET(CMAKE_C_FLAGS_RELEASE "${CMAKE_C_FLAGS_RELEASE} ${COMMON_C_FLAGS} ${RELEASE_FLAGS}")
# Set c++ compiler options
SET(COMMON_CXX_FLAGS "${COMMON_FLAGS} -std=c++11")
SET(COMMON_CXX_FLAGS "${COMMON_FLAGS} -std=c++11 -Wno-unused-function")
SET(CMAKE_CXX_FLAGS_DEBUG "${CMAKE_CXX_FLAGS_DEBUG} ${COMMON_CXX_FLAGS} ${DEBUG_FLAGS}")
SET(CMAKE_CXX_FLAGS_RELEASE "${CMAKE_CXX_FLAGS_RELEASE} ${COMMON_CXX_FLAGS} ${RELEASE_FLAGS}")
......
......@@ -42,7 +42,7 @@ TDengine是一个高效的存储、查询、分析时序大数据的平台,专
* [数据写入](/taos-sql#insert):支持单表单条、多条、多表多条写入,支持历史数据写入
* [数据查询](/taos-sql#select):支持时间段、值过滤、排序、查询结果手动分页等
* [SQL函数](/taos-sql#functions):支持各种聚合函数、选择函数、计算函数,如avg, min, diff等
* [时间维度聚合](/taos-sql#aggregation):将表中数据按照时间段进行切割后聚合,降维处理
* [窗口切分聚合](/taos-sql#aggregation):将表中数据按照时间段等方式进行切割后聚合,降维处理
* [边界限制](/taos-sql#limitation):库、表、SQL等边界限制条件
* [错误码](/taos-sql/error-code):TDengine 2.0 错误码以及对应的十进制码
......@@ -63,7 +63,7 @@ TDengine是一个高效的存储、查询、分析时序大数据的平台,专
## [高级功能](/advanced-features)
* [连续查询(Continuous Query)](/advanced-features#continuous-query):基于滑动窗口,定时自动的对数据流进行查询计算
* [数据订阅(Publisher/Subscriber)](/advanced-features#subscribe)典型的消息队列,应用可订阅接收到的最新数据
* [数据订阅(Publisher/Subscriber)](/advanced-features#subscribe)类似典型的消息队列,应用可订阅接收到的最新数据
* [缓存(Cache)](/advanced-features#cache):每个设备最新的数据都会缓存在内存中,可快速获取
* [报警监测](/advanced-features#alert):根据配置规则,自动监测超限行为数据,并主动推送
......
......@@ -532,8 +532,9 @@ Query OK, 1 row(s) in set (0.000141s)
| taos-jdbcdriver 版本 | TDengine 版本 | JDK 版本 |
| -------------------- | ----------------- | -------- |
| 2.0.22 | 2.0.18.0 及以上 | 1.8.x |
| 2.0.12 - 2.0.21 | 2.0.8.0 - 2.0.17.0 | 1.8.x |
| 2.0.31 | 2.1.3.0 及以上 | 1.8.x |
| 2.0.22 - 20.0.30 | 2.0.18.0 - 2.1.2.x | 1.8.x |
| 2.0.12 - 2.0.21 | 2.0.8.0 - 2.0.17.x | 1.8.x |
| 2.0.4 - 2.0.11 | 2.0.0.0 - 2.0.7.x | 1.8.x |
| 1.0.3 | 1.6.1.x 及以上 | 1.8.x |
| 1.0.2 | 1.6.1.x 及以上 | 1.8.x |
......
......@@ -427,12 +427,15 @@ TDengine提供时间驱动的实时流式计算API。可以每隔一指定的时
* res:查询结果集,注意结果集中可能没有记录
* param:调用 `taos_subscribe`时客户程序提供的附加参数
* code:错误码
**注意**:在这个回调函数里不可以做耗时过长的处理,尤其是对于返回的结果集中数据较多的情况,否则有可能导致客户端阻塞等异常状态。如果必须进行复杂计算,则建议在另外的线程中进行处理。
* `TAOS_RES *taos_consume(TAOS_SUB *tsub)`
同步模式下,该函数用来获取订阅的结果。 用户应用程序将其置于一个循环之中。 如两次调用`taos_consume`的间隔小于订阅的轮询周期,API将会阻塞,直到时间间隔超过此周期。 如果数据库有新记录到达,该API将返回该最新的记录,否则返回一个没有记录的空结果集。 如果返回值为 `NULL`,说明系统出错。 异步模式下,用户程序不应调用此API。
**注意**:在调用 `taos_consume()` 之后,用户应用应确保尽快调用 `taos_fetch_row()``taos_fetch_block()` 来处理订阅结果,否则服务端会持续缓存查询结果数据等待客户端读取,极端情况下会导致服务端内存消耗殆尽,影响服务稳定性。
* `void taos_unsubscribe(TAOS_SUB *tsub, int keepProgress)`
取消订阅。 如参数 `keepProgress` 不为0,API会保留订阅的进度信息,后续调用 `taos_subscribe` 时可以基于此进度继续;否则将删除进度信息,后续只能重新开始读取数据。
......
......@@ -476,9 +476,10 @@ Query OK, 1 row(s) in set (0.001091s)
SELECT select_expr [, select_expr ...]
FROM {tb_name_list}
[WHERE where_condition]
[INTERVAL (interval_val [, interval_offset])]
[SLIDING sliding_val]
[FILL fill_val]
[SESSION(ts_col, tol_val)]
[STATE_WINDOW(col)]
[INTERVAL(interval_val [, interval_offset]) [SLIDING sliding_val]]
[FILL(fill_mod_and_val)]
[GROUP BY col_list]
[ORDER BY col_list { DESC | ASC }]
[SLIMIT limit_val [SOFFSET offset_val]]
......@@ -853,7 +854,23 @@ TDengine支持针对数据的聚合查询。提供支持的聚合和选择函数
应用字段:不能应用在timestamp、binary、nchar、bool类型字段。
适用于:**表**。
适用于:**表、(超级表)**。
说明:从 2.1.3.0 版本开始,TWA 函数可以在由 GROUP BY 划分出单独时间线的情况下用于超级表(也即 GROUP BY tbname)。
- **IRATE**
```mysql
SELECT IRATE(field_name) FROM tb_name WHERE clause;
```
功能说明:计算瞬时增长率。使用时间区间中最后两个样本数据来计算瞬时增长速率;如果这两个值呈递减关系,那么只取最后一个数用于计算,而不是使用二者差值。
返回结果数据类型:双精度浮点数Double。
应用字段:不能应用在timestamp、binary、nchar、bool类型字段。
适用于:**表、(超级表)**。
说明:(从 2.1.3.0 版本开始新增此函数)IRATE 可以在由 GROUP BY 划分出单独时间线的情况下用于超级表(也即 GROUP BY tbname)。
- **SUM**
```mysql
......@@ -1202,13 +1219,14 @@ TDengine支持针对数据的聚合查询。提供支持的聚合和选择函数
```
### 计算函数
- **DIFF**
```mysql
SELECT DIFF(field_name) FROM tb_name [WHERE clause];
```
功能说明:统计表中某列的值与前一行对应值的差。
返回结果数据类型: 同应用字段。
返回结果数据类型:同应用字段。
应用字段:不能应用在timestamp、binary、nchar、bool类型字段。
......@@ -1226,13 +1244,27 @@ TDengine支持针对数据的聚合查询。提供支持的聚合和选择函数
Query OK, 2 row(s) in set (0.001162s)
```
- **DERIVATIVE**
```mysql
SELECT DERIVATIVE(field_name, time_interval, ignore_negative) FROM tb_name [WHERE clause];
```
功能说明:统计表中某列数值的单位变化率。其中单位时间区间的长度可以通过 time_interval 参数指定,最小可以是 1 秒(1s);ignore_negative 参数的值可以是 0 或 1,为 1 时表示忽略负值。
返回结果数据类型:双精度浮点数。
应用字段:不能应用在 timestamp、binary、nchar、bool 类型字段。
适用于:**表、(超级表)**。
说明:(从 2.1.3.0 版本开始新增此函数)输出结果行数是范围内总行数减一,第一行没有结果输出。DERIVATIVE 函数可以在由 GROUP BY 划分出单独时间线的情况下用于超级表(也即 GROUP BY tbname)。
- **SPREAD**
```mysql
SELECT SPREAD(field_name) FROM { tb_name | stb_name } [WHERE clause];
```
功能说明:统计表/超级表中某列的最大值和最小值之差。
返回结果数据类型: 双精度浮点数。
返回结果数据类型:双精度浮点数。
应用字段:不能应用在binary、nchar、bool类型字段。
......@@ -1284,39 +1316,45 @@ TDengine支持针对数据的聚合查询。提供支持的聚合和选择函数
Query OK, 3 row(s) in set (0.001046s)
```
## <a class="anchor" id="aggregation"></a>时间维度聚合
## <a class="anchor" id="aggregation"></a>按窗口切分聚合
TDengine支持按时间段进行聚合,可以将表中数据按照时间段进行切割后聚合生成结果,比如温度传感器每秒采集一次数据,但需查询每隔10分钟的温度平均值。这个聚合适合于降维(down sample)操作, 语法如下:
TDengine 支持按时间段等窗口切分方式进行聚合结果查询,比如温度传感器每秒采集一次数据,但需查询每隔 10 分钟的温度平均值。这类聚合适合于降维(down sample)操作,语法如下:
```mysql
SELECT function_list FROM tb_name
[WHERE where_condition]
INTERVAL (interval [, offset])
[SLIDING sliding]
[FILL ({NONE | VALUE | PREV | NULL | LINEAR | NEXT})]
[SESSION(ts_col, tol_val)]
[STATE_WINDOW(col)]
[INTERVAL(interval [, offset]) [SLIDING sliding]]
[FILL({NONE | VALUE | PREV | NULL | LINEAR | NEXT})]
SELECT function_list FROM stb_name
[WHERE where_condition]
INTERVAL (interval [, offset])
[SLIDING sliding]
[FILL ({ VALUE | PREV | NULL | LINEAR | NEXT})]
[SESSION(ts_col, tol_val)]
[STATE_WINDOW(col)]
[INTERVAL(interval [, offset]) [SLIDING sliding]]
[FILL({NONE | VALUE | PREV | NULL | LINEAR | NEXT})]
[GROUP BY tags]
```
- 聚合时间段的长度由关键词INTERVAL指定,最短时间间隔10毫秒(10a),并且支持偏移(偏移必须小于间隔)。聚合查询中,能够同时执行的聚合和选择函数仅限于单个输出的函数:count、avg、sum 、stddev、leastsquares、percentile、min、max、first、last,不能使用具有多行输出结果的函数(例如:top、bottom、diff以及四则运算)。
- WHERE语句可以指定查询的起止时间和其他过滤条件。
- SLIDING语句用于指定聚合时间段的前向增量。
- FILL语句指定某一时间区间数据缺失的情况下的填充模式。填充模式包括以下几种:
1. 不进行填充:NONE(默认填充模式)。
2. VALUE填充:固定值填充,此时需要指定填充的数值。例如:FILL(VALUE, 1.23)。
3. NULL填充:使用NULL填充数据。例如:FILL(NULL)。
4. PREV填充:使用前一个非NULL值填充数据。例如:FILL(PREV)。
5. NEXT填充:使用下一个非NULL值填充数据。例如:FILL(NEXT)。
- 在聚合查询中,function_list 位置允许使用聚合和选择函数,并要求每个函数仅输出单个结果(例如:COUNT、AVG、SUM、STDDEV、LEASTSQUARES、PERCENTILE、MIN、MAX、FIRST、LAST),而不能使用具有多行输出结果的函数(例如:TOP、BOTTOM、DIFF 以及四则运算)。
- 查询过滤、聚合等操作按照每个切分窗口为独立的单位执行。聚合查询目前支持三种窗口的划分方式:
1. 时间窗口:聚合时间段的窗口宽度由关键词 INTERVAL 指定,最短时间间隔 10 毫秒(10a);并且支持偏移 offset(偏移必须小于间隔),也即时间窗口划分与“UTC 时刻 0”相比的偏移量。SLIDING 语句用于指定聚合时间段的前向增量,也即每次窗口向前滑动的时长。当 SLIDING 与 INTERVAL 取值相等的时候,滑动窗口即为翻转窗口。
2. 状态窗口:使用整数(布尔值)或字符串来标识产生记录时设备的状态量,产生的记录如果具有相同的状态量取值则归属于同一个状态窗口,数值改变后该窗口关闭。状态量所对应的列作为 STAT_WINDOW 语句的参数来指定。
3. 会话窗口:时间戳所在的列由 SESSION 语句的 ts_col 参数指定,会话窗口根据相邻两条记录的时间戳差值来确定是否属于同一个会话——如果时间戳差异在 tol_val 以内,则认为记录仍属于同一个窗口;如果时间变化超过 tol_val,则自动开启下一个窗口。
- WHERE 语句可以指定查询的起止时间和其他过滤条件。
- FILL 语句指定某一窗口区间数据缺失的情况下的填充模式。填充模式包括以下几种:
1. 不进行填充:NONE(默认填充模式)。
2. VALUE 填充:固定值填充,此时需要指定填充的数值。例如:FILL(VALUE, 1.23)。
3. PREV 填充:使用前一个非 NULL 值填充数据。例如:FILL(PREV)。
4. NULL 填充:使用 NULL 填充数据。例如:FILL(NULL)。
5. LINEAR 填充:根据前后距离最近的非 NULL 值做线性插值填充。例如:FILL(LINEAR)。
6. NEXT 填充:使用下一个非 NULL 值填充数据。例如:FILL(NEXT)。
说明:
1. 使用FILL语句的时候可能生成大量的填充输出,务必指定查询的时间区间。针对每次查询,系统可返回不超过1千万条具有插值的结果。
1. 使用 FILL 语句的时候可能生成大量的填充输出,务必指定查询的时间区间。针对每次查询,系统可返回不超过 1 千万条具有插值的结果。
2. 在时间维度聚合中,返回的结果中时间序列严格单调递增。
3. 如果查询对象是超级表,则聚合函数会作用于该超级表下满足值过滤条件的所有表的数据。如果查询中没有使用GROUP BY语句,则返回的结果按照时间序列严格单调递增;如果查询中使用了GROUP BY语句分组,则返回结果中每个GROUP内不按照时间序列严格单调递增。
3. 如果查询对象是超级表,则聚合函数会作用于该超级表下满足值过滤条件的所有表的数据。如果查询中没有使用 GROUP BY 语句,则返回的结果按照时间序列严格单调递增;如果查询中使用了 GROUP BY 语句分组,则返回结果中每个 GROUP 内不按照时间序列严格单调递增。
时间聚合也常被用于连续查询场景,可以参考文档 [连续查询(Continuous Query)](https://www.taosdata.com/cn/documentation/advanced-features#continuous-query)。
......@@ -1326,7 +1364,7 @@ SELECT function_list FROM stb_name
CREATE TABLE meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (location BINARY(64), groupId INT);
```
针对智能电表采集的数据,以10分钟为一个阶段,计算过去24小时的电流数据的平均值、最大值、电流的中位数、以及随着时间变化的电流走势拟合直线。如果没有计算值,用前一个非NULL值填充。使用的查询语句如下:
针对智能电表采集的数据,以 10 分钟为一个阶段,计算过去 24 小时的电流数据的平均值、最大值、电流的中位数、以及随着时间变化的电流走势拟合直线。如果没有计算值,用前一个非 NULL 值填充。使用的查询语句如下:
```mysql
SELECT AVG(current), MAX(current), LEASTSQUARES(current, start_val, step_val), PERCENTILE(current, 50) FROM meters
......
......@@ -210,7 +210,8 @@ int32_t createProjectionExpr(SQueryInfo* pQueryInfo, STableMetaInfo* pTableMetaI
void clearAllTableMetaInfo(SQueryInfo* pQueryInfo, bool removeMeta);
SColumn* tscColumnClone(const SColumn* src);
bool tscColumnExists(SArray* pColumnList, int32_t columnIndex, uint64_t uid);
void tscColumnCopy(SColumn* pDest, const SColumn* pSrc);
int32_t tscColumnExists(SArray* pColumnList, int32_t columnId, uint64_t uid);
SColumn* tscColumnListInsert(SArray* pColumnList, int32_t columnIndex, uint64_t uid, SSchema* pSchema);
void tscColumnListDestroy(SArray* pColList);
void tscColumnListCopy(SArray* dst, const SArray* src, uint64_t tableUid);
......
......@@ -898,7 +898,9 @@ SSDataBlock* doGlobalAggregate(void* param, bool* newgroup) {
SSDataBlock* pBlock = NULL;
while(1) {
bool prev = *newgroup;
publishOperatorProfEvent(upstream, QUERY_PROF_BEFORE_OPERATOR_EXEC);
pBlock = upstream->exec(upstream, newgroup);
publishOperatorProfEvent(upstream, QUERY_PROF_AFTER_OPERATOR_EXEC);
if (pBlock == NULL) {
*newgroup = prev;
break;
......@@ -966,7 +968,9 @@ static SSDataBlock* skipGroupBlock(SOperatorInfo* pOperator, bool* newgroup) {
SSDataBlock* pBlock = NULL;
if (pInfo->currentGroupOffset == 0) {
publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_BEFORE_OPERATOR_EXEC);
pBlock = pOperator->upstream[0]->exec(pOperator->upstream[0], newgroup);
publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_AFTER_OPERATOR_EXEC);
if (pBlock == NULL) {
setQueryStatus(pOperator->pRuntimeEnv, QUERY_COMPLETED);
pOperator->status = OP_EXEC_DONE;
......@@ -974,7 +978,9 @@ static SSDataBlock* skipGroupBlock(SOperatorInfo* pOperator, bool* newgroup) {
if (*newgroup == false && pInfo->limit.limit > 0 && pInfo->rowsTotal >= pInfo->limit.limit) {
while ((*newgroup) == false) { // ignore the remain blocks
publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_BEFORE_OPERATOR_EXEC);
pBlock = pOperator->upstream[0]->exec(pOperator->upstream[0], newgroup);
publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_AFTER_OPERATOR_EXEC);
if (pBlock == NULL) {
setQueryStatus(pOperator->pRuntimeEnv, QUERY_COMPLETED);
pOperator->status = OP_EXEC_DONE;
......@@ -986,7 +992,10 @@ static SSDataBlock* skipGroupBlock(SOperatorInfo* pOperator, bool* newgroup) {
return pBlock;
}
publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_BEFORE_OPERATOR_EXEC);
pBlock = pOperator->upstream[0]->exec(pOperator->upstream[0], newgroup);
publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_AFTER_OPERATOR_EXEC);
if (pBlock == NULL) {
setQueryStatus(pOperator->pRuntimeEnv, QUERY_COMPLETED);
pOperator->status = OP_EXEC_DONE;
......@@ -1000,7 +1009,10 @@ static SSDataBlock* skipGroupBlock(SOperatorInfo* pOperator, bool* newgroup) {
}
while ((*newgroup) == false) {
publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_BEFORE_OPERATOR_EXEC);
pBlock = pOperator->upstream[0]->exec(pOperator->upstream[0], newgroup);
publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_AFTER_OPERATOR_EXEC);
if (pBlock == NULL) {
setQueryStatus(pOperator->pRuntimeEnv, QUERY_COMPLETED);
pOperator->status = OP_EXEC_DONE;
......
......@@ -485,6 +485,7 @@ static int32_t tscGetDBInfo(SCreateBuilder *builder, char *result) {
char buf[TSDB_DB_NAME_LEN + 64] = {0};
do {
memset(buf, 0, sizeof(buf));
int32_t* lengths = taos_fetch_lengths(pSql);
int32_t ret = tscGetNthFieldResult(row, fields, lengths, 0, buf);
if (0 == ret && STR_NOCASE_EQUAL(buf, strlen(buf), builder->buf, strlen(builder->buf))) {
......
......@@ -1580,7 +1580,6 @@ void tscImportDataFromFile(SSqlObj *pSql) {
SImportFileSupport *pSupporter = calloc(1, sizeof(SImportFileSupport));
SSqlObj *pNew = createSubqueryObj(pSql, 0, parseFileSendDataBlock, pSupporter, TSDB_SQL_INSERT, NULL);
pCmd->count = 1;
FILE *fp = fopen(pCmd->payload, "rb");
if (fp == NULL) {
......
......@@ -1156,27 +1156,6 @@ static void insertBatchClean(STscStmt* pStmt) {
tfree(pCmd->insertParam.pTableNameList);
/*
STableDataBlocks** p = taosHashIterate(pCmd->insertParam.pTableBlockHashList, NULL);
STableDataBlocks* pOneTableBlock = *p;
while (1) {
SSubmitBlk* pBlocks = (SSubmitBlk*) pOneTableBlock->pData;
pOneTableBlock->size = sizeof(SSubmitBlk);
pBlocks->numOfRows = 0;
p = taosHashIterate(pCmd->insertParam.pTableBlockHashList, p);
if (p == NULL) {
break;
}
pOneTableBlock = *p;
}
*/
pCmd->insertParam.pDataBlocks = tscDestroyBlockArrayList(pCmd->insertParam.pDataBlocks);
pCmd->insertParam.numOfTables = 0;
......@@ -1499,7 +1478,7 @@ int taos_stmt_prepare(TAOS_STMT* stmt, const char* sql, unsigned long length) {
pRes->numOfRows = 1;
strtolower(pSql->sqlstr, sql);
tscDebugL("%p SQL: %s", pSql, pSql->sqlstr);
tscDebugL("0x%"PRIx64" SQL: %s", pSql->self, pSql->sqlstr);
if (tscIsInsertData(pSql->sqlstr)) {
pStmt->isInsert = true;
......@@ -1604,7 +1583,7 @@ int taos_stmt_set_tbname_tags(TAOS_STMT* stmt, const char* name, TAOS_BIND* tags
if (taosHashGetSize(pCmd->insertParam.pTableBlockHashList) > 0) {
SHashObj* hashList = pCmd->insertParam.pTableBlockHashList;
pCmd->insertParam.pTableBlockHashList = NULL;
tscResetSqlCmd(pCmd, true);
tscResetSqlCmd(pCmd, false);
pCmd->insertParam.pTableBlockHashList = hashList;
}
......@@ -1663,7 +1642,7 @@ int taos_stmt_close(TAOS_STMT* stmt) {
} else {
if (pStmt->multiTbInsert) {
taosHashCleanup(pStmt->mtb.pTableHash);
pStmt->mtb.pTableBlockHashList = tscDestroyBlockHashTable(pStmt->mtb.pTableBlockHashList, true);
pStmt->mtb.pTableBlockHashList = tscDestroyBlockHashTable(pStmt->mtb.pTableBlockHashList, false);
taosHashCleanup(pStmt->pSql->cmd.insertParam.pTableBlockHashList);
pStmt->pSql->cmd.insertParam.pTableBlockHashList = NULL;
taosArrayDestroy(pStmt->mtb.tags);
......
此差异已折叠。
......@@ -795,6 +795,7 @@ static int32_t serializeSqlExpr(SSqlExpr* pExpr, STableMetaInfo* pTableMetaInfo,
pSqlExpr->colBytes = htons(pExpr->colBytes);
pSqlExpr->resType = htons(pExpr->resType);
pSqlExpr->resBytes = htons(pExpr->resBytes);
pSqlExpr->interBytes = htonl(pExpr->interBytes);
pSqlExpr->functionId = htons(pExpr->functionId);
pSqlExpr->numOfParams = htons(pExpr->numOfParams);
pSqlExpr->resColId = htons(pExpr->resColId);
......@@ -1495,7 +1496,9 @@ int tscBuildAlterTableMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
pMsg = (char *)pSchema;
pAlterTableMsg->tagValLen = htonl(pAlterInfo->tagData.dataLen);
memcpy(pMsg, pAlterInfo->tagData.data, pAlterInfo->tagData.dataLen);
if (pAlterInfo->tagData.dataLen > 0) {
memcpy(pMsg, pAlterInfo->tagData.data, pAlterInfo->tagData.dataLen);
}
pMsg += pAlterInfo->tagData.dataLen;
msgLen = (int32_t)(pMsg - (char*)pAlterTableMsg);
......
......@@ -512,6 +512,7 @@ TAOS_RES *taos_consume(TAOS_SUB *tsub) {
pSub->pSql = pSql;
pSql->pSubscription = pSub;
pSub->lastSyncTime = 0;
// no table list now, force to update it
tscDebug("begin table synchronization");
......
......@@ -103,13 +103,6 @@ bool subAndCheckDone(SSqlObj *pSql, SSqlObj *pParentSql, int idx) {
pthread_mutex_lock(&subState->mutex);
// bool done = allSubqueryDone(pParentSql);
// if (done) {
// tscDebug("0x%"PRIx64" subquery:0x%"PRIx64",%d all subs already done", pParentSql->self, pSql->self, idx);
// pthread_mutex_unlock(&subState->mutex);
// return false;
// }
tscDebug("0x%"PRIx64" subquery:0x%"PRIx64", index:%d state set to 1", pParentSql->self, pSql->self, idx);
subState->states[idx] = 1;
......@@ -2389,8 +2382,14 @@ int32_t tscHandleFirstRoundStableQuery(SSqlObj *pSql) {
SColumn *pCol = taosArrayGetP(pColList, i);
if (pCol->info.flist.numOfFilters > 0) { // copy to the pNew->cmd.colList if it is filtered.
SColumn *p = tscColumnClone(pCol);
taosArrayPush(pNewQueryInfo->colList, &p);
int32_t index1 = tscColumnExists(pNewQueryInfo->colList, pCol->columnIndex, pCol->tableUid);
if (index1 >= 0) {
SColumn* x = taosArrayGetP(pNewQueryInfo->colList, index1);
tscColumnCopy(x, pCol);
} else {
SColumn *p = tscColumnClone(pCol);
taosArrayPush(pNewQueryInfo->colList, &p);
}
}
}
......@@ -3605,10 +3604,10 @@ void* createQInfoFromQueryNode(SQueryInfo* pQueryInfo, STableGroupInfo* pTableGr
// todo refactor: filter should not be applied here.
createFilterInfo(pQueryAttr, 0);
pQueryAttr->numOfFilterCols = 0;
SArray* pa = NULL;
if (stage == MASTER_SCAN) {
pQueryAttr->createFilterOperator = false; // no need for parent query
pa = createExecOperatorPlan(pQueryAttr);
} else {
pa = createGlobalMergePlan(pQueryAttr);
......
......@@ -825,7 +825,10 @@ static void fetchNextBlockIfCompleted(SOperatorInfo* pOperator, bool* newgroup)
SJoinStatus* pStatus = &pJoinInfo->status[i];
if (pStatus->pBlock == NULL || pStatus->index >= pStatus->pBlock->info.rows) {
tscDebug("Retrieve nest query result, index:%d, total:%d", i, pOperator->numOfUpstream);
publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_BEFORE_OPERATOR_EXEC);
pStatus->pBlock = pOperator->upstream[i]->exec(pOperator->upstream[i], newgroup);
publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_AFTER_OPERATOR_EXEC);
pStatus->index = 0;
if (pStatus->pBlock == NULL) {
......@@ -1304,7 +1307,7 @@ void tscResetSqlCmd(SSqlCmd* pCmd, bool clearCachedMeta) {
if (pCmd->pTableMetaMap != NULL) {
STableMetaVgroupInfo* p = taosHashIterate(pCmd->pTableMetaMap, NULL);
while (p) {
tfree(p->pVgroupInfo);
tscVgroupInfoClear(p->pVgroupInfo);
tfree(p->pTableMeta);
p = taosHashIterate(pCmd->pTableMetaMap, p);
}
......@@ -1332,7 +1335,7 @@ void tscFreeSubobj(SSqlObj* pSql) {
tscDebug("0x%"PRIx64" start to free sub SqlObj, numOfSub:%d", pSql->self, pSql->subState.numOfSub);
for(int32_t i = 0; i < pSql->subState.numOfSub; ++i) {
tscDebug("0x%"PRIx64" free sub SqlObj:%p, index:%d", pSql->self, pSql->pSubs[i], i);
tscDebug("0x%"PRIx64" free sub SqlObj:0x%"PRIx64", index:%d", pSql->self, pSql->pSubs[i]->self, i);
taos_free_result(pSql->pSubs[i]);
pSql->pSubs[i] = NULL;
}
......@@ -1784,7 +1787,7 @@ int32_t tscMergeTableDataBlocks(SInsertStatementParam *pInsertParam, bool freeBl
tscSortRemoveDataBlockDupRows(pOneTableBlock);
char* ekey = (char*)pBlocks->data + pOneTableBlock->rowSize*(pBlocks->numOfRows-1);
tscDebug("0x%"PRIx64" name:%s, name:%d rows:%d sversion:%d skey:%" PRId64 ", ekey:%" PRId64, pInsertParam->objectId, tNameGetTableName(&pOneTableBlock->tableName),
tscDebug("0x%"PRIx64" name:%s, tid:%d rows:%d sversion:%d skey:%" PRId64 ", ekey:%" PRId64, pInsertParam->objectId, tNameGetTableName(&pOneTableBlock->tableName),
pBlocks->tid, pBlocks->numOfRows, pBlocks->sversion, GET_INT64_VAL(pBlocks->data), GET_INT64_VAL(ekey));
int32_t len = pBlocks->numOfRows * (pOneTableBlock->rowSize + expandSize) + sizeof(STColumn) * tscGetNumOfColumns(pOneTableBlock->pTableMeta);
......@@ -2270,18 +2273,14 @@ int32_t tscExprCopyAll(SArray* dst, const SArray* src, bool deepcopy) {
return 0;
}
bool tscColumnExists(SArray* pColumnList, int32_t columnIndex, uint64_t uid) {
// ignore the tbname columnIndex to be inserted into source list
if (columnIndex < 0) {
return false;
}
// ignore the tbname columnIndex to be inserted into source list
int32_t tscColumnExists(SArray* pColumnList, int32_t columnId, uint64_t uid) {
size_t numOfCols = taosArrayGetSize(pColumnList);
int32_t i = 0;
while (i < numOfCols) {
SColumn* pCol = taosArrayGetP(pColumnList, i);
if ((pCol->columnIndex != columnIndex) || (pCol->tableUid != uid)) {
if ((pCol->info.colId != columnId) || (pCol->tableUid != uid)) {
++i;
continue;
} else {
......@@ -2290,10 +2289,10 @@ bool tscColumnExists(SArray* pColumnList, int32_t columnIndex, uint64_t uid) {
}
if (i >= numOfCols || numOfCols == 0) {
return false;
return -1;
}
return true;
return i;
}
void tscExprAssign(SExprInfo* dst, const SExprInfo* src) {
......@@ -2379,13 +2378,7 @@ SColumn* tscColumnClone(const SColumn* src) {
return NULL;
}
dst->columnIndex = src->columnIndex;
dst->tableUid = src->tableUid;
dst->info.flist.numOfFilters = src->info.flist.numOfFilters;
dst->info.flist.filterInfo = tFilterInfoDup(src->info.flist.filterInfo, src->info.flist.numOfFilters);
dst->info.type = src->info.type;
dst->info.colId = src->info.colId;
dst->info.bytes = src->info.bytes;
tscColumnCopy(dst, src);
return dst;
}
......@@ -2394,6 +2387,18 @@ static void tscColumnDestroy(SColumn* pCol) {
free(pCol);
}
void tscColumnCopy(SColumn* pDest, const SColumn* pSrc) {
destroyFilterInfo(&pDest->info.flist);
pDest->columnIndex = pSrc->columnIndex;
pDest->tableUid = pSrc->tableUid;
pDest->info.flist.numOfFilters = pSrc->info.flist.numOfFilters;
pDest->info.flist.filterInfo = tFilterInfoDup(pSrc->info.flist.filterInfo, pSrc->info.flist.numOfFilters);
pDest->info.type = pSrc->info.type;
pDest->info.colId = pSrc->info.colId;
pDest->info.bytes = pSrc->info.bytes;
}
void tscColumnListCopy(SArray* dst, const SArray* src, uint64_t tableUid) {
assert(src != NULL && dst != NULL);
......@@ -3276,6 +3281,8 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, __async_cb_func_t
pnCmd->insertParam.pTableNameList = NULL;
pnCmd->insertParam.pTableBlockHashList = NULL;
memset(&pnCmd->insertParam.tagData, 0, sizeof(STagData));
if (tscAddQueryInfo(pnCmd) != TSDB_CODE_SUCCESS) {
terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
goto _error;
......@@ -4075,7 +4082,10 @@ SVgroupsInfo* tscVgroupsInfoDup(SVgroupsInfo* pVgroupsInfo) {
size_t size = sizeof(SVgroupInfo) * pVgroupsInfo->numOfVgroups + sizeof(SVgroupsInfo);
SVgroupsInfo* pInfo = calloc(1, size);
memcpy(pInfo, pVgroupsInfo, size);
pInfo->numOfVgroups = pVgroupsInfo->numOfVgroups;
for (int32_t m = 0; m < pVgroupsInfo->numOfVgroups; ++m) {
tscSVgroupInfoCopy(&pInfo->vgroups[m], &pVgroupsInfo->vgroups[m]);
}
return pInfo;
}
......
......@@ -98,7 +98,7 @@ TEST(testCase, parse_time) {
taosParseTime(t41, &time, strlen(t41), TSDB_TIME_PRECISION_MILLI, 0);
EXPECT_EQ(time, 852048000999);
// int64_t k = timezone;
// int64_t k = timezone;
char t42[] = "1997-1-1T0:0:0.999999999Z";
taosParseTime(t42, &time, strlen(t42), TSDB_TIME_PRECISION_MILLI, 0);
EXPECT_EQ(time, 852048000999 - timezone * MILLISECOND_PER_SECOND);
......
......@@ -289,6 +289,11 @@ static FORCE_INLINE TKEY dataColsTKeyFirst(SDataCols *pCols) {
}
}
static FORCE_INLINE TSKEY dataColsKeyAtRow(SDataCols *pCols, int row) {
ASSERT(row < pCols->numOfRows);
return dataColsKeyAt(pCols, row);
}
static FORCE_INLINE TSKEY dataColsKeyFirst(SDataCols *pCols) {
if (pCols->numOfRows) {
return dataColsKeyAt(pCols, 0);
......
......@@ -452,7 +452,7 @@ int tdMergeDataCols(SDataCols *target, SDataCols *source, int rowsToMerge, int *
SDataCols *pTarget = NULL;
if ((target->numOfRows == 0) || (dataColsKeyLast(target) < dataColsKeyFirst(source))) { // No overlap
if ((target->numOfRows == 0) || (dataColsKeyLast(target) < dataColsKeyAtRow(source, *pOffset))) { // No overlap
ASSERT(target->numOfRows + rowsToMerge <= target->maxPoints);
for (int i = 0; i < rowsToMerge; i++) {
for (int j = 0; j < source->numOfCols; j++) {
......
......@@ -303,6 +303,8 @@ static int32_t dnodeInitStorage() {
dnodeCheckDataDirOpenned(tsDnodeDir);
taosGetDisk();
taosPrintDiskInfo();
dInfo("dnode storage is initialized at %s", tsDnodeDir);
return 0;
}
......
......@@ -100,7 +100,7 @@ extern const int32_t TYPE_BYTES[15];
#define TSDB_TIME_PRECISION_MICRO_STR "us"
#define TSDB_TIME_PRECISION_NANO_STR "ns"
#define TSDB_TICK_PER_SECOND(precision) ((precision)==TSDB_TIME_PRECISION_MILLI ? 1e3L : ((precision)==TSDB_TIME_PRECISION_MICRO ? 1e6L : 1e9L))
#define TSDB_TICK_PER_SECOND(precision) ((int64_t)((precision)==TSDB_TIME_PRECISION_MILLI ? 1e3L : ((precision)==TSDB_TIME_PRECISION_MICRO ? 1e6L : 1e9L)))
#define T_MEMBER_SIZE(type, member) sizeof(((type *)0)->member)
#define T_APPEND_MEMBER(dst, ptr, type, member) \
......
此差异已折叠。
......@@ -29,6 +29,9 @@
#define COMMAND_SIZE 65536
//#define DEFAULT_DUMP_FILE "taosdump.sql"
// for strncpy buffer overflow
#define min(a, b) (((a) < (b)) ? (a) : (b))
int converStringToReadable(char *str, int size, char *buf, int bufsize);
int convertNCharToReadable(char *str, int size, char *buf, int bufsize);
void taosDumpCharset(FILE *fp);
......@@ -1119,12 +1122,11 @@ int taosGetTableDes(
TAOS_FIELD *fields = taos_fetch_fields(res);
tstrncpy(tableDes->name, table, TSDB_TABLE_NAME_LEN);
while ((row = taos_fetch_row(res)) != NULL) {
strncpy(tableDes->cols[count].field, (char *)row[TSDB_DESCRIBE_METRIC_FIELD_INDEX],
fields[TSDB_DESCRIBE_METRIC_FIELD_INDEX].bytes);
strncpy(tableDes->cols[count].type, (char *)row[TSDB_DESCRIBE_METRIC_TYPE_INDEX],
fields[TSDB_DESCRIBE_METRIC_TYPE_INDEX].bytes);
min(15, fields[TSDB_DESCRIBE_METRIC_TYPE_INDEX].bytes));
tableDes->cols[count].length = *((int *)row[TSDB_DESCRIBE_METRIC_LENGTH_INDEX]);
strncpy(tableDes->cols[count].note, (char *)row[TSDB_DESCRIBE_METRIC_NOTE_INDEX],
fields[TSDB_DESCRIBE_METRIC_NOTE_INDEX].bytes);
......@@ -1575,7 +1577,7 @@ int taosDumpDb(SDbInfo *dbInfo, struct arguments *arguments, FILE *fp, TAOS *tao
tstrncpy(tableRecord.name, (char *)row[TSDB_SHOW_TABLES_NAME_INDEX],
fields[TSDB_SHOW_TABLES_NAME_INDEX].bytes);
tstrncpy(tableRecord.metric, (char *)row[TSDB_SHOW_TABLES_METRIC_INDEX],
fields[TSDB_SHOW_TABLES_METRIC_INDEX].bytes);
min(TSDB_TABLE_NAME_LEN, fields[TSDB_SHOW_TABLES_METRIC_INDEX].bytes));
taosWrite(fd, &tableRecord, sizeof(STableRecord));
......
......@@ -101,6 +101,8 @@ static int32_t mnodeDnodeActionInsert(SSdbRow *pRow) {
pDnode->offlineReason = TAOS_DN_OFF_STATUS_NOT_RECEIVED;
}
pDnode->customScore = 0;
dnodeUpdateEp(pDnode->dnodeId, pDnode->dnodeEp, pDnode->dnodeFqdn, &pDnode->dnodePort);
mnodeUpdateDnodeEps();
......@@ -1296,4 +1298,4 @@ int32_t mnodeCompactDnodes() {
mInfo("end to compact dnodes table...");
return 0;
}
\ No newline at end of file
}
......@@ -1068,7 +1068,9 @@ static int32_t mnodeProcessCreateSuperTableMsg(SMnodeMsg *pMsg) {
pStable->info.tableId = strdup(pCreate->tableName);
pStable->info.type = TSDB_SUPER_TABLE;
pStable->createdTime = taosGetTimestampMs();
pStable->uid = (us << 24) + ((sdbGetVersion() & ((1ul << 16) - 1ul)) << 8) + (taosRand() & ((1ul << 8) - 1ul));
int64_t x = (us&0x000000FFFFFFFFFF);
x = x<<24;
pStable->uid = x + ((sdbGetVersion() & ((1ul << 16) - 1ul)) << 8) + (taosRand() & ((1ul << 8) - 1ul));
pStable->sversion = 0;
pStable->tversion = 0;
pStable->numOfColumns = numOfColumns;
......@@ -1740,16 +1742,22 @@ static int32_t mnodeGetSuperTableMeta(SMnodeMsg *pMsg) {
return TSDB_CODE_SUCCESS;
}
static int32_t calculateVgroupMsgLength(SSTableVgroupMsg* pInfo, int32_t numOfTable) {
static int32_t doGetVgroupInfoLength(char* name) {
SSTableObj *pTable = mnodeGetSuperTable(name);
int32_t len = 0;
if (pTable != NULL && pTable->vgHash != NULL) {
len = (taosHashGetSize(pTable->vgHash) * sizeof(SVgroupMsg) + sizeof(SVgroupsMsg));
}
mnodeDecTableRef(pTable);
return len;
}
static int32_t getVgroupInfoLength(SSTableVgroupMsg* pInfo, int32_t numOfTable) {
int32_t contLen = sizeof(SSTableVgroupRspMsg) + 32 * sizeof(SVgroupMsg) + sizeof(SVgroupsMsg);
for (int32_t i = 0; i < numOfTable; ++i) {
char *stableName = (char *)pInfo + sizeof(SSTableVgroupMsg) + (TSDB_TABLE_FNAME_LEN)*i;
SSTableObj *pTable = mnodeGetSuperTable(stableName);
if (pTable != NULL && pTable->vgHash != NULL) {
contLen += (taosHashGetSize(pTable->vgHash) * sizeof(SVgroupMsg) + sizeof(SVgroupsMsg));
}
mnodeDecTableRef(pTable);
contLen += doGetVgroupInfoLength(stableName);
}
return contLen;
......@@ -1820,7 +1828,7 @@ static int32_t mnodeProcessSuperTableVgroupMsg(SMnodeMsg *pMsg) {
int32_t numOfTable = htonl(pInfo->numOfTables);
// calculate the required space.
int32_t contLen = calculateVgroupMsgLength(pInfo, numOfTable);
int32_t contLen = getVgroupInfoLength(pInfo, numOfTable);
SSTableVgroupRspMsg *pRsp = rpcMallocCont(contLen);
if (pRsp == NULL) {
return TSDB_CODE_MND_OUT_OF_MEMORY;
......@@ -2860,6 +2868,27 @@ static void mnodeProcessAlterTableRsp(SRpcMsg *rpcMsg) {
}
}
static SMultiTableMeta* ensureMsgBufferSpace(SMultiTableMeta *pMultiMeta, SArray* pList, int32_t* totalMallocLen, int32_t numOfVgroupList) {
int32_t len = 0;
for (int32_t i = 0; i < numOfVgroupList; ++i) {
char *name = taosArrayGetP(pList, i);
len += doGetVgroupInfoLength(name);
}
if (len + pMultiMeta->contLen > (*totalMallocLen)) {
while (len + pMultiMeta->contLen > (*totalMallocLen)) {
(*totalMallocLen) *= 2;
}
pMultiMeta = rpcReallocCont(pMultiMeta, *totalMallocLen);
if (pMultiMeta == NULL) {
return NULL;
}
}
return pMultiMeta;
}
static int32_t mnodeProcessMultiTableMetaMsg(SMnodeMsg *pMsg) {
SMultiTableInfoMsg *pInfo = pMsg->rpcMsg.pCont;
......@@ -2950,8 +2979,6 @@ static int32_t mnodeProcessMultiTableMetaMsg(SMnodeMsg *pMsg) {
}
}
char* msg = (char*) pMultiMeta + pMultiMeta->contLen;
// add the additional super table names that needs the vgroup info
for(;t < num; ++t) {
taosArrayPush(pList, &nameList[t]);
......@@ -2961,6 +2988,13 @@ static int32_t mnodeProcessMultiTableMetaMsg(SMnodeMsg *pMsg) {
int32_t numOfVgroupList = (int32_t) taosArrayGetSize(pList);
pMultiMeta->numOfVgroup = htonl(numOfVgroupList);
pMultiMeta = ensureMsgBufferSpace(pMultiMeta, pList, &totalMallocLen, numOfVgroupList);
if (pMultiMeta == NULL) {
code = TSDB_CODE_MND_OUT_OF_MEMORY;
goto _end;
}
char* msg = (char*) pMultiMeta + pMultiMeta->contLen;
for(int32_t i = 0; i < numOfVgroupList; ++i) {
char* name = taosArrayGetP(pList, i);
......
......@@ -29,7 +29,7 @@ extern "C" {
#include "osMath.h"
#include "osMemory.h"
#include "osRand.h"
#include "osSemphone.h"
#include "osSemaphore.h"
#include "osSignal.h"
#include "osSleep.h"
#include "osSocket.h"
......
......@@ -13,8 +13,8 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_OS_SEMPHONE_H
#define TDENGINE_OS_SEMPHONE_H
#ifndef TDENGINE_OS_SEMAPHORE_H
#define TDENGINE_OS_SEMAPHORE_H
#ifdef __cplusplus
extern "C" {
......
......@@ -27,18 +27,20 @@ typedef struct {
} SysDiskSize;
int32_t taosGetDiskSize(char *dataDir, SysDiskSize *diskSize);
void taosGetSystemInfo();
bool taosGetProcIO(float *readKB, float *writeKB);
bool taosGetBandSpeed(float *bandSpeedKb);
void taosGetDisk();
bool taosGetCpuUsage(float *sysCpuUsage, float *procCpuUsage);
bool taosGetProcMemory(float *memoryUsedMB);
bool taosGetSysMemory(float *memoryUsedMB);
void taosPrintOsInfo();
int taosSystem(const char *cmd);
void taosKillSystem();
bool taosGetSystemUid(char *uid);
char * taosGetCmdlineByPID(int pid);
void taosGetSystemInfo();
bool taosGetProcIO(float *readKB, float *writeKB);
bool taosGetBandSpeed(float *bandSpeedKb);
void taosGetDisk();
bool taosGetCpuUsage(float *sysCpuUsage, float *procCpuUsage) ;
bool taosGetProcMemory(float *memoryUsedMB) ;
bool taosGetSysMemory(float *memoryUsedMB);
void taosPrintOsInfo();
void taosPrintDiskInfo();
int taosSystem(const char * cmd) ;
void taosKillSystem();
bool taosGetSystemUid(char *uid);
char *taosGetCmdlineByPID(int pid);
void taosSetCoreDump();
......
......@@ -136,9 +136,6 @@ void taosPrintOsInfo() {
// uInfo(" os openMax: %" PRId64, tsOpenMax);
// uInfo(" os streamMax: %" PRId64, tsStreamMax);
uInfo(" os numOfCores: %d", tsNumOfCores);
uInfo(" os totalDisk: %f(GB)", tsTotalDataDirGB);
uInfo(" os usedDisk: %f(GB)", tsUsedDataDirGB);
uInfo(" os availDisk: %f(GB)", tsAvailDataDirGB);
uInfo(" os totalMemory: %d(MB)", tsTotalMemoryMB);
struct utsname buf;
......@@ -154,6 +151,14 @@ void taosPrintOsInfo() {
uInfo("==================================");
}
void taosPrintDiskInfo() {
uInfo("==================================");
uInfo(" os totalDisk: %f(GB)", tsTotalDataDirGB);
uInfo(" os usedDisk: %f(GB)", tsUsedDataDirGB);
uInfo(" os availDisk: %f(GB)", tsAvailDataDirGB);
uInfo("==================================");
}
void taosKillSystem() {
uError("function taosKillSystem, exit!");
exit(0);
......
......@@ -506,9 +506,6 @@ void taosPrintOsInfo() {
uInfo(" os openMax: %" PRId64, tsOpenMax);
uInfo(" os streamMax: %" PRId64, tsStreamMax);
uInfo(" os numOfCores: %d", tsNumOfCores);
uInfo(" os totalDisk: %f(GB)", tsTotalDataDirGB);
uInfo(" os usedDisk: %f(GB)", tsUsedDataDirGB);
uInfo(" os availDisk: %f(GB)", tsAvailDataDirGB);
uInfo(" os totalMemory: %d(MB)", tsTotalMemoryMB);
struct utsname buf;
......@@ -523,6 +520,14 @@ void taosPrintOsInfo() {
uInfo(" os machine: %s", buf.machine);
}
void taosPrintDiskInfo() {
uInfo("==================================");
uInfo(" os totalDisk: %f(GB)", tsTotalDataDirGB);
uInfo(" os usedDisk: %f(GB)", tsUsedDataDirGB);
uInfo(" os availDisk: %f(GB)", tsAvailDataDirGB);
uInfo("==================================");
}
void taosKillSystem() {
// SIGINT
uInfo("taosd will shut down soon");
......
......@@ -205,10 +205,15 @@ void taosGetSystemInfo() {
void taosPrintOsInfo() {
uInfo(" os numOfCores: %d", tsNumOfCores);
uInfo(" os totalMemory: %d(MB)", tsTotalMemoryMB);
uInfo("==================================");
}
void taosPrintDiskInfo() {
uInfo("==================================");
uInfo(" os totalDisk: %f(GB)", tsTotalDataDirGB);
uInfo(" os usedDisk: %f(GB)", tsUsedDataDirGB);
uInfo(" os availDisk: %f(GB)", tsAvailDataDirGB);
uInfo(" os totalMemory: %d(MB)", tsTotalMemoryMB);
uInfo("==================================");
}
......
......@@ -228,13 +228,11 @@ bool gcBuildQueryJson(HttpContext *pContext, HttpSqlCmd *cmd, TAOS_RES *result,
case TSDB_DATA_TYPE_NCHAR:
httpJsonStringForTransMean(jsonBuf, (char *)row[i], fields[i].bytes);
break;
case TSDB_DATA_TYPE_TIMESTAMP:
if (precision == TSDB_TIME_PRECISION_MILLI) { // ms
httpJsonInt64(jsonBuf, *((int64_t *)row[i]));
} else {
httpJsonInt64(jsonBuf, *((int64_t *)row[i]) / 1000);
}
case TSDB_DATA_TYPE_TIMESTAMP: {
int64_t ts = convertTimePrecision(*((int64_t *)row[i]), precision, TSDB_TIME_PRECISION_MILLI);
httpJsonInt64(jsonBuf, ts);
break;
}
default:
httpJsonString(jsonBuf, "-", 1);
break;
......
......@@ -204,7 +204,7 @@ typedef struct SAggFunctionInfo {
bool (*init)(SQLFunctionCtx *pCtx); // setup the execute environment
void (*xFunction)(SQLFunctionCtx *pCtx); // blocks version function
void (*xFunctionF)(SQLFunctionCtx *pCtx, int32_t position); // single-row function version, todo merge with blockwise function
// void (*xFunctionF)(SQLFunctionCtx *pCtx, int32_t position); // single-row function version, todo merge with blockwise function
// finalizer must be called after all xFunction has been executed to generated final result.
void (*xFinalize)(SQLFunctionCtx *pCtx);
......
......@@ -133,6 +133,28 @@ typedef struct STableQueryInfo {
SResultRowInfo resInfo;
} STableQueryInfo;
typedef enum {
QUERY_PROF_BEFORE_OPERATOR_EXEC = 0,
QUERY_PROF_AFTER_OPERATOR_EXEC,
QUERY_PROF_QUERY_ABORT
} EQueryProfEventType;
typedef struct {
EQueryProfEventType eventType;
int64_t eventTime;
union {
uint8_t operatorType; //for operator event
int32_t abortCode; //for query abort event
};
} SQueryProfEvent;
typedef struct {
uint8_t operatorType;
int64_t sumSelfTime;
int64_t sumRunTimes;
} SOperatorProfResult;
typedef struct SQueryCostInfo {
uint64_t loadStatisTime;
uint64_t loadFileBlockTime;
......@@ -154,6 +176,9 @@ typedef struct SQueryCostInfo {
uint64_t tableInfoSize;
uint64_t hashSize;
uint64_t numOfTimeWindows;
SArray* queryProfEvents; //SArray<SQueryProfEvent>
SHashObj* operatorProfResults; //map<operator_type, SQueryProfEvent>
} SQueryCostInfo;
typedef struct {
......@@ -192,6 +217,7 @@ typedef struct SQueryAttr {
bool needReverseScan; // need reverse scan
bool distinctTag; // distinct tag query
bool stateWindow; // window State on sub/normal table
bool createFilterOperator; // if filter operator is needed
int32_t interBufSize; // intermediate buffer sizse
int32_t havingNum; // having expr number
......@@ -285,7 +311,7 @@ enum OPERATOR_TYPE_E {
OP_TagScan = 4,
OP_TableBlockInfoScan= 5,
OP_Aggregate = 6,
OP_Arithmetic = 7,
OP_Project = 7,
OP_Groupby = 8,
OP_Limit = 9,
OP_SLimit = 10,
......@@ -295,7 +321,7 @@ enum OPERATOR_TYPE_E {
OP_MultiTableAggregate = 14,
OP_MultiTableTimeInterval = 15,
OP_DummyInput = 16, //TODO remove it after fully refactor.
OP_MultiwayMergeSort = 17, // multi-way data merge into one input stream.
OP_MultiwayMergeSort = 17, // multi-way data merge into one input stream.
OP_GlobalAggregate = 18, // global merge for the multi-way data sources.
OP_Filter = 19,
OP_Distinct = 20,
......@@ -413,13 +439,13 @@ typedef struct SAggOperatorInfo {
uint32_t seed;
} SAggOperatorInfo;
typedef struct SArithOperatorInfo {
typedef struct SProjectOperatorInfo {
SOptrBasicInfo binfo;
int32_t bufCapacity;
uint32_t seed;
SSDataBlock *existDataBlock;
} SArithOperatorInfo;
} SProjectOperatorInfo;
typedef struct SLimitOperatorInfo {
int64_t limit;
......@@ -513,7 +539,7 @@ SOperatorInfo* createTableScanOperator(void* pTsdbQueryHandle, SQueryRuntimeEnv*
SOperatorInfo* createTableSeqScanOperator(void* pTsdbQueryHandle, SQueryRuntimeEnv* pRuntimeEnv);
SOperatorInfo* createAggregateOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput);
SOperatorInfo* createArithOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput);
SOperatorInfo* createProjectOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput);
SOperatorInfo* createLimitOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream);
SOperatorInfo* createTimeIntervalOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput);
SOperatorInfo* createSWindowOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput);
......@@ -586,7 +612,12 @@ int32_t doDumpQueryResult(SQInfo *pQInfo, char *data);
size_t getResultSize(SQInfo *pQInfo, int64_t *numOfRows);
void setQueryKilled(SQInfo *pQInfo);
void publishOperatorProfEvent(SOperatorInfo* operatorInfo, EQueryProfEventType eventType);
void publishQueryAbortEvent(SQInfo* pQInfo, int32_t code);
void calculateOperatorProfResults(SQInfo* pQInfo);
void queryCostStatis(SQInfo *pQInfo);
void freeQInfo(SQInfo *pQInfo);
void freeQueryAttr(SQueryAttr *pQuery);
......
......@@ -470,7 +470,7 @@ tagitem(A) ::= PLUS(X) FLOAT(Y). {
//////////////////////// The SELECT statement /////////////////////////////////
%type select {SSqlNode*}
%destructor select {destroySqlNode($$);}
select(A) ::= SELECT(T) selcollist(W) from(X) where_opt(Y) interval_opt(K) session_option(H) windowstate_option(D) fill_opt(F) sliding_opt(S) groupby_opt(P) orderby_opt(Z) having_opt(N) slimit_opt(G) limit_opt(L). {
select(A) ::= SELECT(T) selcollist(W) from(X) where_opt(Y) interval_opt(K) session_option(H) windowstate_option(D) fill_opt(F) sliding_opt(S) groupby_opt(P) having_opt(N) orderby_opt(Z) slimit_opt(G) limit_opt(L). {
A = tSetQuerySqlNode(&T, W, X, Y, P, Z, &K, &H, &D, &S, F, &L, &G, N);
}
......
此差异已折叠。
此差异已折叠。
......@@ -565,7 +565,7 @@ SArray* createExecOperatorPlan(SQueryAttr* pQueryAttr) {
taosArrayPush(plan, &op);
if (pQueryAttr->pExpr2 != NULL) {
op = OP_Arithmetic;
op = OP_Project;
taosArrayPush(plan, &op);
}
......@@ -585,7 +585,7 @@ SArray* createExecOperatorPlan(SQueryAttr* pQueryAttr) {
}
if (pQueryAttr->pExpr2 != NULL) {
op = OP_Arithmetic;
op = OP_Project;
taosArrayPush(plan, &op);
}
} else if (pQueryAttr->sw.gap > 0) {
......@@ -593,7 +593,7 @@ SArray* createExecOperatorPlan(SQueryAttr* pQueryAttr) {
taosArrayPush(plan, &op);
if (pQueryAttr->pExpr2 != NULL) {
op = OP_Arithmetic;
op = OP_Project;
taosArrayPush(plan, &op);
}
} else if (pQueryAttr->stateWindow) {
......@@ -601,7 +601,7 @@ SArray* createExecOperatorPlan(SQueryAttr* pQueryAttr) {
taosArrayPush(plan, &op);
if (pQueryAttr->pExpr2 != NULL) {
op = OP_Arithmetic;
op = OP_Project;
taosArrayPush(plan, &op);
}
} else if (pQueryAttr->simpleAgg) {
......@@ -619,15 +619,15 @@ SArray* createExecOperatorPlan(SQueryAttr* pQueryAttr) {
}
if (pQueryAttr->pExpr2 != NULL && !pQueryAttr->stableQuery) {
op = OP_Arithmetic;
op = OP_Project;
taosArrayPush(plan, &op);
}
} else { // diff/add/multiply/subtract/division
if (pQueryAttr->numOfFilterCols > 0 && pQueryAttr->vgId == 0) { // todo refactor
if (pQueryAttr->numOfFilterCols > 0 && pQueryAttr->createFilterOperator && pQueryAttr->vgId == 0) { // todo refactor
op = OP_Filter;
taosArrayPush(plan, &op);
} else {
op = OP_Arithmetic;
op = OP_Project;
taosArrayPush(plan, &op);
}
}
......@@ -665,7 +665,7 @@ SArray* createGlobalMergePlan(SQueryAttr* pQueryAttr) {
}
if (pQueryAttr->pExpr2 != NULL) {
op = OP_Arithmetic;
op = OP_Project;
taosArrayPush(plan, &op);
}
}
......
......@@ -2,6 +2,7 @@
#include "taoserror.h"
#include "tscompression.h"
#include "tutil.h"
#include "queryLog.h"
static int32_t getDataStartOffset();
static void TSBufUpdateGroupInfo(STSBuf* pTSBuf, int32_t index, STSGroupBlockInfo* pBlockInfo);
......@@ -633,10 +634,15 @@ int32_t STSBufUpdateHeader(STSBuf* pTSBuf, STSBufFileHeader* pHeader) {
int32_t r = fseek(pTSBuf->f, 0, SEEK_SET);
if (r != 0) {
qError("fseek failed, errno:%d", errno);
return -1;
}
size_t ws = fwrite(pHeader, sizeof(STSBufFileHeader), 1, pTSBuf->f);
if (ws != 1) {
qError("ts update header fwrite failed, size:%d, expected size:%d", (int32_t)ws, (int32_t)sizeof(STSBufFileHeader));
return -1;
}
fwrite(pHeader, sizeof(STSBufFileHeader), 1, pTSBuf->f);
return 0;
}
......@@ -853,9 +859,17 @@ STSBuf* tsBufCreateFromCompBlocks(const char* pData, int32_t numOfBlocks, int32_
TSBufUpdateGroupInfo(pTSBuf, pTSBuf->numOfGroups - 1, pBlockInfo);
int32_t ret = fseek(pTSBuf->f, pBlockInfo->offset, SEEK_SET);
UNUSED(ret);
if (ret == -1) {
qError("fseek failed, errno:%d", errno);
tsBufDestroy(pTSBuf);
return NULL;
}
size_t sz = fwrite((void*)pData, 1, len, pTSBuf->f);
UNUSED(sz);
if (sz != len) {
qError("ts data fwrite failed, write size:%d, expected size:%d", (int32_t)sz, len);
tsBufDestroy(pTSBuf);
return NULL;
}
pTSBuf->fileSize += len;
pTSBuf->tsOrder = order;
......@@ -863,9 +877,16 @@ STSBuf* tsBufCreateFromCompBlocks(const char* pData, int32_t numOfBlocks, int32_
STSBufFileHeader header = {
.magic = TS_COMP_FILE_MAGIC, .numOfGroup = pTSBuf->numOfGroups, .tsOrder = pTSBuf->tsOrder};
STSBufUpdateHeader(pTSBuf, &header);
if (STSBufUpdateHeader(pTSBuf, &header) < 0) {
tsBufDestroy(pTSBuf);
return NULL;
}
taosFsync(fileno(pTSBuf->f));
if (taosFsync(fileno(pTSBuf->f)) == -1) {
qError("fsync failed, errno:%d", errno);
tsBufDestroy(pTSBuf);
return NULL;
}
return pTSBuf;
}
......
......@@ -232,6 +232,7 @@ bool qTableQuery(qinfo_t qinfo, uint64_t *qId) {
// error occurs, record the error code and return to client
int32_t ret = setjmp(pQInfo->runtimeEnv.env);
if (ret != TSDB_CODE_SUCCESS) {
publishQueryAbortEvent(pQInfo, ret);
pQInfo->code = ret;
qDebug("QInfo:0x%"PRIx64" query abort due to error/cancel occurs, code:%s", pQInfo->qId, tstrerror(pQInfo->code));
return doBuildResCheck(pQInfo);
......@@ -240,7 +241,9 @@ bool qTableQuery(qinfo_t qinfo, uint64_t *qId) {
qDebug("QInfo:0x%"PRIx64" query task is launched", pQInfo->qId);
bool newgroup = false;
publishOperatorProfEvent(pRuntimeEnv->proot, QUERY_PROF_BEFORE_OPERATOR_EXEC);
pRuntimeEnv->outputBuf = pRuntimeEnv->proot->exec(pRuntimeEnv->proot, &newgroup);
publishOperatorProfEvent(pRuntimeEnv->proot, QUERY_PROF_AFTER_OPERATOR_EXEC);
pRuntimeEnv->resultInfo.total += GET_NUM_OF_RESULTS(pRuntimeEnv);
if (isQueryKilled(pQInfo)) {
......
......@@ -10,6 +10,7 @@
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wwrite-strings"
#pragma GCC diagnostic ignored "-Wunused-function"
typedef struct ResultObj {
int32_t numOfResult;
......
......@@ -5,6 +5,10 @@
#include "taos.h"
#include "qHistogram.h"
#pragma GCC diagnostic ignored "-Wunused-function"
#pragma GCC diagnostic ignored "-Wunused-variable"
namespace {
void doHistogramAddTest() {
SHistogramInfo* pHisto = NULL;
......
......@@ -6,6 +6,9 @@
#include "qAggMain.h"
#include "tcompare.h"
#pragma GCC diagnostic ignored "-Wunused-function"
#pragma GCC diagnostic ignored "-Wunused-variable"
TEST(testCase, patternMatchTest) {
SPatternCompareInfo info = PATTERN_COMPARE_INFO_INITIALIZER;
......
......@@ -7,6 +7,9 @@
#include "qPercentile.h"
#pragma GCC diagnostic ignored "-Wunused-function"
#pragma GCC diagnostic ignored "-Wunused-variable"
namespace {
tMemBucket *createBigIntDataBucket(int32_t start, int32_t end) {
tMemBucket *pBucket = tMemBucketCreate(sizeof(int64_t), TSDB_DATA_TYPE_BIGINT, start, end);
......
......@@ -6,6 +6,9 @@
#include "taos.h"
#include "tsdb.h"
#pragma GCC diagnostic ignored "-Wunused-function"
#pragma GCC diagnostic ignored "-Wunused-variable"
namespace {
// simple test
void simpleTest() {
......
......@@ -9,6 +9,10 @@
#include "ttoken.h"
#include "tutil.h"
#pragma GCC diagnostic ignored "-Wunused-function"
#pragma GCC diagnostic ignored "-Wunused-variable"
#pragma GCC diagnostic ignored "-Wunused-but-set-variable"
namespace {
/**
*
......
......@@ -6,14 +6,17 @@
#include "taos.h"
#include "tsdb.h"
#pragma GCC diagnostic ignored "-Wwrite-strings"
#pragma GCC diagnostic ignored "-Wunused-function"
#pragma GCC diagnostic ignored "-Wunused-variable"
#pragma GCC diagnostic ignored "-Wunused-but-set-variable"
#pragma GCC diagnostic ignored "-Wsign-compare"
#include "../../client/inc/tscUtil.h"
#include "tutil.h"
#include "tvariant.h"
#include "ttokendef.h"
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wwrite-strings"
namespace {
int32_t testValidateName(char* name) {
SStrToken token = {0};
......
......@@ -480,11 +480,13 @@ static int tfsFormatDir(char *idir, char *odir) {
return -1;
}
if (realpath(wep.we_wordv[0], odir) == NULL) {
char tmp[PATH_MAX] = {0};
if (realpath(wep.we_wordv[0], tmp) == NULL) {
terrno = TAOS_SYSTEM_ERROR(errno);
wordfree(&wep);
return -1;
}
strcpy(odir, tmp);
wordfree(&wep);
return 0;
......
......@@ -3364,7 +3364,7 @@ static bool tableFilterFp(const void* pNode, void* param) {
GET_TYPED_DATA(v, uint64_t, pInfo->sch.type, val);
return NULL != taosHashGet((SHashObj *)pInfo->q, (char *)&v, sizeof(v));
}
else if (type == TSDB_DATA_TYPE_DOUBLE || type == TSDB_DATA_TYPE_DOUBLE) {
else if (type == TSDB_DATA_TYPE_DOUBLE || type == TSDB_DATA_TYPE_FLOAT) {
double v;
GET_TYPED_DATA(v, double, pInfo->sch.type, val);
return NULL != taosHashGet((SHashObj *)pInfo->q, (char *)&v, sizeof(v));
......
......@@ -28,10 +28,41 @@ typedef struct SSchedMsg {
void *thandle;
} SSchedMsg;
void *taosInitScheduler(int queueSize, int numOfThreads, const char *label);
void *taosInitSchedulerWithInfo(int queueSize, int numOfThreads, const char *label, void *tmrCtrl);
int taosScheduleTask(void *qhandle, SSchedMsg *pMsg);
void taosCleanUpScheduler(void *param);
/**
* Create a thread-safe ring-buffer based task queue and return the instance. A thread
* pool will be created to consume the messages in the queue.
* @param capacity the queue capacity
* @param numOfThreads the number of threads for the thread pool
* @param label the label of the queue
* @return the created queue scheduler
*/
void *taosInitScheduler(int capacity, int numOfThreads, const char *label);
/**
* Create a thread-safe ring-buffer based task queue and return the instance.
* Same as taosInitScheduler, and it also print the queue status every 1 minite.
* @param capacity the queue capacity
* @param numOfThreads the number of threads for the thread pool
* @param label the label of the queue
* @param tmrCtrl the timer controller, tmr_ctrl_t*
* @return the created queue scheduler
*/
void *taosInitSchedulerWithInfo(int capacity, int numOfThreads, const char *label, void *tmrCtrl);
/**
* Clean up the queue scheduler instance and free the memory.
* @param queueScheduler the queue scheduler to free
*/
void taosCleanUpScheduler(void *queueScheduler);
/**
* Schedule a new task to run, the task is described by pMsg.
* The function may be blocked if no thread is available to execute the task.
* That may happen when all threads are busy.
* @param queueScheduler the queue scheduler instance
* @param pMsg the message for the task
*/
void taosScheduleTask(void *queueScheduler, SSchedMsg *pMsg);
#ifdef __cplusplus
}
......
......@@ -151,7 +151,7 @@ static bool taosReadDirectoryConfig(SGlobalCfg *cfg, char *input_value) {
wordfree(&full_path);
char tmp[1025] = {0};
char tmp[PATH_MAX] = {0};
if (realpath(option, tmp) != NULL) {
strcpy(option, tmp);
}
......
......@@ -108,39 +108,47 @@ void *taosInitScheduler(int queueSize, int numOfThreads, const char *label) {
void *taosInitSchedulerWithInfo(int queueSize, int numOfThreads, const char *label, void *tmrCtrl) {
SSchedQueue* pSched = taosInitScheduler(queueSize, numOfThreads, label);
if (tmrCtrl != NULL && pSched != NULL) {
pSched->pTmrCtrl = tmrCtrl;
taosTmrReset(taosDumpSchedulerStatus, DUMP_SCHEDULER_TIME_WINDOW, pSched, pSched->pTmrCtrl, &pSched->pTimer);
}
return pSched;
}
void *taosProcessSchedQueue(void *param) {
void *taosProcessSchedQueue(void *scheduler) {
SSchedMsg msg;
SSchedQueue *pSched = (SSchedQueue *)param;
SSchedQueue *pSched = (SSchedQueue *)scheduler;
int ret = 0;
while (1) {
if (tsem_wait(&pSched->fullSem) != 0) {
uError("wait %s fullSem failed(%s)", pSched->label, strerror(errno));
if ((ret = tsem_wait(&pSched->fullSem)) != 0) {
uFatal("wait %s fullSem failed(%s)", pSched->label, strerror(errno));
exit(ret);
}
if (pSched->stop) {
break;
}
if (pthread_mutex_lock(&pSched->queueMutex) != 0)
uError("lock %s queueMutex failed(%s)", pSched->label, strerror(errno));
if ((ret = pthread_mutex_lock(&pSched->queueMutex)) != 0) {
uFatal("lock %s queueMutex failed(%s)", pSched->label, strerror(errno));
exit(ret);
}
msg = pSched->queue[pSched->fullSlot];
memset(pSched->queue + pSched->fullSlot, 0, sizeof(SSchedMsg));
pSched->fullSlot = (pSched->fullSlot + 1) % pSched->queueSize;
if (pthread_mutex_unlock(&pSched->queueMutex) != 0)
uError("unlock %s queueMutex failed(%s)", pSched->label, strerror(errno));
if ((ret = pthread_mutex_unlock(&pSched->queueMutex)) != 0) {
uFatal("unlock %s queueMutex failed(%s)", pSched->label, strerror(errno));
exit(ret);
}
if (tsem_post(&pSched->emptySem) != 0)
uError("post %s emptySem failed(%s)", pSched->label, strerror(errno));
if ((ret = tsem_post(&pSched->emptySem)) != 0) {
uFatal("post %s emptySem failed(%s)", pSched->label, strerror(errno));
exit(ret);
}
if (msg.fp)
(*(msg.fp))(&msg);
......@@ -151,30 +159,37 @@ void *taosProcessSchedQueue(void *param) {
return NULL;
}
int taosScheduleTask(void *qhandle, SSchedMsg *pMsg) {
SSchedQueue *pSched = (SSchedQueue *)qhandle;
void taosScheduleTask(void *queueScheduler, SSchedMsg *pMsg) {
SSchedQueue *pSched = (SSchedQueue *)queueScheduler;
int ret = 0;
if (pSched == NULL) {
uError("sched is not ready, msg:%p is dropped", pMsg);
return 0;
return;
}
if (tsem_wait(&pSched->emptySem) != 0) {
uError("wait %s emptySem failed(%s)", pSched->label, strerror(errno));
if ((ret = tsem_wait(&pSched->emptySem)) != 0) {
uFatal("wait %s emptySem failed(%s)", pSched->label, strerror(errno));
exit(ret);
}
if (pthread_mutex_lock(&pSched->queueMutex) != 0)
uError("lock %s queueMutex failed(%s)", pSched->label, strerror(errno));
if ((ret = pthread_mutex_lock(&pSched->queueMutex)) != 0) {
uFatal("lock %s queueMutex failed(%s)", pSched->label, strerror(errno));
exit(ret);
}
pSched->queue[pSched->emptySlot] = *pMsg;
pSched->emptySlot = (pSched->emptySlot + 1) % pSched->queueSize;
if (pthread_mutex_unlock(&pSched->queueMutex) != 0)
uError("unlock %s queueMutex failed(%s)", pSched->label, strerror(errno));
if (tsem_post(&pSched->fullSem) != 0)
uError("post %s fullSem failed(%s)", pSched->label, strerror(errno));
if ((ret = pthread_mutex_unlock(&pSched->queueMutex)) != 0) {
uFatal("unlock %s queueMutex failed(%s)", pSched->label, strerror(errno));
exit(ret);
}
return 0;
if ((ret = tsem_post(&pSched->fullSem)) != 0) {
uFatal("post %s fullSem failed(%s)", pSched->label, strerror(errno));
exit(ret);
}
}
void taosCleanUpScheduler(void *param) {
......@@ -219,4 +234,4 @@ void taosDumpSchedulerStatus(void *qhandle, void *tmrId) {
}
taosTmrReset(taosDumpSchedulerStatus, DUMP_SCHEDULER_TIME_WINDOW, pSched, pSched->pTmrCtrl, &pSched->pTimer);
}
}
\ No newline at end of file
......@@ -235,6 +235,8 @@ python3 ./test.py -f query/queryTscomputWithNow.py
python3 ./test.py -f query/computeErrorinWhere.py
python3 ./test.py -f query/queryTsisNull.py
python3 ./test.py -f query/subqueryFilter.py
# python3 ./test.py -f query/nestedQuery/queryInterval.py
python3 ./test.py -f query/queryStateWindow.py
#stream
......@@ -325,6 +327,7 @@ python3 ./test.py -f query/queryGroupbySort.py
python3 ./test.py -f functions/queryTestCases.py
python3 ./test.py -f functions/function_stateWindow.py
python3 ./test.py -f functions/function_derivative.py
python3 ./test.py -f functions/function_irate.py
python3 ./test.py -f insert/unsignedInt.py
python3 ./test.py -f insert/unsignedBigint.py
......
###################################################################
# Copyright (c) 2016 by TAOS Technologies, Inc.
# All rights reserved.
#
# This file is proprietary and confidential to TAOS Technologies.
# No part of this file may be reproduced, stored, transmitted,
# disclosed or used in any form or by any means other than as
# expressly provided by the written permission from Jianhui Tao
#
###################################################################
# -*- coding: utf-8 -*-
import sys
import taos
from util.log import *
from util.cases import *
from util.sql import *
import numpy as np
class TDTestCase:
def init(self, conn, logSql):
tdLog.debug("start to execute %s" % __file__)
tdSql.init(conn.cursor())
self.rowNum = 100
self.ts = 1537146000000
self.ts1 = 1537146000000000
def run(self):
# db precison ms
tdSql.prepare()
tdSql.execute('''create table test(ts timestamp, col1 tinyint, col2 smallint, col3 int, col4 bigint, col5 float, col6 double,
col7 bool, col8 binary(20), col9 nchar(20), col11 tinyint unsigned, col12 smallint unsigned, col13 int unsigned, col14 bigint unsigned) tags(loc nchar(20), tag1 int)''')
tdSql.execute("create table test1 using test tags('beijing', 10)")
tdSql.execute("create table gtest1 (ts timestamp, col1 float)")
tdSql.execute("create table gtest2 (ts timestamp, col1 tinyint)")
tdSql.execute("create table gtest3 (ts timestamp, col1 tinyint)")
tdSql.execute("create table gtest4 (ts timestamp, col1 tinyint)")
tdSql.execute("create table gtest5 (ts timestamp, col1 tinyint)")
tdSql.execute("create table gtest6 (ts timestamp, col1 tinyint)")
tdSql.execute("create table gtest7 (ts timestamp, col1 tinyint)")
tdSql.execute("create table gtest8 (ts timestamp, col1 tinyint)")
for i in range(self.rowNum):
tdSql.execute("insert into test1 values(%d, %d, %d, %d, %d, %f, %f, %d, 'taosdata%d', '涛思数据%d', %d, %d, %d, %d)"
% (self.ts + i*1000, i + 1, i + 1, i + 1, i + 1, i + 0.1, i + 0.1, i % 2, i + 1, i + 1, i + 1, i + 1, i + 1, i + 1))
tdSql.execute("insert into gtest1 values(1537146000000,0);")
tdSql.execute("insert into gtest1 values(1537146001100,1.2);")
tdSql.execute("insert into gtest2 values(1537146001001,1);")
tdSql.execute("insert into gtest2 values(1537146001101,2);")
tdSql.execute("insert into gtest3 values(1537146001101,2);")
tdSql.execute("insert into gtest4(ts) values(1537146001101);")
tdSql.execute("insert into gtest5 values(1537146001002,4);")
tdSql.execute("insert into gtest5 values(1537146002202,4);")
tdSql.execute("insert into gtest6 values(1537146000000,5);")
tdSql.execute("insert into gtest6 values(1537146001000,2);")
tdSql.execute("insert into gtest7 values(1537146001000,1);")
tdSql.execute("insert into gtest7 values(1537146008000,2);")
tdSql.execute("insert into gtest7 values(1537146009000,6);")
tdSql.execute("insert into gtest7 values(1537146012000,3);")
tdSql.execute("insert into gtest7 values(1537146015000,3);")
tdSql.execute("insert into gtest7 values(1537146017000,1);")
tdSql.execute("insert into gtest7 values(1537146019000,3);")
tdSql.execute("insert into gtest8 values(1537146000002,4);")
tdSql.execute("insert into gtest8 values(1537146002202,4);")
# irate verifacation
tdSql.query("select irate(col1) from test1;")
tdSql.checkData(0, 0, 1)
tdSql.query("select irate(col1) from test1 interval(10s);")
tdSql.checkData(0, 1, 1)
tdSql.query("select irate(col1) from test1;")
tdSql.checkData(0, 0, 1)
tdSql.query("select irate(col2) from test1;")
tdSql.checkData(0, 0, 1)
tdSql.query("select irate(col3) from test1;")
tdSql.checkData(0, 0, 1)
tdSql.query("select irate(col4) from test1;")
tdSql.checkData(0, 0, 1)
tdSql.query("select irate(col5) from test1;")
tdSql.checkData(0, 0, 1)
tdSql.query("select irate(col6) from test1;")
tdSql.checkData(0, 0, 1)
tdSql.query("select irate(col11) from test1;")
tdSql.checkData(0, 0, 1)
tdSql.query("select irate(col12) from test1;")
tdSql.checkData(0, 0, 1)
tdSql.query("select irate(col13) from test1;")
tdSql.checkData(0, 0, 1)
tdSql.query("select irate(col14) from test1;")
tdSql.checkData(0, 0, 1)
tdSql.query("select irate(col2) from test1;")
tdSql.checkData(0, 0, 1)
tdSql.query("select irate(col2) from test1;")
tdSql.checkData(0, 0, 1)
tdSql.query("select irate(col1) from gtest1;")
tdSql.checkData(0, 0, 1.2/1.1)
tdSql.query("select irate(col1) from gtest2;")
tdSql.checkData(0, 0, 10)
tdSql.query("select irate(col1) from gtest3;")
tdSql.checkData(0, 0, 0)
tdSql.query("select irate(col1) from gtest4;")
tdSql.checkRows(0)
tdSql.query("select irate(col1) from gtest5;")
tdSql.checkData(0, 0, 0)
tdSql.query("select irate(col1) from gtest6;")
tdSql.checkData(0, 0, 2)
tdSql.query("select irate(col1) from gtest7;")
tdSql.checkData(0, 0, 1)
tdSql.query("select irate(col1) from gtest7 interval(5s) order by ts asc;")
tdSql.checkData(1, 1, 4)
tdSql.checkData(2, 1, 0)
tdSql.checkData(3, 1, 1)
tdSql.query("select irate(col1) from gtest7 interval(5s) order by ts desc ;")
tdSql.checkData(1, 1, 0)
tdSql.checkData(2, 1, 4)
tdSql.checkData(3, 1, 0)
#error
tdSql.error("select irate(col1) from test")
tdSql.error("select irate(ts) from test1")
tdSql.error("select irate(col7) from test1")
tdSql.error("select irate(col8) from test1")
tdSql.error("select irate(col9) from test1")
tdSql.error("select irate(loc) from test1")
tdSql.error("select irate(tag1) from test1")
# use db1 precision us
tdSql.execute("create database db1 precision 'us' keep 3650 UPDATE 1")
tdSql.execute("use db1 ")
tdSql.execute('''create table test(ts timestamp, col1 tinyint, col2 smallint, col3 int, col4 bigint, col5 float, col6 double,
col7 bool, col8 binary(20), col9 nchar(20), col11 tinyint unsigned, col12 smallint unsigned, col13 int unsigned, col14 bigint unsigned) tags(loc nchar(20))''')
tdSql.execute("create table test1 using test tags('beijing')")
tdSql.execute("create table gtest1 (ts timestamp, col1 float)")
tdSql.execute("create table gtest2 (ts timestamp, col1 tinyint)")
tdSql.execute("create table gtest3 (ts timestamp, col1 tinyint)")
tdSql.execute("create table gtest4 (ts timestamp, col1 tinyint)")
tdSql.execute("create table gtest5 (ts timestamp, col1 tinyint)")
tdSql.execute("create table gtest6 (ts timestamp, col1 tinyint)")
tdSql.execute("create table gtest7 (ts timestamp, col1 tinyint)")
tdSql.execute("create table gtest8 (ts timestamp, col1 tinyint)")
tdSql.execute("create table gtest9 (ts timestamp, col1 tinyint)")
for i in range(self.rowNum):
tdSql.execute("insert into test1 values(%d, %d, %d, %d, %d, %f, %f, %d, 'taosdata%d', '涛思数据%d', %d, %d, %d, %d)"
% (self.ts1 + i*1000000, i + 1, i + 1, i + 1, i + 1, i + 0.1, i + 0.1, i % 2, i + 1, i + 1, i + 1, i + 1, i + 1, i + 1))
tdSql.execute("insert into gtest1 values(1537146000000000,0);")
tdSql.execute("insert into gtest1 values(1537146001100000,1.2);")
tdSql.execute("insert into gtest2 values(1537146001001000,1);")
tdSql.execute("insert into gtest2 values(1537146001101000,2);")
tdSql.execute("insert into gtest3 values(1537146001101000,2);")
tdSql.execute("insert into gtest4(ts) values(1537146001101000);")
tdSql.execute("insert into gtest5 values(1537146001002000,4);")
tdSql.execute("insert into gtest5 values(1537146002202000,4);")
tdSql.execute("insert into gtest6 values(1537146000000000,5);")
tdSql.execute("insert into gtest6 values(1537146001000000,2);")
tdSql.execute("insert into gtest7 values(1537146001000000,1);")
tdSql.execute("insert into gtest7 values(1537146008000000,2);")
tdSql.execute("insert into gtest7 values(1537146009000000,6);")
tdSql.execute("insert into gtest7 values(1537146012000000,3);")
tdSql.execute("insert into gtest7 values(1537146015000000,3);")
tdSql.execute("insert into gtest7 values(1537146017000000,1);")
tdSql.execute("insert into gtest7 values(1537146019000000,3);")
tdSql.execute("insert into gtest8 values(1537146000002000,3);")
tdSql.execute("insert into gtest8 values(1537146001003000,4);")
tdSql.execute("insert into gtest9 values(1537146000000000,4);")
tdSql.execute("insert into gtest9 values(1537146000000001,5);")
# irate verifacation
tdSql.query("select irate(col1) from test1;")
tdSql.checkData(0, 0, 1)
tdSql.query("select irate(col1) from test1 interval(10s);")
tdSql.checkData(0, 1, 1)
tdSql.query("select irate(col1) from test1;")
tdSql.checkData(0, 0, 1)
tdSql.query("select irate(col1) from gtest1;")
tdSql.checkData(0, 0, 1.2/1.1)
tdSql.query("select irate(col1) from gtest2;")
tdSql.checkData(0, 0, 10)
tdSql.query("select irate(col1) from gtest3;")
tdSql.checkData(0, 0, 0)
tdSql.query("select irate(col1) from gtest4;")
tdSql.checkRows(0)
tdSql.query("select irate(col1) from gtest5;")
tdSql.checkData(0, 0, 0)
tdSql.query("select irate(col1) from gtest6;")
tdSql.checkData(0, 0, 2)
tdSql.query("select irate(col1) from gtest7;")
tdSql.checkData(0, 0, 1)
tdSql.query("select irate(col1) from gtest7 interval(5s) order by ts asc;")
tdSql.checkData(1, 1, 4)
tdSql.checkData(2, 1, 0)
tdSql.checkData(3, 1, 1)
tdSql.query("select irate(col1) from gtest7 interval(5s) order by ts desc ;")
tdSql.checkData(1, 1, 0)
tdSql.checkData(2, 1, 4)
tdSql.checkData(3, 1, 0)
tdSql.query("select irate(col1) from gtest8;")
tdSql.checkData(0, 0, 1/1.001)
tdSql.query("select irate(col1) from gtest9;")
tdSql.checkData(0, 0, 1000000)
#error
tdSql.error("select irate(col1) from test")
tdSql.error("select irate(ts) from test1")
tdSql.error("select irate(col7) from test1")
tdSql.error("select irate(col8) from test1")
tdSql.error("select irate(col9) from test1")
tdSql.error("select irate(loc) from test1")
tdSql.error("select irate(tag1) from test1")
def stop(self):
tdSql.close()
tdLog.success("%s successfully executed" % __file__)
tdCases.addWindows(__file__, TDTestCase())
tdCases.addLinux(__file__, TDTestCase())
此差异已折叠。
{
"filetype": "insert",
"cfgdir": "/etc/taos",
"host": "127.0.0.1",
"port": 6030,
"user": "root",
"password": "taosdata",
"thread_count": 4,
"thread_count_create_tbl": 4,
"result_file":"./insert_res.txt",
"confirm_parameter_prompt": "no",
"insert_interval": 0,
"interlace_rows": 10,
"num_of_records_per_req": 1000,
"max_sql_len": 1024000,
"databases": [{
"dbinfo": {
"name": "db",
"drop": "yes",
"replica": 1,
"days": 10,
"cache": 50,
"blocks": 8,
"precision": "ms",
"keep": 365,
"minRows": 100,
"maxRows": 4096,
"comp":2,
"walLevel":1,
"cachelast":0,
"quorum":1,
"fsync":3000,
"update": 0
},
"super_tables": [{
"name": "stb0",
"child_table_exists":"no",
"childtable_count": 1,
"childtable_prefix": "stb0_",
"auto_create_table": "no",
"batch_create_tbl_num": 10,
"data_source": "rand",
"insert_mode": "taosc",
"insert_rows": 100000,
"childtable_limit": -1,
"childtable_offset": 0,
"multi_thread_write_one_tbl": "no",
"interlace_rows": 0,
"insert_interval": 0,
"max_sql_len": 1024000,
"disorder_ratio": 0,
"disorder_range": 1,
"timestamp_step": 1000,
"start_timestamp": "2020-10-01 00:00:00.000",
"sample_format": "csv",
"sample_file": "./tools/taosdemoAllTest/sample.csv",
"tags_file": "",
"columns": [{"type": "INT", "count":1}, {"type": "BINARY", "len": 16, "count":1}, {"type": "BOOL"}],
"tags": [{"type": "TINYINT", "count":1}, {"type": "BINARY", "len": 16, "count":1}]
}]
}]
}
###################################################################
# Copyright (c) 2016 by TAOS Technologies, Inc.
# All rights reserved.
#
# This file is proprietary and confidential to TAOS Technologies.
# No part of this file may be reproduced, stored, transmitted,
# disclosed or used in any form or by any means other than as
# expressly provided by the written permission from Jianhui Tao
#
###################################################################
# -*- coding: utf-8 -*-
import sys
import os
from util.log import *
from util.cases import *
from util.sql import *
from util.dnodes import *
class TDTestCase:
def init(self, conn, logSql):
tdLog.debug("start to execute %s" % __file__)
tdSql.init(conn.cursor(), logSql)
def getBuildPath(self):
selfPath = os.path.dirname(os.path.realpath(__file__))
if ("community" in selfPath):
projPath = selfPath[:selfPath.find("community")]
else:
projPath = selfPath[:selfPath.find("tests")]
for root, dirs, files in os.walk(projPath):
if ("taosd" in files):
rootRealPath = os.path.dirname(os.path.realpath(root))
if ("packaging" not in rootRealPath):
buildPath = root[:len(root)-len("/build/bin")]
break
return buildPath
def run(self):
buildPath = self.getBuildPath()
if (buildPath == ""):
tdLog.exit("taosd not found!")
else:
tdLog.info("taosd found in %s" % buildPath)
binPath = buildPath+ "/build/bin/"
# insert: create one or mutiple tables per sql and insert multiple rows per sql
os.system("%staosdemo -f query/nestedQuery/insertData.json -y " % binPath)
tdSql.execute("use db")
tdSql.query("select count (tbname) from stb0")
tdSql.checkData(0, 0, 1000)
tdSql.query("select count (tbname) from stb1")
tdSql.checkData(0, 0, 1000)
tdSql.query("select count(*) from stb00_0")
tdSql.checkData(0, 0, 100)
tdSql.query("select count(*) from stb0")
tdSql.checkData(0, 0, 100000)
tdSql.query("select count(*) from stb01_1")
tdSql.checkData(0, 0, 200)
tdSql.query("select count(*) from stb1")
tdSql.checkData(0, 0, 200000)
testcaseFilename = os.path.split(__file__)[-1]
os.system("rm -rf ./insert_res.txt")
os.system("rm -rf query/nestedQuery/%s.sql" % testcaseFilename )
def stop(self):
tdSql.close()
tdLog.success("%s successfully executed" % __file__)
tdCases.addWindows(__file__, TDTestCase())
tdCases.addLinux(__file__, TDTestCase())
此差异已折叠。
此差异已折叠。
......@@ -45,7 +45,9 @@ class TDTestCase:
for i in range(100):
sql += "(%d, %d, 'nchar%d')" % (currts + i, i % 100, i % 100)
tdSql.execute(sql)
os.system("rm /tmp/*.sql")
os.system("taosdump --databases db -o /tmp")
tdSql.execute("drop database db")
......
此差异已折叠。
此差异已折叠。
......@@ -4,3 +4,5 @@ Cur_Dir=$(pwd)
echo $Cur_Dir
echo "'2020-1-1 1:1:1','abc','device',123,'9876', 'abc', 'net', 'mno', 'province', 'city', 'al'" >> ~/data.sql
echo "'2020-1-2 1:1:1','abc','device',123,'9876', 'abc', 'net', 'mno', 'province', 'city', 'al'" >> ~/data.sql
echo "'2020-1-3 1:1:1','abc','device',123,'9876', 'abc', 'net', 'mno', 'province', 'city', 'al'" >> ~/data.sql
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
system sh/cfg.sh -n dnode1 -c walLevel -v 0
system sh/cfg.sh -n dnode1 -c walLevel -v 1
system sh/cfg.sh -n dnode1 -c maxtablesPerVnode -v 2
system sh/exec.sh -n dnode1 -s start
......
......@@ -63,4 +63,3 @@ run general/parser/between_and.sim
run general/parser/last_cache.sim
run general/parser/nestquery.sim
run general/parser/precision_ns.sim
此差异已折叠。
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册