提交 af15769d 编写于 作者: L liuyq-617

Merge branch 'CI/TD-12111' of github.com:taosdata/TDengine into CI/TD-12111

...@@ -53,6 +53,7 @@ TDengine 缺省的时间戳是毫秒精度,但通过在 CREATE DATABASE 时传 ...@@ -53,6 +53,7 @@ TDengine 缺省的时间戳是毫秒精度,但通过在 CREATE DATABASE 时传
| 8 | TINYINT | 1 | 单字节整型,范围 [-127, 127], -128 用于 NULL | | 8 | TINYINT | 1 | 单字节整型,范围 [-127, 127], -128 用于 NULL |
| 9 | BOOL | 1 | 布尔型,{true, false} | | 9 | BOOL | 1 | 布尔型,{true, false} |
| 10 | NCHAR | 自定义 | 记录包含多字节字符在内的字符串,如中文字符。每个 nchar 字符占用 4 bytes 的存储空间。字符串两端使用单引号引用,字符串内的单引号需用转义字符 `\’`。nchar 使用时须指定字符串大小,类型为 nchar(10) 的列表示此列的字符串最多存储 10 个 nchar 字符,会固定占用 40 bytes 的空间。如果用户字符串长度超出声明长度,将会报错。 | | 10 | NCHAR | 自定义 | 记录包含多字节字符在内的字符串,如中文字符。每个 nchar 字符占用 4 bytes 的存储空间。字符串两端使用单引号引用,字符串内的单引号需用转义字符 `\’`。nchar 使用时须指定字符串大小,类型为 nchar(10) 的列表示此列的字符串最多存储 10 个 nchar 字符,会固定占用 40 bytes 的空间。如果用户字符串长度超出声明长度,将会报错。 |
| 11 | JSON | | json数据类型, 只有tag类型可以是json格式 |
<!-- REPLACE_OPEN_TO_ENTERPRISE__COLUMN_TYPE_ADDONS --> <!-- REPLACE_OPEN_TO_ENTERPRISE__COLUMN_TYPE_ADDONS -->
**Tips**: **Tips**:
...@@ -1603,6 +1604,15 @@ TAOS SQL 支持对标签、TBNAME 进行 GROUP BY 操作,也支持普通列进 ...@@ -1603,6 +1604,15 @@ TAOS SQL 支持对标签、TBNAME 进行 GROUP BY 操作,也支持普通列进
IS NOT NULL 支持所有类型的列。不为空的表达式为 <>"",仅对非数值类型的列适用。 IS NOT NULL 支持所有类型的列。不为空的表达式为 <>"",仅对非数值类型的列适用。
**ORDER BY的限制**
- 非超级表只能有一个order by.
- 超级表最多两个order by, 并且第二个必须为ts.
- order by tag,必须和group by tag一起,并且是同一个tag。 tbname和tag一样逻辑。 只适用于超级表
- order by 普通列,必须和group by一起或者和top/bottom一起,并且是同一个普通列。 适用于超级表和普通表。如果同时存在 group by和 top/bottom一起,order by优先必须和group by同一列。
- order by ts. 适用于超级表和普通表。
- order by ts同时含有group by时 针对group内部用ts排序
## 表(列)名合法性说明 ## 表(列)名合法性说明
TDengine 中的表(列)名命名规则如下: TDengine 中的表(列)名命名规则如下:
只能由字母、数字、下划线构成,数字不能在首位,长度不能超过192字节,不区分大小写。 只能由字母、数字、下划线构成,数字不能在首位,长度不能超过192字节,不区分大小写。
...@@ -1618,3 +1628,87 @@ TDengine 中的表(列)名命名规则如下: ...@@ -1618,3 +1628,87 @@ TDengine 中的表(列)名命名规则如下:
支持版本 支持版本
支持转义符的功能从 2.3.0.1 版本开始。 支持转义符的功能从 2.3.0.1 版本开始。
## Json类型使用说明
- 语法说明
1. 创建json类型tag
```mysql
create stable s1 (ts timestamp, v1 int) tags (info json)
create table s1_1 using s1 tags ('{"k1": "v1"}')
```
3. json取值操作符 ->
```mysql
select * from s1 where info->'k1' = 'v1'
select info->'k1' from s1
```
4. json key是否存在操作符 contains
```mysql
select * from s1 where info contains 'k2'
select * from s1 where info contains 'k1'
```
- 支持的操作
1. 在where条件中时,支持函数match/nmatch/between and/like/and/or/is null/is no null,不支持in
```mysql
select * from s1 where info→'k1' match 'v*';
select * from s1 where info→'k1' like 'v%' and info contains 'k2';
select * from s1 where info is null;
select * from s1 where info->'k1' is not null
```
2. 支持json tag放在group by、order by、join子句、union all以及子查询中,比如group by json->'key'
3. 支持distinct操作.
```mysql
select distinct info→'k1' from s1
```
5. 标签操作
支持修改json标签值(全量覆盖)
支持修改json标签名
不支持添加json标签、删除json标签、修改json标签列宽
- 其他约束条件
1. 只有标签列可以使用json类型,如果用json标签,标签列只能有一个。
2. 长度限制:json 中key的长度不能超过256,并且key必须为可打印ascii字符;json字符串总长度不超过4096个字节。
3. json格式限制:
1. json输入字符串可以为空("","\t"," "或null)或object,不能为非空的字符串,布尔型和数组。
2. object 可为{},如果object为{},则整个json串记为空。key可为"",若key为"",则json串中忽略该k-v对。
3. value可以为数字(int/double)或字符串或bool或null,暂不可以为数组。不允许嵌套。
4. 若json字符串中出现两个相同的key,则第一个生效。
5. json字符串里暂不支持转义。
4. 当查询json中不存在的key时,返回NULL
5. 当json tag作为子查询结果时,不再支持上层查询继续对子查询中的json串做解析查询。
比如暂不支持
```mysql
select jtag→'key' from (select jtag from stable)
```
不支持
```mysql
select jtag->'key' from (select jtag from stable) where jtag->'key'>0
```
...@@ -53,7 +53,7 @@ In TDengine, the following 10 data types can be used in data model of an ordinar ...@@ -53,7 +53,7 @@ In TDengine, the following 10 data types can be used in data model of an ordinar
| 8 | TINYINT | 1 | A nullable integer type with a range of [-127, 127] | | 8 | TINYINT | 1 | A nullable integer type with a range of [-127, 127] |
| 9 | BOOL | 1 | Boolean type,{true, false} | | 9 | BOOL | 1 | Boolean type,{true, false} |
| 10 | NCHAR | Custom | Used to record non-ASCII strings, such as Chinese characters. Each nchar character takes up 4 bytes of storage space. Single quotation marks are used at both ends of the string, and escape characters are required for single quotation marks in the string, that is \’. When nchar is used, the string size must be specified. A column of type nchar (10) indicates that the string of this column stores up to 10 nchar characters, which will take up 40 bytes of space. If the length of the user string exceeds the declared length, an error will be reported. | | 10 | NCHAR | Custom | Used to record non-ASCII strings, such as Chinese characters. Each nchar character takes up 4 bytes of storage space. Single quotation marks are used at both ends of the string, and escape characters are required for single quotation marks in the string, that is \’. When nchar is used, the string size must be specified. A column of type nchar (10) indicates that the string of this column stores up to 10 nchar characters, which will take up 40 bytes of space. If the length of the user string exceeds the declared length, an error will be reported. |
| 11 | JSON | | Json type,only support for tag |
**Tips**: **Tips**:
...@@ -1245,3 +1245,92 @@ TAOS SQL supports join columns of two tables by Primary Key timestamp between th ...@@ -1245,3 +1245,92 @@ TAOS SQL supports join columns of two tables by Primary Key timestamp between th
**Availability of is no null** **Availability of is no null**
Is not null supports all types of columns. Non-null expression is < > "" and only applies to columns of non-numeric types. Is not null supports all types of columns. Non-null expression is < > "" and only applies to columns of non-numeric types.
**Restrictions on order by**
- A non super table can only have one order by.
- The super table can have at most two order by expression, and the second must be ts.
- Order by tag must be the same tag as group by tag. TBNAME is as logical as tag.
- Order by ordinary column must be the same ordinary column as group by or top/bottom. If both group by and top / bottom exist, order by must be in the same column as group by.
- There are both order by and group by. The internal of the group is sorted by ts
- Order by ts.
## JSON type instructions
- Syntax description
1. Create JSON type tag
```mysql
create stable s1 (ts timestamp, v1 int) tags (info json)
create table s1_1 using s1 tags ('{"k1": "v1"}')
```
3. JSON value operator(->)
```mysql
select * from s1 where info->'k1' = 'v1'
select info->'k1' from s1
```
4. JSON key existence operator(contains)
```mysql
select * from s1 where info contains 'k2'
select * from s1 where info contains 'k1'
```
- Supported operations
1. In where condition,support match/nmatch/between and/like/and/or/is null/is no null,in operator is not support.
```mysql
select * from s1 where info→'k1' match 'v*';
select * from s1 where info→'k1' like 'v%' and info contains 'k2';
select * from s1 where info is null;
select * from s1 where info->'k1' is not null
```
2. JSON tag is supported in group by、order by、join clause、union all and subquery,like group by json->'key'
3. Support distinct operator.
```mysql
select distinct info→'k1' from s1
```
5. Tag
Support change JSON tag(full coverage)
Support change the name of JSON tag
Not support add JSON tag, delete JSON tag
- Other constraints
1. Only tag columns can use JSON type. If JSON tag is used, there can only be one tag column.
2. Length limit:The length of the key in JSON cannot exceed 256, and the key must be printable ASCII characters; The total length of JSON string does not exceed 4096 bytes.
3. JSON format restrictions:
1. JSON input string can be empty (""," ","\t" or null) or object, and cannot be nonempty string, boolean or array.
2. Object can be {}, if the object is {}, the whole JSON string is marked as empty. The key can be "", if the key is "", the K-V pair will be ignored in the JSON string.
3. Value can be a number (int/double) or string, bool or null, not an array. Nesting is not allowed.
4. If two identical keys appear in the JSON string, the first one will take effect.
5. Escape is not supported in JSON string.
4. Null is returned when querying the key that does not exist in JSON.
5. When JSON tag is used as the sub query result, parsing and querying the JSON string in the sub query is no longer supported in the upper level query.
The following query is not supported:
```mysql
select jtag→'key' from (select jtag from stable)
select jtag->'key' from (select jtag from stable) where jtag->'key'>0
```
...@@ -494,11 +494,13 @@ else ...@@ -494,11 +494,13 @@ else
exit 1 exit 1
fi fi
CORES=`grep -c ^processor /proc/cpuinfo`
if [[ "$allocator" == "jemalloc" ]]; then if [[ "$allocator" == "jemalloc" ]]; then
# jemalloc need compile first, so disable parallel build # jemalloc need compile first, so disable parallel build
make -j 8 && ${csudo}make install make -j ${CORES} && ${csudo}make install
else else
make -j 8 && ${csudo}make install make -j ${CORES} && ${csudo}make install
fi fi
cd ${curr_dir} cd ${curr_dir}
......
...@@ -9,3 +9,8 @@ INCLUDE_DIRECTORIES(inc) ...@@ -9,3 +9,8 @@ INCLUDE_DIRECTORIES(inc)
AUX_SOURCE_DIRECTORY(src SRC) AUX_SOURCE_DIRECTORY(src SRC)
ADD_LIBRARY(balance ${SRC}) ADD_LIBRARY(balance ${SRC})
IF (TD_LINUX_64 AND JEMALLOC_ENABLED)
ADD_DEPENDENCIES(balance jemalloc)
ENDIF ()
...@@ -5186,7 +5186,8 @@ int32_t tscCreateQueryFromQueryInfo(SQueryInfo* pQueryInfo, SQueryAttr* pQueryAt ...@@ -5186,7 +5186,8 @@ int32_t tscCreateQueryFromQueryInfo(SQueryInfo* pQueryInfo, SQueryAttr* pQueryAt
} }
// global aggregate query // global aggregate query
if (pQueryAttr->stableQuery && (pQueryAttr->simpleAgg || pQueryAttr->interval.interval > 0) && tscIsTwoStageSTableQuery(pQueryInfo, 0)) { if (pQueryAttr->stableQuery && (pQueryAttr->simpleAgg || pQueryAttr->interval.interval > 0 || pQueryAttr->sw.gap > 0)
&& tscIsTwoStageSTableQuery(pQueryInfo, 0)) {
createGlobalAggregateExpr(pQueryAttr, pQueryInfo); createGlobalAggregateExpr(pQueryAttr, pQueryInfo);
} }
......
...@@ -41,6 +41,7 @@ cursor.execute("show databases") ...@@ -41,6 +41,7 @@ cursor.execute("show databases")
results = cursor.fetchall() results = cursor.fetchall()
for row in results: for row in results:
print(row) print(row)
cursor.close() cursor.close()
conn.close() conn.close()
``` ```
...@@ -57,8 +58,10 @@ result = conn.query("show databases") ...@@ -57,8 +58,10 @@ result = conn.query("show databases")
num_of_fields = result.field_count num_of_fields = result.field_count
for field in result.fields: for field in result.fields:
print(field) print(field)
for row in result: for row in result:
print(row) print(row)
result.close() result.close()
conn.execute("drop database pytest") conn.execute("drop database pytest")
conn.close() conn.close()
...@@ -75,12 +78,13 @@ def fetch_callback(p_param, p_result, num_of_rows): ...@@ -75,12 +78,13 @@ def fetch_callback(p_param, p_result, num_of_rows):
print("fetched ", num_of_rows, "rows") print("fetched ", num_of_rows, "rows")
p = cast(p_param, POINTER(Counter)) p = cast(p_param, POINTER(Counter))
result = TaosResult(p_result) result = TaosResult(p_result)
if num_of_rows == 0: if num_of_rows == 0:
print("fetching completed") print("fetching completed")
p.contents.done = True p.contents.done = True
result.close() result.close()
return return
if num_of_rows < 0: if num_of_rows < 0:
p.contents.done = True p.contents.done = True
result.check_error(num_of_rows) result.check_error(num_of_rows)
...@@ -90,6 +94,7 @@ def fetch_callback(p_param, p_result, num_of_rows): ...@@ -90,6 +94,7 @@ def fetch_callback(p_param, p_result, num_of_rows):
for row in result.rows_iter(num_of_rows): for row in result.rows_iter(num_of_rows):
# print(row) # print(row)
None None
p.contents.count += result.row_count p.contents.count += result.row_count
result.fetch_rows_a(fetch_callback, p_param) result.fetch_rows_a(fetch_callback, p_param)
...@@ -97,17 +102,19 @@ def fetch_callback(p_param, p_result, num_of_rows): ...@@ -97,17 +102,19 @@ def fetch_callback(p_param, p_result, num_of_rows):
def query_callback(p_param, p_result, code): def query_callback(p_param, p_result, code):
# type: (c_void_p, c_void_p, c_int) -> None # type: (c_void_p, c_void_p, c_int) -> None
if p_result == None: if p_result is None:
return return
result = TaosResult(p_result) result = TaosResult(p_result)
if code == 0: if code == 0:
result.fetch_rows_a(fetch_callback, p_param) result.fetch_rows_a(fetch_callback, p_param)
result.check_error(code) result.check_error(code)
class Counter(Structure): class Counter(Structure):
_fields_ = [("count", c_int), ("done", c_bool)] _fields_ = [("count", c_int), ("done", c_bool)]
def __str__(self): def __str__(self):
return "{ count: %d, done: %s }" % (self.count, self.done) return "{ count: %d, done: %s }" % (self.count, self.done)
...@@ -116,10 +123,11 @@ def test_query(conn): ...@@ -116,10 +123,11 @@ def test_query(conn):
# type: (TaosConnection) -> None # type: (TaosConnection) -> None
counter = Counter(count=0) counter = Counter(count=0)
conn.query_a("select * from log.log", query_callback, byref(counter)) conn.query_a("select * from log.log", query_callback, byref(counter))
while not counter.done: while not counter.done:
print("wait query callback") print("wait query callback")
time.sleep(1) time.sleep(1)
print(counter) print(counter)
conn.close() conn.close()
...@@ -182,6 +190,7 @@ result = conn.query("select * from log") ...@@ -182,6 +190,7 @@ result = conn.query("select * from log")
for row in result: for row in result:
print(row) print(row)
result.close() result.close()
stmt.close() stmt.close()
conn.close() conn.close()
...@@ -237,18 +246,20 @@ result.close() ...@@ -237,18 +246,20 @@ result.close()
result = conn.query("select * from log") result = conn.query("select * from log")
for row in result: for row in result:
print(row) print(row)
result.close() result.close()
stmt.close() stmt.close()
conn.close() conn.close()
``` ```
### Statement API - Subscribe ### Subscription
```python ```python
import taos import taos
import random
conn = taos.connect() conn = taos.connect()
dbname = "pytest_taos_subscribe_callback" dbname = "pytest_taos_subscribe"
conn.execute("drop database if exists %s" % dbname) conn.execute("drop database if exists %s" % dbname)
conn.execute("create database if not exists %s" % dbname) conn.execute("create database if not exists %s" % dbname)
conn.select_db(dbname) conn.select_db(dbname)
...@@ -256,7 +267,7 @@ conn.execute("create table if not exists log(ts timestamp, n int)") ...@@ -256,7 +267,7 @@ conn.execute("create table if not exists log(ts timestamp, n int)")
for i in range(10): for i in range(10):
conn.execute("insert into log values(now, %d)" % i) conn.execute("insert into log values(now, %d)" % i)
sub = conn.subscribe(True, "test", "select * from log", 1000) sub = conn.subscribe(False, "test", "select * from log", 1000)
print("# consume from begin") print("# consume from begin")
for ts, n in sub.consume(): for ts, n in sub.consume():
print(ts, n) print(ts, n)
...@@ -268,9 +279,18 @@ for i in range(5): ...@@ -268,9 +279,18 @@ for i in range(5):
for ts, n in result: for ts, n in result:
print(ts, n) print(ts, n)
sub.close(True)
print("# keep progress consume")
sub = conn.subscribe(False, "test", "select * from log", 1000)
result = sub.consume()
rows = result.fetch_all()
# consume from latest subscription needs root privilege(for /var/lib/taos).
assert result.row_count == 0
print("## consumed ", len(rows), "rows")
print("# consume with a stop condition") print("# consume with a stop condition")
for i in range(10): for i in range(10):
conn.execute("insert into log values(now, %d)" % int(random() * 10)) conn.execute("insert into log values(now, %d)" % random.randint(0, 10))
result = sub.consume() result = sub.consume()
try: try:
ts, n = next(result) ts, n = next(result)
...@@ -283,12 +303,13 @@ for i in range(10): ...@@ -283,12 +303,13 @@ for i in range(10):
continue continue
sub.close() sub.close()
# sub.close()
conn.execute("drop database if exists %s" % dbname) conn.execute("drop database if exists %s" % dbname)
conn.close() # conn.close()
``` ```
### Statement API - Subscribe asynchronously with callback ### Subscription asynchronously with callback
```python ```python
from taos import * from taos import *
...@@ -300,7 +321,7 @@ import time ...@@ -300,7 +321,7 @@ import time
def subscribe_callback(p_sub, p_result, p_param, errno): def subscribe_callback(p_sub, p_result, p_param, errno):
# type: (c_void_p, c_void_p, c_void_p, c_int) -> None # type: (c_void_p, c_void_p, c_void_p, c_int) -> None
print("# fetch in callback") print("# fetch in callback")
result = TaosResult(p_result) result = TaosResult(c_void_p(p_result))
result.check_error(errno) result.check_error(errno)
for row in result.rows_iter(): for row in result.rows_iter():
ts, n = row() ts, n = row()
...@@ -311,42 +332,45 @@ def test_subscribe_callback(conn): ...@@ -311,42 +332,45 @@ def test_subscribe_callback(conn):
# type: (TaosConnection) -> None # type: (TaosConnection) -> None
dbname = "pytest_taos_subscribe_callback" dbname = "pytest_taos_subscribe_callback"
try: try:
print("drop if exists")
conn.execute("drop database if exists %s" % dbname) conn.execute("drop database if exists %s" % dbname)
print("create database")
conn.execute("create database if not exists %s" % dbname) conn.execute("create database if not exists %s" % dbname)
conn.select_db(dbname) print("create table")
conn.execute("create table if not exists log(ts timestamp, n int)") # conn.execute("use %s" % dbname)
conn.execute("create table if not exists %s.log(ts timestamp, n int)" % dbname)
print("# subscribe with callback") print("# subscribe with callback")
sub = conn.subscribe(False, "test", "select * from log", 1000, subscribe_callback) sub = conn.subscribe(False, "test", "select * from %s.log" % dbname, 1000, subscribe_callback)
for i in range(10): for i in range(10):
conn.execute("insert into log values(now, %d)" % i) conn.execute("insert into %s.log values(now, %d)" % (dbname, i))
time.sleep(0.7) time.sleep(0.7)
sub.close() sub.close()
conn.execute("drop database if exists %s" % dbname) conn.execute("drop database if exists %s" % dbname)
conn.close() # conn.close()
except Exception as err: except Exception as err:
conn.execute("drop database if exists %s" % dbname) conn.execute("drop database if exists %s" % dbname)
conn.close() # conn.close()
raise err raise err
if __name__ == "__main__": if __name__ == "__main__":
test_subscribe_callback(connect()) test_subscribe_callback(connect())
``` ```
### Statement API - Stream ### Stream
```python ```python
from taos import * from taos import *
from ctypes import * from ctypes import *
import time
def stream_callback(p_param, p_result, p_row): def stream_callback(p_param, p_result, p_row):
# type: (c_void_p, c_void_p, c_void_p) -> None # type: (c_void_p, c_void_p, c_void_p) -> None
if p_result is None or p_row is None:
if p_result == None or p_row == None:
return return
result = TaosResult(p_result) result = TaosResult(p_result)
row = TaosRow(result, p_row) row = TaosRow(result, p_row)
...@@ -355,13 +379,12 @@ def stream_callback(p_param, p_result, p_row): ...@@ -355,13 +379,12 @@ def stream_callback(p_param, p_result, p_row):
p = cast(p_param, POINTER(Counter)) p = cast(p_param, POINTER(Counter))
p.contents.count += count p.contents.count += count
print("[%s] inserted %d in 5s, total count: %d" % (ts.strftime("%Y-%m-%d %H:%M:%S"), count, p.contents.count)) print("[%s] inserted %d in 5s, total count: %d" % (ts.strftime("%Y-%m-%d %H:%M:%S"), count, p.contents.count))
except Exception as err: except Exception as err:
print(err) print(err)
raise err raise err
class Counter(ctypes.Structure): class Counter(Structure):
_fields_ = [ _fields_ = [
("count", c_int), ("count", c_int),
] ]
...@@ -378,16 +401,17 @@ def test_stream(conn): ...@@ -378,16 +401,17 @@ def test_stream(conn):
conn.execute("create database if not exists %s" % dbname) conn.execute("create database if not exists %s" % dbname)
conn.select_db(dbname) conn.select_db(dbname)
conn.execute("create table if not exists log(ts timestamp, n int)") conn.execute("create table if not exists log(ts timestamp, n int)")
result = conn.query("select count(*) from log interval(5s)") result = conn.query("select count(*) from log interval(5s)")
assert result.field_count == 2 assert result.field_count == 2
counter = Counter() counter = Counter()
counter.count = 0 counter.count = 0
stream = conn.stream("select count(*) from log interval(5s)", stream_callback, param=byref(counter)) stream = conn.stream("select count(*) from log interval(5s)", stream_callback, param=byref(counter))
for _ in range(0, 20): for _ in range(0, 20):
conn.execute("insert into log values(now,0)(now+1s, 1)(now + 2s, 2)") conn.execute("insert into log values(now,0)(now+1s, 1)(now + 2s, 2)")
time.sleep(2) time.sleep(2)
stream.close() stream.close()
conn.execute("drop database if exists %s" % dbname) conn.execute("drop database if exists %s" % dbname)
conn.close() conn.close()
...@@ -399,12 +423,14 @@ def test_stream(conn): ...@@ -399,12 +423,14 @@ def test_stream(conn):
if __name__ == "__main__": if __name__ == "__main__":
test_stream(connect()) test_stream(connect())
``` ```
### Insert with line protocol ### Insert with line protocol
```python ```python
import taos import taos
from taos import SmlProtocol, SmlPrecision
conn = taos.connect() conn = taos.connect()
dbname = "pytest_line" dbname = "pytest_line"
...@@ -413,29 +439,22 @@ conn.execute("create database if not exists %s precision 'us'" % dbname) ...@@ -413,29 +439,22 @@ conn.execute("create database if not exists %s precision 'us'" % dbname)
conn.select_db(dbname) conn.select_db(dbname)
lines = [ lines = [
'st,t1=3i64,t2=4f64,t3="t3" c1=3i64,c3=L"pass",c2=false,c4=4f64 1626006833639000000ns', 'st,t1=3i64,t2=4f64,t3="t3" c1=3i64,c3=L"pass",c2=false,c4=4f64 1626006833639000000',
'st,t1=4i64,t3="t4",t2=5f64,t4=5f64 c1=3i64,c3=L"pass it again",c2=true,c4=5f64,c5=5f64,c6=7u64 1626006933640000000',
'stf,t1=4i64,t3="t4",t2=5f64,t4=5f64 c1=3i64,c3=L"pass it again_stf",c2=false,c5=5f64,c6=7u64 1626006933641000000',
] ]
conn.schemaless_insert(lines, 0, "ns") conn.schemaless_insert(lines, taos.SmlProtocol.LINE_PROTOCOL, taos.SmlPrecision.NOT_CONFIGURED)
print("inserted") print("inserted")
lines = [ conn.schemaless_insert(lines, taos.SmlProtocol.LINE_PROTOCOL, taos.SmlPrecision.NOT_CONFIGURED)
'stf,t1=5i64,t3="t4",t2=5f64,t4=5f64 c1=3i64,c3=L"pass it again_stf",c2=false,c5=5f64,c6=7u64 1626006933641000000',
]
conn.schemaless_insert(lines, 0, "ns")
result = conn.query("show tables") result = conn.query("show tables")
for row in result: for row in result:
print(row) print(row)
result.close()
conn.execute("drop database if exists %s" % dbname) conn.execute("drop database if exists %s" % dbname)
conn.close()
``` ```
## License - AGPL-3.0 ## License
Keep same with [TDengine](https://github.com/taosdata/TDengine). We use MIT license for Python connector.
...@@ -29,7 +29,7 @@ def fetch_callback(p_param, p_result, num_of_rows): ...@@ -29,7 +29,7 @@ def fetch_callback(p_param, p_result, num_of_rows):
def query_callback(p_param, p_result, code): def query_callback(p_param, p_result, code):
# type: (c_void_p, c_void_p, c_int) -> None # type: (c_void_p, c_void_p, c_int) -> None
if p_result == None: if p_result is None:
return return
result = TaosResult(p_result) result = TaosResult(p_result)
if code == 0: if code == 0:
......
from taos import *
from ctypes import *
import time
def stream_callback(p_param, p_result, p_row):
# type: (c_void_p, c_void_p, c_void_p) -> None
if p_result is None or p_row is None:
return
result = TaosResult(p_result)
row = TaosRow(result, p_row)
try:
ts, count = row()
p = cast(p_param, POINTER(Counter))
p.contents.count += count
print("[%s] inserted %d in 5s, total count: %d" % (ts.strftime("%Y-%m-%d %H:%M:%S"), count, p.contents.count))
except Exception as err:
print(err)
raise err
class Counter(Structure):
_fields_ = [
("count", c_int),
]
def __str__(self):
return "%d" % self.count
def test_stream(conn):
# type: (TaosConnection) -> None
dbname = "pytest_taos_stream"
try:
conn.execute("drop database if exists %s" % dbname)
conn.execute("create database if not exists %s" % dbname)
conn.select_db(dbname)
conn.execute("create table if not exists log(ts timestamp, n int)")
result = conn.query("select count(*) from log interval(5s)")
assert result.field_count == 2
counter = Counter()
counter.count = 0
stream = conn.stream("select count(*) from log interval(5s)", stream_callback, param=byref(counter))
for _ in range(0, 20):
conn.execute("insert into log values(now,0)(now+1s, 1)(now + 2s, 2)")
time.sleep(2)
stream.close()
conn.execute("drop database if exists %s" % dbname)
conn.close()
except Exception as err:
conn.execute("drop database if exists %s" % dbname)
conn.close()
raise err
if __name__ == "__main__":
test_stream(connect())
...@@ -86,7 +86,7 @@ def fetch_callback(p_param, p_result, num_of_rows): ...@@ -86,7 +86,7 @@ def fetch_callback(p_param, p_result, num_of_rows):
def query_callback(p_param, p_result, code): def query_callback(p_param, p_result, code):
# type: (c_void_p, c_void_p, c_int) -> None # type: (c_void_p, c_void_p, c_int) -> None
if p_result == None: if p_result is None:
return return
result = TaosResult(p_result) result = TaosResult(p_result)
if code == 0: if code == 0:
...@@ -335,7 +335,7 @@ from ctypes import * ...@@ -335,7 +335,7 @@ from ctypes import *
def stream_callback(p_param, p_result, p_row): def stream_callback(p_param, p_result, p_row):
# type: (c_void_p, c_void_p, c_void_p) -> None # type: (c_void_p, c_void_p, c_void_p) -> None
if p_result == None or p_row == None: if p_result is None or p_row is None:
return return
result = TaosResult(p_result) result = TaosResult(p_result)
row = TaosRow(result, p_row) row = TaosRow(result, p_row)
......
...@@ -317,7 +317,7 @@ class TaosMultiBind(ctypes.Structure): ...@@ -317,7 +317,7 @@ class TaosMultiBind(ctypes.Structure):
def _str_to_buffer(self, values): def _str_to_buffer(self, values):
self.num = len(values) self.num = len(values)
is_null = [1 if v == None else 0 for v in values] is_null = [1 if v is None else 0 for v in values]
self.is_null = cast((c_byte * self.num)(*is_null), c_char_p) self.is_null = cast((c_byte * self.num)(*is_null), c_char_p)
if sum(is_null) == self.num: if sum(is_null) == self.num:
......
...@@ -373,9 +373,9 @@ def taos_fetch_block(result, fields=None, field_count=None): ...@@ -373,9 +373,9 @@ def taos_fetch_block(result, fields=None, field_count=None):
if num_of_rows == 0: if num_of_rows == 0:
return None, 0 return None, 0
precision = taos_result_precision(result) precision = taos_result_precision(result)
if fields == None: if fields is None:
fields = taos_fetch_fields(result) fields = taos_fetch_fields(result)
if field_count == None: if field_count is None:
field_count = taos_field_count(result) field_count = taos_field_count(result)
blocks = [None] * field_count blocks = [None] * field_count
fieldLen = taos_fetch_lengths(result, field_count) fieldLen = taos_fetch_lengths(result, field_count)
...@@ -466,7 +466,7 @@ def taos_fetch_lengths(result, field_count=None): ...@@ -466,7 +466,7 @@ def taos_fetch_lengths(result, field_count=None):
# type: (c_void_p, int) -> Array[int] # type: (c_void_p, int) -> Array[int]
"""Make sure to call taos_fetch_row or taos_fetch_block before fetch_lengths""" """Make sure to call taos_fetch_row or taos_fetch_block before fetch_lengths"""
lens = _libtaos.taos_fetch_lengths(result) lens = _libtaos.taos_fetch_lengths(result)
if field_count == None: if field_count is None:
field_count = taos_field_count(result) field_count = taos_field_count(result)
if not lens: if not lens:
raise OperationalError("field length empty, use taos_fetch_row/block before it") raise OperationalError("field length empty, use taos_fetch_row/block before it")
...@@ -823,7 +823,7 @@ def taos_stmt_use_result(stmt): ...@@ -823,7 +823,7 @@ def taos_stmt_use_result(stmt):
@stmt: TAOS_STMT* @stmt: TAOS_STMT*
""" """
result = c_void_p(_libtaos.taos_stmt_use_result(stmt)) result = c_void_p(_libtaos.taos_stmt_use_result(stmt))
if result == None: if result is None:
raise StatementError(taos_stmt_errstr(stmt)) raise StatementError(taos_stmt_errstr(stmt))
return result return result
......
...@@ -41,7 +41,7 @@ class TaosResult(object): ...@@ -41,7 +41,7 @@ class TaosResult(object):
if self._result is None or self.fields is None: if self._result is None or self.fields is None:
raise OperationalError("Invalid use of fetch iterator") raise OperationalError("Invalid use of fetch iterator")
if self._block == None or self._block_iter >= self._block_length: if self._block is None or self._block_iter >= self._block_length:
self._block, self._block_length = self.fetch_block() self._block, self._block_length = self.fetch_block()
self._block_iter = 0 self._block_iter = 0
# self._row_count += self._block_length # self._row_count += self._block_length
...@@ -55,7 +55,7 @@ class TaosResult(object): ...@@ -55,7 +55,7 @@ class TaosResult(object):
"""fields definitions of the current result""" """fields definitions of the current result"""
if self._result is None: if self._result is None:
raise ResultError("no result object setted") raise ResultError("no result object setted")
if self._fields == None: if self._fields is None:
self._fields = taos_fetch_fields(self._result) self._fields = taos_fetch_fields(self._result)
return self._fields return self._fields
...@@ -72,7 +72,7 @@ class TaosResult(object): ...@@ -72,7 +72,7 @@ class TaosResult(object):
@property @property
def precision(self): def precision(self):
if self._precision == None: if self._precision is None:
self._precision = taos_result_precision(self._result) self._precision = taos_result_precision(self._result)
return self._precision return self._precision
...@@ -114,7 +114,7 @@ class TaosResult(object): ...@@ -114,7 +114,7 @@ class TaosResult(object):
if self._result is None: if self._result is None:
raise OperationalError("Invalid use of fetchall") raise OperationalError("Invalid use of fetchall")
if self._fields == None: if self._fields is None:
self._fields = taos_fetch_fields(self._result) self._fields = taos_fetch_fields(self._result)
buffer = [[] for i in range(len(self._fields))] buffer = [[] for i in range(len(self._fields))]
self._row_count = 0 self._row_count = 0
...@@ -150,7 +150,7 @@ class TaosResult(object): ...@@ -150,7 +150,7 @@ class TaosResult(object):
return taos_errstr(self._result) return taos_errstr(self._result)
def check_error(self, errno=None, close=True): def check_error(self, errno=None, close=True):
if errno == None: if errno is None:
errno = self.errno() errno = self.errno()
if errno != 0: if errno != 0:
msg = self.errstr() msg = self.errstr()
......
...@@ -32,6 +32,10 @@ ELSEIF (TD_WINDOWS) ...@@ -32,6 +32,10 @@ ELSEIF (TD_WINDOWS)
LIST(APPEND SRC ./src/shellMain.c) LIST(APPEND SRC ./src/shellMain.c)
LIST(APPEND SRC ./src/shellWindows.c) LIST(APPEND SRC ./src/shellWindows.c)
ADD_EXECUTABLE(shell ${SRC}) ADD_EXECUTABLE(shell ${SRC})
IF (TD_LINUX_64 AND JEMALLOC_ENABLED)
ADD_DEPENDENCIES(shell jemalloc)
ENDIF ()
TARGET_LINK_LIBRARIES(shell taos_static cJson) TARGET_LINK_LIBRARIES(shell taos_static cJson)
IF (TD_POWER) IF (TD_POWER)
......
Subproject commit beca4813316f254624d8dbecf54d45a5a232c61d Subproject commit a3611888d4257a9baa0ce876b04b47c60cc17279
...@@ -8,3 +8,8 @@ INCLUDE_DIRECTORIES(inc) ...@@ -8,3 +8,8 @@ INCLUDE_DIRECTORIES(inc)
AUX_SOURCE_DIRECTORY(src SRC) AUX_SOURCE_DIRECTORY(src SRC)
ADD_LIBRARY(mnode ${SRC}) ADD_LIBRARY(mnode ${SRC})
IF (TD_LINUX_64 AND JEMALLOC_ENABLED)
ADD_DEPENDENCIES(mnode jemalloc)
ENDIF ()
...@@ -11,6 +11,11 @@ ADD_LIBRARY(os ${SRC}) ...@@ -11,6 +11,11 @@ ADD_LIBRARY(os ${SRC})
IF (TD_LINUX) IF (TD_LINUX)
TARGET_LINK_LIBRARIES(os oslinux) TARGET_LINK_LIBRARIES(os oslinux)
IF (TD_LINUX_64 AND JEMALLOC_ENABLED)
ADD_DEPENDENCIES(os jemalloc)
ENDIF ()
IF (TD_ARM_32 OR TD_LINUX_32) IF (TD_ARM_32 OR TD_LINUX_32)
TARGET_LINK_LIBRARIES(os atomic) TARGET_LINK_LIBRARIES(os atomic)
ENDIF () ENDIF ()
......
...@@ -4,4 +4,9 @@ PROJECT(TDengine) ...@@ -4,4 +4,9 @@ PROJECT(TDengine)
AUX_SOURCE_DIRECTORY(. SRC) AUX_SOURCE_DIRECTORY(. SRC)
ADD_LIBRARY(oslinux ${SRC}) ADD_LIBRARY(oslinux ${SRC})
TARGET_LINK_LIBRARIES(oslinux m rt z dl) TARGET_LINK_LIBRARIES(oslinux m rt z dl)
\ No newline at end of file
IF (TD_LINUX_64 AND JEMALLOC_ENABLED)
ADD_DEPENDENCIES(oslinux jemalloc)
ENDIF ()
Subproject commit 826f3d3b7820a5c007d301854d56db003b424d0a Subproject commit 11d1e02255edfeeaa8d5b1f45abfa9637332ce65
...@@ -238,7 +238,7 @@ int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionI ...@@ -238,7 +238,7 @@ int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionI
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
// (uid, tid) + VGID + TAGSIZE + VARSTR_HEADER_SIZE // (uid, tid) + VGID + TAGSIZE + VARSTR_HEADER_SIZE
if (functionId == TSDB_FUNC_TID_TAG) { // todo use struct if (functionId == TSDB_FUNC_TID_TAG) { // todo use struct
*type = TSDB_DATA_TYPE_BINARY; *type = TSDB_DATA_TYPE_BINARY;
...@@ -253,7 +253,7 @@ int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionI ...@@ -253,7 +253,7 @@ int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionI
*interBytes = 0; *interBytes = 0;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
if (functionId == TSDB_FUNC_COUNT) { if (functionId == TSDB_FUNC_COUNT) {
*type = TSDB_DATA_TYPE_BIGINT; *type = TSDB_DATA_TYPE_BIGINT;
*bytes = sizeof(int64_t); *bytes = sizeof(int64_t);
...@@ -261,7 +261,7 @@ int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionI ...@@ -261,7 +261,7 @@ int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionI
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
if (functionId == TSDB_FUNC_TS_COMP) { if (functionId == TSDB_FUNC_TS_COMP) {
*type = TSDB_DATA_TYPE_BINARY; *type = TSDB_DATA_TYPE_BINARY;
*bytes = 1; // this results is compressed ts data, only one byte *bytes = 1; // this results is compressed ts data, only one byte
...@@ -316,20 +316,20 @@ int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionI ...@@ -316,20 +316,20 @@ int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionI
*type = TSDB_DATA_TYPE_BINARY; *type = TSDB_DATA_TYPE_BINARY;
*bytes = (dataBytes + DATA_SET_FLAG_SIZE); *bytes = (dataBytes + DATA_SET_FLAG_SIZE);
*interBytes = *bytes; *interBytes = *bytes;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} else if (functionId == TSDB_FUNC_SUM) { } else if (functionId == TSDB_FUNC_SUM) {
*type = TSDB_DATA_TYPE_BINARY; *type = TSDB_DATA_TYPE_BINARY;
*bytes = sizeof(SSumInfo); *bytes = sizeof(SSumInfo);
*interBytes = *bytes; *interBytes = *bytes;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} else if (functionId == TSDB_FUNC_AVG) { } else if (functionId == TSDB_FUNC_AVG) {
*type = TSDB_DATA_TYPE_BINARY; *type = TSDB_DATA_TYPE_BINARY;
*bytes = sizeof(SAvgInfo); *bytes = sizeof(SAvgInfo);
*interBytes = *bytes; *interBytes = *bytes;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} else if (functionId >= TSDB_FUNC_RATE && functionId <= TSDB_FUNC_IRATE) { } else if (functionId >= TSDB_FUNC_RATE && functionId <= TSDB_FUNC_IRATE) {
*type = TSDB_DATA_TYPE_DOUBLE; *type = TSDB_DATA_TYPE_DOUBLE;
*bytes = sizeof(SRateInfo); *bytes = sizeof(SRateInfo);
...@@ -339,7 +339,7 @@ int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionI ...@@ -339,7 +339,7 @@ int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionI
*type = TSDB_DATA_TYPE_BINARY; *type = TSDB_DATA_TYPE_BINARY;
*bytes = (sizeof(STopBotInfo) + (sizeof(tValuePair) + POINTER_BYTES + extLength) * param); *bytes = (sizeof(STopBotInfo) + (sizeof(tValuePair) + POINTER_BYTES + extLength) * param);
*interBytes = *bytes; *interBytes = *bytes;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} else if (functionId == TSDB_FUNC_SAMPLE) { } else if (functionId == TSDB_FUNC_SAMPLE) {
*type = TSDB_DATA_TYPE_BINARY; *type = TSDB_DATA_TYPE_BINARY;
...@@ -351,7 +351,7 @@ int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionI ...@@ -351,7 +351,7 @@ int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionI
*type = TSDB_DATA_TYPE_BINARY; *type = TSDB_DATA_TYPE_BINARY;
*bytes = sizeof(SSpreadInfo); *bytes = sizeof(SSpreadInfo);
*interBytes = *bytes; *interBytes = *bytes;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} else if (functionId == TSDB_FUNC_APERCT) { } else if (functionId == TSDB_FUNC_APERCT) {
*type = TSDB_DATA_TYPE_BINARY; *type = TSDB_DATA_TYPE_BINARY;
...@@ -359,13 +359,13 @@ int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionI ...@@ -359,13 +359,13 @@ int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionI
int32_t bytesDigest = (int32_t) (sizeof(SAPercentileInfo) + TDIGEST_SIZE(COMPRESSION)); int32_t bytesDigest = (int32_t) (sizeof(SAPercentileInfo) + TDIGEST_SIZE(COMPRESSION));
*bytes = MAX(bytesHist, bytesDigest); *bytes = MAX(bytesHist, bytesDigest);
*interBytes = *bytes; *interBytes = *bytes;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} else if (functionId == TSDB_FUNC_LAST_ROW) { } else if (functionId == TSDB_FUNC_LAST_ROW) {
*type = TSDB_DATA_TYPE_BINARY; *type = TSDB_DATA_TYPE_BINARY;
*bytes = (sizeof(SLastrowInfo) + dataBytes); *bytes = (sizeof(SLastrowInfo) + dataBytes);
*interBytes = *bytes; *interBytes = *bytes;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} else if (functionId == TSDB_FUNC_TWA) { } else if (functionId == TSDB_FUNC_TWA) {
*type = TSDB_DATA_TYPE_DOUBLE; *type = TSDB_DATA_TYPE_DOUBLE;
...@@ -388,7 +388,7 @@ int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionI ...@@ -388,7 +388,7 @@ int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionI
} else { } else {
*type = TSDB_DATA_TYPE_DOUBLE; *type = TSDB_DATA_TYPE_DOUBLE;
} }
*bytes = sizeof(int64_t); *bytes = sizeof(int64_t);
*interBytes = sizeof(SSumInfo); *interBytes = sizeof(SSumInfo);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
...@@ -458,9 +458,9 @@ int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionI ...@@ -458,9 +458,9 @@ int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionI
} else if (functionId == TSDB_FUNC_TOP || functionId == TSDB_FUNC_BOTTOM) { } else if (functionId == TSDB_FUNC_TOP || functionId == TSDB_FUNC_BOTTOM) {
*type = (int16_t)dataType; *type = (int16_t)dataType;
*bytes = dataBytes; *bytes = dataBytes;
size_t size = sizeof(STopBotInfo) + (sizeof(tValuePair) + POINTER_BYTES + extLength) * param; size_t size = sizeof(STopBotInfo) + (sizeof(tValuePair) + POINTER_BYTES + extLength) * param;
// the output column may be larger than sizeof(STopBotInfo) // the output column may be larger than sizeof(STopBotInfo)
*interBytes = (int32_t)size; *interBytes = (int32_t)size;
} else if (functionId == TSDB_FUNC_SAMPLE) { } else if (functionId == TSDB_FUNC_SAMPLE) {
...@@ -484,7 +484,7 @@ int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionI ...@@ -484,7 +484,7 @@ int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionI
} else { } else {
return TSDB_CODE_TSC_INVALID_OPERATION; return TSDB_CODE_TSC_INVALID_OPERATION;
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -501,7 +501,7 @@ int32_t isValidFunction(const char* name, int32_t len) { ...@@ -501,7 +501,7 @@ int32_t isValidFunction(const char* name, int32_t len) {
return aScalarFunctions[i].functionId; return aScalarFunctions[i].functionId;
} }
} }
for(int32_t i = 0; i <= TSDB_FUNC_ELAPSED; ++i) { for(int32_t i = 0; i <= TSDB_FUNC_ELAPSED; ++i) {
int32_t nameLen = (int32_t) strlen(aAggs[i].name); int32_t nameLen = (int32_t) strlen(aAggs[i].name);
if (len != nameLen) { if (len != nameLen) {
...@@ -519,7 +519,7 @@ static bool function_setup(SQLFunctionCtx *pCtx, SResultRowCellInfo* pResultInfo ...@@ -519,7 +519,7 @@ static bool function_setup(SQLFunctionCtx *pCtx, SResultRowCellInfo* pResultInfo
if (pResultInfo->initialized) { if (pResultInfo->initialized) {
return false; return false;
} }
memset(pCtx->pOutput, 0, (size_t)pCtx->outputBytes); memset(pCtx->pOutput, 0, (size_t)pCtx->outputBytes);
initResultInfo(pResultInfo, pCtx->interBufBytes); initResultInfo(pResultInfo, pCtx->interBufBytes);
return true; return true;
...@@ -537,7 +537,7 @@ static void function_finalizer(SQLFunctionCtx *pCtx) { ...@@ -537,7 +537,7 @@ static void function_finalizer(SQLFunctionCtx *pCtx) {
if (pResInfo->hasResult != DATA_SET_FLAG) { if (pResInfo->hasResult != DATA_SET_FLAG) {
setNull(pCtx->pOutput, pCtx->outputType, pCtx->outputBytes); setNull(pCtx->pOutput, pCtx->outputType, pCtx->outputBytes);
} }
doFinalizer(pCtx); doFinalizer(pCtx);
} }
...@@ -547,7 +547,7 @@ static void function_finalizer(SQLFunctionCtx *pCtx) { ...@@ -547,7 +547,7 @@ static void function_finalizer(SQLFunctionCtx *pCtx) {
*/ */
static void count_function(SQLFunctionCtx *pCtx) { static void count_function(SQLFunctionCtx *pCtx) {
int32_t numOfElem = 0; int32_t numOfElem = 0;
/* /*
* 1. column data missing (schema modified) causes pCtx->hasNull == true. pCtx->preAggVals.isSet == true; * 1. column data missing (schema modified) causes pCtx->hasNull == true. pCtx->preAggVals.isSet == true;
* 2. for general non-primary key columns, pCtx->hasNull may be true or false, pCtx->preAggVals.isSet == true; * 2. for general non-primary key columns, pCtx->hasNull may be true or false, pCtx->preAggVals.isSet == true;
...@@ -562,7 +562,7 @@ static void count_function(SQLFunctionCtx *pCtx) { ...@@ -562,7 +562,7 @@ static void count_function(SQLFunctionCtx *pCtx) {
if (isNull(val, pCtx->inputType)) { if (isNull(val, pCtx->inputType)) {
continue; continue;
} }
numOfElem += 1; numOfElem += 1;
} }
} else { } else {
...@@ -570,11 +570,11 @@ static void count_function(SQLFunctionCtx *pCtx) { ...@@ -570,11 +570,11 @@ static void count_function(SQLFunctionCtx *pCtx) {
numOfElem = pCtx->size; numOfElem = pCtx->size;
} }
} }
if (numOfElem > 0) { if (numOfElem > 0) {
GET_RES_INFO(pCtx)->hasResult = DATA_SET_FLAG; GET_RES_INFO(pCtx)->hasResult = DATA_SET_FLAG;
} }
*((int64_t *)pCtx->pOutput) += numOfElem; *((int64_t *)pCtx->pOutput) += numOfElem;
SET_VAL(pCtx, numOfElem, 1); SET_VAL(pCtx, numOfElem, 1);
} }
...@@ -584,7 +584,7 @@ static void count_func_merge(SQLFunctionCtx *pCtx) { ...@@ -584,7 +584,7 @@ static void count_func_merge(SQLFunctionCtx *pCtx) {
for (int32_t i = 0; i < pCtx->size; ++i) { for (int32_t i = 0; i < pCtx->size; ++i) {
*((int64_t *)pCtx->pOutput) += pData[i]; *((int64_t *)pCtx->pOutput) += pData[i];
} }
SET_VAL(pCtx, pCtx->size, 1); SET_VAL(pCtx, pCtx->size, 1);
} }
...@@ -679,12 +679,12 @@ int32_t noDataRequired(SQLFunctionCtx *pCtx, STimeWindow* w, int32_t colId) { ...@@ -679,12 +679,12 @@ int32_t noDataRequired(SQLFunctionCtx *pCtx, STimeWindow* w, int32_t colId) {
static void do_sum(SQLFunctionCtx *pCtx) { static void do_sum(SQLFunctionCtx *pCtx) {
int32_t notNullElems = 0; int32_t notNullElems = 0;
// Only the pre-computing information loaded and actual data does not loaded // Only the pre-computing information loaded and actual data does not loaded
if (pCtx->preAggVals.isSet) { if (pCtx->preAggVals.isSet) {
notNullElems = pCtx->size - pCtx->preAggVals.statis.numOfNull; notNullElems = pCtx->size - pCtx->preAggVals.statis.numOfNull;
assert(pCtx->size >= pCtx->preAggVals.statis.numOfNull); assert(pCtx->size >= pCtx->preAggVals.statis.numOfNull);
if (IS_SIGNED_NUMERIC_TYPE(pCtx->inputType)) { if (IS_SIGNED_NUMERIC_TYPE(pCtx->inputType)) {
int64_t *retVal = (int64_t *)pCtx->pOutput; int64_t *retVal = (int64_t *)pCtx->pOutput;
*retVal += pCtx->preAggVals.statis.sum; *retVal += pCtx->preAggVals.statis.sum;
...@@ -731,10 +731,10 @@ static void do_sum(SQLFunctionCtx *pCtx) { ...@@ -731,10 +731,10 @@ static void do_sum(SQLFunctionCtx *pCtx) {
LIST_ADD_N_DOUBLE_FLOAT(*retVal, pCtx, pData, float, notNullElems, pCtx->inputType); LIST_ADD_N_DOUBLE_FLOAT(*retVal, pCtx, pData, float, notNullElems, pCtx->inputType);
} }
} }
// data in the check operation are all null, not output // data in the check operation are all null, not output
SET_VAL(pCtx, notNullElems, 1); SET_VAL(pCtx, notNullElems, 1);
if (notNullElems > 0) { if (notNullElems > 0) {
GET_RES_INFO(pCtx)->hasResult = DATA_SET_FLAG; GET_RES_INFO(pCtx)->hasResult = DATA_SET_FLAG;
} }
...@@ -742,7 +742,7 @@ static void do_sum(SQLFunctionCtx *pCtx) { ...@@ -742,7 +742,7 @@ static void do_sum(SQLFunctionCtx *pCtx) {
static void sum_function(SQLFunctionCtx *pCtx) { static void sum_function(SQLFunctionCtx *pCtx) {
do_sum(pCtx); do_sum(pCtx);
// keep the result data in output buffer, not in the intermediate buffer // keep the result data in output buffer, not in the intermediate buffer
SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx); SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx);
if (pResInfo->hasResult == DATA_SET_FLAG && pCtx->stableQuery) { if (pResInfo->hasResult == DATA_SET_FLAG && pCtx->stableQuery) {
...@@ -778,7 +778,7 @@ static void sum_func_merge(SQLFunctionCtx *pCtx) { ...@@ -778,7 +778,7 @@ static void sum_func_merge(SQLFunctionCtx *pCtx) {
SET_VAL(pCtx, notNullElems, 1); SET_VAL(pCtx, notNullElems, 1);
SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx); SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx);
if (notNullElems > 0) { if (notNullElems > 0) {
pResInfo->hasResult = DATA_SET_FLAG; pResInfo->hasResult = DATA_SET_FLAG;
} }
...@@ -797,7 +797,7 @@ static int32_t firstFuncRequired(SQLFunctionCtx *pCtx, STimeWindow* w, int32_t c ...@@ -797,7 +797,7 @@ static int32_t firstFuncRequired(SQLFunctionCtx *pCtx, STimeWindow* w, int32_t c
if (pCtx->order == TSDB_ORDER_DESC) { if (pCtx->order == TSDB_ORDER_DESC) {
return BLK_DATA_NO_NEEDED; return BLK_DATA_NO_NEEDED;
} }
// no result for first query, data block is required // no result for first query, data block is required
if (GET_RES_INFO(pCtx) == NULL || GET_RES_INFO(pCtx)->numOfRes <= 0) { if (GET_RES_INFO(pCtx) == NULL || GET_RES_INFO(pCtx)->numOfRes <= 0) {
return BLK_DATA_ALL_NEEDED; return BLK_DATA_ALL_NEEDED;
...@@ -810,7 +810,7 @@ static int32_t lastFuncRequired(SQLFunctionCtx *pCtx, STimeWindow* w, int32_t co ...@@ -810,7 +810,7 @@ static int32_t lastFuncRequired(SQLFunctionCtx *pCtx, STimeWindow* w, int32_t co
if (pCtx->order != pCtx->param[0].i64) { if (pCtx->order != pCtx->param[0].i64) {
return BLK_DATA_NO_NEEDED; return BLK_DATA_NO_NEEDED;
} }
if (GET_RES_INFO(pCtx) == NULL || GET_RES_INFO(pCtx)->numOfRes <= 0) { if (GET_RES_INFO(pCtx) == NULL || GET_RES_INFO(pCtx)->numOfRes <= 0) {
return BLK_DATA_ALL_NEEDED; return BLK_DATA_ALL_NEEDED;
} else { } else {
...@@ -866,17 +866,17 @@ static int32_t lastDistFuncRequired(SQLFunctionCtx *pCtx, STimeWindow* w, int32_ ...@@ -866,17 +866,17 @@ static int32_t lastDistFuncRequired(SQLFunctionCtx *pCtx, STimeWindow* w, int32_
*/ */
static void avg_function(SQLFunctionCtx *pCtx) { static void avg_function(SQLFunctionCtx *pCtx) {
int32_t notNullElems = 0; int32_t notNullElems = 0;
// NOTE: keep the intermediate result into the interResultBuf // NOTE: keep the intermediate result into the interResultBuf
SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx); SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx);
SAvgInfo *pAvgInfo = (SAvgInfo *)GET_ROWCELL_INTERBUF(pResInfo); SAvgInfo *pAvgInfo = (SAvgInfo *)GET_ROWCELL_INTERBUF(pResInfo);
double *pVal = &pAvgInfo->sum; double *pVal = &pAvgInfo->sum;
if (pCtx->preAggVals.isSet) { // Pre-aggregation if (pCtx->preAggVals.isSet) { // Pre-aggregation
notNullElems = pCtx->size - pCtx->preAggVals.statis.numOfNull; notNullElems = pCtx->size - pCtx->preAggVals.statis.numOfNull;
assert(notNullElems >= 0); assert(notNullElems >= 0);
if (IS_SIGNED_NUMERIC_TYPE(pCtx->inputType)) { if (IS_SIGNED_NUMERIC_TYPE(pCtx->inputType)) {
*pVal += pCtx->preAggVals.statis.sum; *pVal += pCtx->preAggVals.statis.sum;
} else if (IS_UNSIGNED_NUMERIC_TYPE(pCtx->inputType)) { } else if (IS_UNSIGNED_NUMERIC_TYPE(pCtx->inputType)) {
...@@ -886,7 +886,7 @@ static void avg_function(SQLFunctionCtx *pCtx) { ...@@ -886,7 +886,7 @@ static void avg_function(SQLFunctionCtx *pCtx) {
} }
} else { } else {
void *pData = GET_INPUT_DATA_LIST(pCtx); void *pData = GET_INPUT_DATA_LIST(pCtx);
if (pCtx->inputType == TSDB_DATA_TYPE_TINYINT) { if (pCtx->inputType == TSDB_DATA_TYPE_TINYINT) {
LIST_ADD_N(*pVal, pCtx, pData, int8_t, notNullElems, pCtx->inputType); LIST_ADD_N(*pVal, pCtx, pData, int8_t, notNullElems, pCtx->inputType);
} else if (pCtx->inputType == TSDB_DATA_TYPE_SMALLINT) { } else if (pCtx->inputType == TSDB_DATA_TYPE_SMALLINT) {
...@@ -909,18 +909,18 @@ static void avg_function(SQLFunctionCtx *pCtx) { ...@@ -909,18 +909,18 @@ static void avg_function(SQLFunctionCtx *pCtx) {
LIST_ADD_N(*pVal, pCtx, pData, uint64_t, notNullElems, pCtx->inputType); LIST_ADD_N(*pVal, pCtx, pData, uint64_t, notNullElems, pCtx->inputType);
} }
} }
if (!pCtx->hasNull) { if (!pCtx->hasNull) {
assert(notNullElems == pCtx->size); assert(notNullElems == pCtx->size);
} }
SET_VAL(pCtx, notNullElems, 1); SET_VAL(pCtx, notNullElems, 1);
pAvgInfo->num += notNullElems; pAvgInfo->num += notNullElems;
if (notNullElems > 0) { if (notNullElems > 0) {
pResInfo->hasResult = DATA_SET_FLAG; pResInfo->hasResult = DATA_SET_FLAG;
} }
// keep the data into the final output buffer for super table query since this execution may be the last one // keep the data into the final output buffer for super table query since this execution may be the last one
if (pCtx->stableQuery) { if (pCtx->stableQuery) {
memcpy(pCtx->pOutput, GET_ROWCELL_INTERBUF(pResInfo), sizeof(SAvgInfo)); memcpy(pCtx->pOutput, GET_ROWCELL_INTERBUF(pResInfo), sizeof(SAvgInfo));
...@@ -929,18 +929,18 @@ static void avg_function(SQLFunctionCtx *pCtx) { ...@@ -929,18 +929,18 @@ static void avg_function(SQLFunctionCtx *pCtx) {
static void avg_func_merge(SQLFunctionCtx *pCtx) { static void avg_func_merge(SQLFunctionCtx *pCtx) {
SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx); SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx);
double *sum = (double*) pCtx->pOutput; double *sum = (double*) pCtx->pOutput;
char *input = GET_INPUT_DATA_LIST(pCtx); char *input = GET_INPUT_DATA_LIST(pCtx);
for (int32_t i = 0; i < pCtx->size; ++i, input += pCtx->inputBytes) { for (int32_t i = 0; i < pCtx->size; ++i, input += pCtx->inputBytes) {
SAvgInfo *pInput = (SAvgInfo *)input; SAvgInfo *pInput = (SAvgInfo *)input;
if (pInput->num == 0) { // current input is null if (pInput->num == 0) { // current input is null
continue; continue;
} }
SET_DOUBLE_VAL(sum, *sum + pInput->sum); SET_DOUBLE_VAL(sum, *sum + pInput->sum);
// keep the number of data into the temp buffer // keep the number of data into the temp buffer
*(int64_t *)GET_ROWCELL_INTERBUF(pResInfo) += pInput->num; *(int64_t *)GET_ROWCELL_INTERBUF(pResInfo) += pInput->num;
} }
...@@ -951,10 +951,10 @@ static void avg_func_merge(SQLFunctionCtx *pCtx) { ...@@ -951,10 +951,10 @@ static void avg_func_merge(SQLFunctionCtx *pCtx) {
*/ */
static void avg_finalizer(SQLFunctionCtx *pCtx) { static void avg_finalizer(SQLFunctionCtx *pCtx) {
SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx); SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx);
if (pCtx->currentStage == MERGE_STAGE) { if (pCtx->currentStage == MERGE_STAGE) {
assert(pCtx->inputType == TSDB_DATA_TYPE_BINARY); assert(pCtx->inputType == TSDB_DATA_TYPE_BINARY);
if (GET_INT64_VAL(GET_ROWCELL_INTERBUF(pResInfo)) <= 0) { if (GET_INT64_VAL(GET_ROWCELL_INTERBUF(pResInfo)) <= 0) {
setNull(pCtx->pOutput, pCtx->outputType, pCtx->outputBytes); setNull(pCtx->pOutput, pCtx->outputType, pCtx->outputBytes);
return; return;
...@@ -964,15 +964,15 @@ static void avg_finalizer(SQLFunctionCtx *pCtx) { ...@@ -964,15 +964,15 @@ static void avg_finalizer(SQLFunctionCtx *pCtx) {
} else { // this is the secondary merge, only in the secondary merge, the input type is TSDB_DATA_TYPE_BINARY } else { // this is the secondary merge, only in the secondary merge, the input type is TSDB_DATA_TYPE_BINARY
assert(IS_NUMERIC_TYPE(pCtx->inputType)); assert(IS_NUMERIC_TYPE(pCtx->inputType));
SAvgInfo *pAvgInfo = (SAvgInfo *)GET_ROWCELL_INTERBUF(pResInfo); SAvgInfo *pAvgInfo = (SAvgInfo *)GET_ROWCELL_INTERBUF(pResInfo);
if (pAvgInfo->num == 0) { // all data are NULL or empty table if (pAvgInfo->num == 0) { // all data are NULL or empty table
setNull(pCtx->pOutput, pCtx->outputType, pCtx->outputBytes); setNull(pCtx->pOutput, pCtx->outputType, pCtx->outputBytes);
return; return;
} }
SET_DOUBLE_VAL((double *)pCtx->pOutput, pAvgInfo->sum / pAvgInfo->num); SET_DOUBLE_VAL((double *)pCtx->pOutput, pAvgInfo->sum / pAvgInfo->num);
} }
// cannot set the numOfIteratedElems again since it is set during previous iteration // cannot set the numOfIteratedElems again since it is set during previous iteration
GET_RES_INFO(pCtx)->numOfRes = 1; GET_RES_INFO(pCtx)->numOfRes = 1;
doFinalizer(pCtx); doFinalizer(pCtx);
...@@ -992,7 +992,7 @@ static void minMax_function(SQLFunctionCtx *pCtx, char *pOutput, int32_t isMin, ...@@ -992,7 +992,7 @@ static void minMax_function(SQLFunctionCtx *pCtx, char *pOutput, int32_t isMin,
void* tval = NULL; void* tval = NULL;
int16_t index = 0; int16_t index = 0;
if (isMin) { if (isMin) {
tval = &pCtx->preAggVals.statis.min; tval = &pCtx->preAggVals.statis.min;
index = pCtx->preAggVals.statis.minIndex; index = pCtx->preAggVals.statis.minIndex;
...@@ -1000,7 +1000,7 @@ static void minMax_function(SQLFunctionCtx *pCtx, char *pOutput, int32_t isMin, ...@@ -1000,7 +1000,7 @@ static void minMax_function(SQLFunctionCtx *pCtx, char *pOutput, int32_t isMin,
tval = &pCtx->preAggVals.statis.max; tval = &pCtx->preAggVals.statis.max;
index = pCtx->preAggVals.statis.maxIndex; index = pCtx->preAggVals.statis.maxIndex;
} }
TSKEY key = TSKEY_INITIAL_VAL; TSKEY key = TSKEY_INITIAL_VAL;
if (pCtx->ptsList != NULL) { if (pCtx->ptsList != NULL) {
/** /**
...@@ -1016,23 +1016,23 @@ static void minMax_function(SQLFunctionCtx *pCtx, char *pOutput, int32_t isMin, ...@@ -1016,23 +1016,23 @@ static void minMax_function(SQLFunctionCtx *pCtx, char *pOutput, int32_t isMin,
// the index is the original position, not the relative position // the index is the original position, not the relative position
key = pCtx->ptsList[index]; key = pCtx->ptsList[index];
} }
if (IS_SIGNED_NUMERIC_TYPE(pCtx->inputType)) { if (IS_SIGNED_NUMERIC_TYPE(pCtx->inputType)) {
int64_t val = GET_INT64_VAL(tval); int64_t val = GET_INT64_VAL(tval);
if (pCtx->inputType == TSDB_DATA_TYPE_TINYINT) { if (pCtx->inputType == TSDB_DATA_TYPE_TINYINT) {
int8_t *data = (int8_t *)pOutput; int8_t *data = (int8_t *)pOutput;
UPDATE_DATA(pCtx, *data, (int8_t)val, notNullElems, isMin, key); UPDATE_DATA(pCtx, *data, (int8_t)val, notNullElems, isMin, key);
} else if (pCtx->inputType == TSDB_DATA_TYPE_SMALLINT) { } else if (pCtx->inputType == TSDB_DATA_TYPE_SMALLINT) {
int16_t *data = (int16_t *)pOutput; int16_t *data = (int16_t *)pOutput;
UPDATE_DATA(pCtx, *data, (int16_t)val, notNullElems, isMin, key); UPDATE_DATA(pCtx, *data, (int16_t)val, notNullElems, isMin, key);
} else if (pCtx->inputType == TSDB_DATA_TYPE_INT) { } else if (pCtx->inputType == TSDB_DATA_TYPE_INT) {
int32_t *data = (int32_t *)pOutput; int32_t *data = (int32_t *)pOutput;
#if defined(_DEBUG_VIEW) #if defined(_DEBUG_VIEW)
qDebug("max value updated according to pre-cal:%d", *data); qDebug("max value updated according to pre-cal:%d", *data);
#endif #endif
if ((*data < val) ^ isMin) { if ((*data < val) ^ isMin) {
*data = (int32_t)val; *data = (int32_t)val;
for (int32_t i = 0; i < (pCtx)->tagInfo.numOfTagCols; ++i) { for (int32_t i = 0; i < (pCtx)->tagInfo.numOfTagCols; ++i) {
...@@ -1041,7 +1041,7 @@ static void minMax_function(SQLFunctionCtx *pCtx, char *pOutput, int32_t isMin, ...@@ -1041,7 +1041,7 @@ static void minMax_function(SQLFunctionCtx *pCtx, char *pOutput, int32_t isMin,
__ctx->tag.i64 = key; __ctx->tag.i64 = key;
__ctx->tag.nType = TSDB_DATA_TYPE_BIGINT; __ctx->tag.nType = TSDB_DATA_TYPE_BIGINT;
} }
aAggs[TSDB_FUNC_TAG].xFunction(__ctx); aAggs[TSDB_FUNC_TAG].xFunction(__ctx);
} }
} }
...@@ -1073,18 +1073,18 @@ static void minMax_function(SQLFunctionCtx *pCtx, char *pOutput, int32_t isMin, ...@@ -1073,18 +1073,18 @@ static void minMax_function(SQLFunctionCtx *pCtx, char *pOutput, int32_t isMin,
} else if (pCtx->inputType == TSDB_DATA_TYPE_FLOAT) { } else if (pCtx->inputType == TSDB_DATA_TYPE_FLOAT) {
float *data = (float *)pOutput; float *data = (float *)pOutput;
double val = GET_DOUBLE_VAL(tval); double val = GET_DOUBLE_VAL(tval);
UPDATE_DATA(pCtx, *data, (float)val, notNullElems, isMin, key); UPDATE_DATA(pCtx, *data, (float)val, notNullElems, isMin, key);
} }
return; return;
} }
void *p = GET_INPUT_DATA_LIST(pCtx); void *p = GET_INPUT_DATA_LIST(pCtx);
TSKEY *tsList = GET_TS_LIST(pCtx); TSKEY *tsList = GET_TS_LIST(pCtx);
*notNullElems = 0; *notNullElems = 0;
if (IS_SIGNED_NUMERIC_TYPE(pCtx->inputType)) { if (IS_SIGNED_NUMERIC_TYPE(pCtx->inputType)) {
if (pCtx->inputType == TSDB_DATA_TYPE_TINYINT) { if (pCtx->inputType == TSDB_DATA_TYPE_TINYINT) {
TYPED_LOOPCHECK_N(int8_t, pOutput, p, pCtx, pCtx->inputType, isMin, *notNullElems); TYPED_LOOPCHECK_N(int8_t, pOutput, p, pCtx, pCtx->inputType, isMin, *notNullElems);
...@@ -1093,12 +1093,12 @@ static void minMax_function(SQLFunctionCtx *pCtx, char *pOutput, int32_t isMin, ...@@ -1093,12 +1093,12 @@ static void minMax_function(SQLFunctionCtx *pCtx, char *pOutput, int32_t isMin,
} else if (pCtx->inputType == TSDB_DATA_TYPE_INT) { } else if (pCtx->inputType == TSDB_DATA_TYPE_INT) {
int32_t *pData = p; int32_t *pData = p;
int32_t *retVal = (int32_t*) pOutput; int32_t *retVal = (int32_t*) pOutput;
for (int32_t i = 0; i < pCtx->size; ++i) { for (int32_t i = 0; i < pCtx->size; ++i) {
if (pCtx->hasNull && isNull((const char*)&pData[i], pCtx->inputType)) { if (pCtx->hasNull && isNull((const char*)&pData[i], pCtx->inputType)) {
continue; continue;
} }
if ((*retVal < pData[i]) ^ isMin) { if ((*retVal < pData[i]) ^ isMin) {
*retVal = pData[i]; *retVal = pData[i];
if(tsList) { if(tsList) {
...@@ -1135,9 +1135,9 @@ static bool min_func_setup(SQLFunctionCtx *pCtx, SResultRowCellInfo* pResultInfo ...@@ -1135,9 +1135,9 @@ static bool min_func_setup(SQLFunctionCtx *pCtx, SResultRowCellInfo* pResultInfo
if (!function_setup(pCtx, pResultInfo)) { if (!function_setup(pCtx, pResultInfo)) {
return false; // not initialized since it has been initialized return false; // not initialized since it has been initialized
} }
GET_TRUE_DATA_TYPE(); GET_TRUE_DATA_TYPE();
switch (type) { switch (type) {
case TSDB_DATA_TYPE_TINYINT: case TSDB_DATA_TYPE_TINYINT:
*((int8_t *)pCtx->pOutput) = INT8_MAX; *((int8_t *)pCtx->pOutput) = INT8_MAX;
...@@ -1180,9 +1180,9 @@ static bool max_func_setup(SQLFunctionCtx *pCtx, SResultRowCellInfo* pResultInfo ...@@ -1180,9 +1180,9 @@ static bool max_func_setup(SQLFunctionCtx *pCtx, SResultRowCellInfo* pResultInfo
if (!function_setup(pCtx, pResultInfo)) { if (!function_setup(pCtx, pResultInfo)) {
return false; // not initialized since it has been initialized return false; // not initialized since it has been initialized
} }
GET_TRUE_DATA_TYPE(); GET_TRUE_DATA_TYPE();
switch (type) { switch (type) {
case TSDB_DATA_TYPE_INT: case TSDB_DATA_TYPE_INT:
*((int32_t *)pCtx->pOutput) = INT32_MIN; *((int32_t *)pCtx->pOutput) = INT32_MIN;
...@@ -1217,7 +1217,7 @@ static bool max_func_setup(SQLFunctionCtx *pCtx, SResultRowCellInfo* pResultInfo ...@@ -1217,7 +1217,7 @@ static bool max_func_setup(SQLFunctionCtx *pCtx, SResultRowCellInfo* pResultInfo
default: default:
qError("illegal data type:%d in min/max query", pCtx->inputType); qError("illegal data type:%d in min/max query", pCtx->inputType);
} }
return true; return true;
} }
...@@ -1227,13 +1227,13 @@ static bool max_func_setup(SQLFunctionCtx *pCtx, SResultRowCellInfo* pResultInfo ...@@ -1227,13 +1227,13 @@ static bool max_func_setup(SQLFunctionCtx *pCtx, SResultRowCellInfo* pResultInfo
static void min_function(SQLFunctionCtx *pCtx) { static void min_function(SQLFunctionCtx *pCtx) {
int32_t notNullElems = 0; int32_t notNullElems = 0;
minMax_function(pCtx, pCtx->pOutput, 1, &notNullElems); minMax_function(pCtx, pCtx->pOutput, 1, &notNullElems);
SET_VAL(pCtx, notNullElems, 1); SET_VAL(pCtx, notNullElems, 1);
if (notNullElems > 0) { if (notNullElems > 0) {
SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx); SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx);
pResInfo->hasResult = DATA_SET_FLAG; pResInfo->hasResult = DATA_SET_FLAG;
// set the flag for super table query // set the flag for super table query
if (pCtx->stableQuery) { if (pCtx->stableQuery) {
*(pCtx->pOutput + pCtx->inputBytes) = DATA_SET_FLAG; *(pCtx->pOutput + pCtx->inputBytes) = DATA_SET_FLAG;
...@@ -1244,13 +1244,13 @@ static void min_function(SQLFunctionCtx *pCtx) { ...@@ -1244,13 +1244,13 @@ static void min_function(SQLFunctionCtx *pCtx) {
static void max_function(SQLFunctionCtx *pCtx) { static void max_function(SQLFunctionCtx *pCtx) {
int32_t notNullElems = 0; int32_t notNullElems = 0;
minMax_function(pCtx, pCtx->pOutput, 0, &notNullElems); minMax_function(pCtx, pCtx->pOutput, 0, &notNullElems);
SET_VAL(pCtx, notNullElems, 1); SET_VAL(pCtx, notNullElems, 1);
if (notNullElems > 0) { if (notNullElems > 0) {
SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx); SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx);
pResInfo->hasResult = DATA_SET_FLAG; pResInfo->hasResult = DATA_SET_FLAG;
// set the flag for super table query // set the flag for super table query
if (pCtx->stableQuery) { if (pCtx->stableQuery) {
*(pCtx->pOutput + pCtx->inputBytes) = DATA_SET_FLAG; *(pCtx->pOutput + pCtx->inputBytes) = DATA_SET_FLAG;
...@@ -1260,16 +1260,16 @@ static void max_function(SQLFunctionCtx *pCtx) { ...@@ -1260,16 +1260,16 @@ static void max_function(SQLFunctionCtx *pCtx) {
static int32_t minmax_merge_impl(SQLFunctionCtx *pCtx, int32_t bytes, char *output, bool isMin) { static int32_t minmax_merge_impl(SQLFunctionCtx *pCtx, int32_t bytes, char *output, bool isMin) {
int32_t notNullElems = 0; int32_t notNullElems = 0;
GET_TRUE_DATA_TYPE(); GET_TRUE_DATA_TYPE();
assert(pCtx->stableQuery); assert(pCtx->stableQuery);
for (int32_t i = 0; i < pCtx->size; ++i) { for (int32_t i = 0; i < pCtx->size; ++i) {
char *input = GET_INPUT_DATA(pCtx, i); char *input = GET_INPUT_DATA(pCtx, i);
if (input[bytes] != DATA_SET_FLAG) { if (input[bytes] != DATA_SET_FLAG) {
continue; continue;
} }
switch (type) { switch (type) {
case TSDB_DATA_TYPE_TINYINT: { case TSDB_DATA_TYPE_TINYINT: {
int8_t v = GET_INT8_VAL(input); int8_t v = GET_INT8_VAL(input);
...@@ -1285,12 +1285,12 @@ static int32_t minmax_merge_impl(SQLFunctionCtx *pCtx, int32_t bytes, char *outp ...@@ -1285,12 +1285,12 @@ static int32_t minmax_merge_impl(SQLFunctionCtx *pCtx, int32_t bytes, char *outp
int32_t v = GET_INT32_VAL(input); int32_t v = GET_INT32_VAL(input);
if ((*(int32_t *)output < v) ^ isMin) { if ((*(int32_t *)output < v) ^ isMin) {
*(int32_t *)output = v; *(int32_t *)output = v;
for (int32_t j = 0; j < pCtx->tagInfo.numOfTagCols; ++j) { for (int32_t j = 0; j < pCtx->tagInfo.numOfTagCols; ++j) {
SQLFunctionCtx *__ctx = pCtx->tagInfo.pTagCtxList[j]; SQLFunctionCtx *__ctx = pCtx->tagInfo.pTagCtxList[j];
aAggs[TSDB_FUNC_TAG].xFunction(__ctx); aAggs[TSDB_FUNC_TAG].xFunction(__ctx);
} }
notNullElems++; notNullElems++;
} }
break; break;
...@@ -1339,15 +1339,15 @@ static int32_t minmax_merge_impl(SQLFunctionCtx *pCtx, int32_t bytes, char *outp ...@@ -1339,15 +1339,15 @@ static int32_t minmax_merge_impl(SQLFunctionCtx *pCtx, int32_t bytes, char *outp
break; break;
} }
} }
return notNullElems; return notNullElems;
} }
static void min_func_merge(SQLFunctionCtx *pCtx) { static void min_func_merge(SQLFunctionCtx *pCtx) {
int32_t notNullElems = minmax_merge_impl(pCtx, pCtx->outputBytes, pCtx->pOutput, 1); int32_t notNullElems = minmax_merge_impl(pCtx, pCtx->outputBytes, pCtx->pOutput, 1);
SET_VAL(pCtx, notNullElems, 1); SET_VAL(pCtx, notNullElems, 1);
SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx); SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx);
if (notNullElems > 0) { if (notNullElems > 0) {
pResInfo->hasResult = DATA_SET_FLAG; pResInfo->hasResult = DATA_SET_FLAG;
...@@ -1356,9 +1356,9 @@ static void min_func_merge(SQLFunctionCtx *pCtx) { ...@@ -1356,9 +1356,9 @@ static void min_func_merge(SQLFunctionCtx *pCtx) {
static void max_func_merge(SQLFunctionCtx *pCtx) { static void max_func_merge(SQLFunctionCtx *pCtx) {
int32_t numOfElem = minmax_merge_impl(pCtx, pCtx->outputBytes, pCtx->pOutput, 0); int32_t numOfElem = minmax_merge_impl(pCtx, pCtx->outputBytes, pCtx->pOutput, 0);
SET_VAL(pCtx, numOfElem, 1); SET_VAL(pCtx, numOfElem, 1);
SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx); SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx);
if (numOfElem > 0) { if (numOfElem > 0) {
pResInfo->hasResult = DATA_SET_FLAG; pResInfo->hasResult = DATA_SET_FLAG;
...@@ -4870,7 +4870,8 @@ static void elapsedFinalizer(SQLFunctionCtx *pCtx) { ...@@ -4870,7 +4870,8 @@ static void elapsedFinalizer(SQLFunctionCtx *pCtx) {
} }
SElapsedInfo *pInfo = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx)); SElapsedInfo *pInfo = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
*(double *)pCtx->pOutput = (double)pInfo->max - (double)pInfo->min; double result = (double)pInfo->max - (double)pInfo->min;
*(double *)pCtx->pOutput = result >= 0 ? result : -result;
if (pCtx->numOfParams > 0 && pCtx->param[0].i64 > 0) { if (pCtx->numOfParams > 0 && pCtx->param[0].i64 > 0) {
*(double *)pCtx->pOutput = *(double *)pCtx->pOutput / pCtx->param[0].i64; *(double *)pCtx->pOutput = *(double *)pCtx->pOutput / pCtx->param[0].i64;
} }
......
###################################################################
# Copyright (c) 2016 by TAOS Technologies, Inc.
# All rights reserved.
#
# This file is proprietary and confidential to TAOS Technologies.
# No part of this file may be reproduced, stored, transmitted,
# disclosed or used in any form or by any means other than as
# expressly provided by the written permission from Jianhui Tao
#
###################################################################
# -*- coding: utf-8 -*-
import sys
import os
from util.log import *
from util.cases import *
from util.sql import *
from util.dnodes import *
import subprocess
class TDTestCase:
def caseDescription(self):
'''
case1<sdsang>: [TD-12362] taosdump supports JSON
'''
return
def init(self, conn, logSql):
tdLog.debug("start to execute %s" % __file__)
tdSql.init(conn.cursor(), logSql)
self.tmpdir = "tmp"
def getBuildPath(self):
selfPath = os.path.dirname(os.path.realpath(__file__))
if ("community" in selfPath):
projPath = selfPath[:selfPath.find("community")]
else:
projPath = selfPath[:selfPath.find("tests")]
for root, dirs, files in os.walk(projPath):
if ("taosdump" in files):
rootRealPath = os.path.dirname(os.path.realpath(root))
if ("packaging" not in rootRealPath):
buildPath = root[:len(root) - len("/build/bin")]
break
return buildPath
def run(self):
tdSql.prepare()
tdSql.execute("drop database if exists db")
tdSql.execute("create database db days 11 keep 3649 blocks 8 ")
tdSql.execute("use db")
tdSql.execute(
"create table st(ts timestamp, c1 int) tags(jtag JSON)")
tdSql.execute("create table t1 using st tags('{\"location\": \"beijing\"}')")
tdSql.execute("insert into t1 values(1500000000000, 1)")
buildPath = self.getBuildPath()
if (buildPath == ""):
tdLog.exit("taosdump not found!")
else:
tdLog.info("taosdump found in %s" % buildPath)
binPath = buildPath + "/build/bin/"
if not os.path.exists(self.tmpdir):
os.makedirs(self.tmpdir)
else:
print("directory exists")
os.system("rm -rf %s" % self.tmpdir)
os.makedirs(self.tmpdir)
os.system("%staosdump --databases db -o %s" % (binPath, self.tmpdir))
tdSql.execute("drop database db")
os.system("%staosdump -i %s" % (binPath, self.tmpdir))
tdSql.query("show databases")
tdSql.checkRows(1)
tdSql.execute("use db")
tdSql.query("show stables")
tdSql.checkRows(1)
tdSql.checkData(0, 0, 'st')
tdSql.query("show tables")
tdSql.checkRows(1)
tdSql.checkData(0, 0, 't1')
tdSql.query("select jtag->'location' from st")
tdSql.checkRows(1)
tdSql.checkData(0, 0, "\"beijing\"")
tdSql.query("select * from st where jtag contains 'location'")
tdSql.checkRows(1)
tdSql.checkData(0, 1, 1)
tdSql.checkData(0, 2, '{\"location\":\"beijing\"}')
def stop(self):
tdSql.close()
tdLog.success("%s successfully executed" % __file__)
tdCases.addWindows(__file__, TDTestCase())
tdCases.addLinux(__file__, TDTestCase())
python3 ./test.py -f 2-query/ts_hidden_column.py python3 ./test.py -f 2-query/ts_hidden_column.py
python3 ./test.py -f 2-query/union-order.py python3 ./test.py -f 2-query/union-order.py
python3 ./test.py -f 2-query/session_two_stage.py python3 ./test.py -f 2-query/session_two_stage.py
\ No newline at end of file
python3 ./test.py -f 5-taos-tools/taosdump/taosdumpTestTypeJson.py
###################################################################
# Copyright (c) 2020 by TAOS Technologies, Inc.
# All rights reserved.
#
# This file is proprietary and confidential to TAOS Technologies.
# No part of this file may be reproduced, stored, transmitted,
# disclosed or used in any form or by any means other than as
# expressly provided by the written permission from Jianhui Tao
#
###################################################################
# -*- coding: utf-8 -*-
from posixpath import split
import sys
import os
from util.log import *
from util.cases import *
from util.sql import *
from util.dnodes import *
class TDTestCase:
def init(self, conn, logSql):
tdLog.debug("start to execute %s" % __file__)
tdSql.init(conn.cursor(), logSql)
self.ts = 1420041600000 # 2015-01-01 00:00:00 this is begin time for first record
self.num = 10
def getBuildPath(self):
selfPath = os.path.dirname(os.path.realpath(__file__))
if ("community" in selfPath):
projPath = selfPath[:selfPath.find("community")]
else:
projPath = selfPath[:selfPath.find("tests")]
for root, dirs, files in os.walk(projPath):
if ("taosd" in files):
rootRealPath = os.path.dirname(os.path.realpath(root))
if ("packaging" not in rootRealPath):
buildPath = root[:len(root) - len("/build/bin")]
break
return buildPath
def caseDescription(self):
'''
case1 <shenglian zhou>: [TD-12344] : fix session window for super table two stage query
'''
return
def getBuildPath(self):
selfPath = os.path.dirname(os.path.realpath(__file__))
if ("community" in selfPath):
projPath = selfPath[:selfPath.find("community")]
else:
projPath = selfPath[:selfPath.find("tests")]
for root, dirs, files in os.walk(projPath):
if ("taosd" in files):
rootRealPath = os.path.dirname(os.path.realpath(root))
if ("packaging" not in rootRealPath):
buildPath = root[:len(root)-len("/build/bin")]
break
return buildPath
def getcfgPath(self):
selfPath = os.path.dirname(os.path.realpath(__file__))
print(selfPath)
if ("community" in selfPath):
projPath = selfPath[:selfPath.find("community")]
else:
projPath = selfPath[:selfPath.find("tests")]
cfgPath = projPath + "/sim/dnode1/cfg "
return cfgPath
def run(self):
tdSql.prepare()
tdSql.execute("create database if not exists testdb keep 36500;")
tdSql.execute("use testdb;")
tdSql.execute("create stable st (ts timestamp , id int , value double) tags(hostname binary(10) ,ind int);")
for i in range(self.num):
tdSql.execute("insert into sub_%s using st tags('host_%s' , %d) values (%d , %d , %f );"%(str(i),str(i),i*10,self.ts+10000*i,i*2,i+10.00))
tdSql.query('select elapsed(ts,10s) from sub_1 where ts>="2015-01-01 00:00:00.000" and ts < "2015-01-01 00:10:00.000" session(ts,1d) ;')
cfg_path = self.getcfgPath()
print(cfg_path)
tdSql.query('select elapsed(ts,10s) from st where ts>="2015-01-01 00:00:00.000" and ts < "2015-01-01 00:10:00.000" session(ts,1d) group by tbname;') # session not support super table
tdSql.checkRows(10)
def stop(self):
tdSql.close()
tdLog.success("%s successfully executed" % __file__)
tdCases.addWindows(__file__, TDTestCase())
tdCases.addLinux(__file__, TDTestCase())
...@@ -59,7 +59,7 @@ class ElapsedCase: ...@@ -59,7 +59,7 @@ class ElapsedCase:
tdSql.query("select elapsed(ts), elapsed(ts, 10m), elapsed(ts, 100m) from st1 where ts > '2021-11-22 00:00:00' and ts < '2021-11-23 00:00:00' group by tbname") tdSql.query("select elapsed(ts), elapsed(ts, 10m), elapsed(ts, 100m) from st1 where ts > '2021-11-22 00:00:00' and ts < '2021-11-23 00:00:00' group by tbname")
tdSql.checkEqual(int(tdSql.getData(0, 1)), 99) tdSql.checkEqual(int(tdSql.getData(0, 1)), 99)
tdSql.checkEqual(int(tdSql.getData(0, 2)), 9) tdSql.checkEqual(int(tdSql.getData(0, 2)), 9)
# stddev(f), # stddev(f),
tdSql.query("select elapsed(ts), count(*), avg(f), twa(f), irate(f), sum(f), min(f), max(f), first(f), last(f), apercentile(i, 30), last_row(i), spread(i) " tdSql.query("select elapsed(ts), count(*), avg(f), twa(f), irate(f), sum(f), min(f), max(f), first(f), last(f), apercentile(i, 30), last_row(i), spread(i) "
"from st1 where ts > '2021-11-22 00:00:00' and ts < '2021-11-23 00:00:00' group by tbname") "from st1 where ts > '2021-11-22 00:00:00' and ts < '2021-11-23 00:00:00' group by tbname")
tdSql.checkRows(2) tdSql.checkRows(2)
...@@ -100,7 +100,7 @@ class ElapsedCase: ...@@ -100,7 +100,7 @@ class ElapsedCase:
tdSql.query("select elapsed(ts) from t1 where ts > '2021-11-22 00:00:00' and ts < '2021-11-23 00:00:00' session(ts, 70s)") tdSql.query("select elapsed(ts) from t1 where ts > '2021-11-22 00:00:00' and ts < '2021-11-23 00:00:00' session(ts, 70s)")
tdSql.checkRows(1) tdSql.checkRows(1)
# It has little to do with the elapsed function, so just simple test. # It has little to do with the elapsed function, so just simple test.
def stateWindowTest(self): def stateWindowTest(self):
tdSql.execute("use wxy_db") tdSql.execute("use wxy_db")
...@@ -110,7 +110,7 @@ class ElapsedCase: ...@@ -110,7 +110,7 @@ class ElapsedCase:
tdSql.query("select elapsed(ts) from t1 where ts > '2021-11-22 00:00:00' and ts < '2021-11-23 00:00:00' state_window(b)") tdSql.query("select elapsed(ts) from t1 where ts > '2021-11-22 00:00:00' and ts < '2021-11-23 00:00:00' state_window(b)")
tdSql.checkRows(2) tdSql.checkRows(2)
def intervalTest(self): def intervalTest(self):
tdSql.execute("use wxy_db") tdSql.execute("use wxy_db")
...@@ -186,7 +186,7 @@ class ElapsedCase: ...@@ -186,7 +186,7 @@ class ElapsedCase:
else: else:
subtable[result[i][tbnameCol]].append(result[i][elapsedCol]) subtable[result[i][tbnameCol]].append(result[i][elapsedCol])
return subtable return subtable
def doOrderbyCheck(self, resultAsc, resultdesc): def doOrderbyCheck(self, resultAsc, resultdesc):
resultRows = len(resultAsc) resultRows = len(resultAsc)
for i in range(resultRows): for i in range(resultRows):
...@@ -222,6 +222,13 @@ class ElapsedCase: ...@@ -222,6 +222,13 @@ class ElapsedCase:
self.orderbyForStableCheck("select elapsed(ts) from st1 where ts > '2021-11-22 00:00:00' and ts < '2021-11-23 00:00:00' interval(222m) group by tbname", 1, 2) self.orderbyForStableCheck("select elapsed(ts) from st1 where ts > '2021-11-22 00:00:00' and ts < '2021-11-23 00:00:00' interval(222m) group by tbname", 1, 2)
self.orderbyForStableCheck("select elapsed(ts) from st1 where ts > '2021-11-22 00:00:00' and ts < '2021-11-23 00:00:00' interval(1000m) group by tbname", 1, 2) self.orderbyForStableCheck("select elapsed(ts) from st1 where ts > '2021-11-22 00:00:00' and ts < '2021-11-23 00:00:00' interval(1000m) group by tbname", 1, 2)
#nested query
resAsc = tdSql.getResult("select elapsed(ts) from (select csum(i) from t1 where ts > '2021-11-22 00:00:00' and ts < '2021-11-23 00:00:00')")
resDesc = tdSql.getResult("select elapsed(ts) from (select csum(i) from t1 where ts > '2021-11-22 00:00:00' and ts < '2021-11-23 00:00:00' order by ts desc)")
resRows = len(resAsc)
for i in range(resRows):
tdSql.checkEqual(resAsc[i][0], resDesc[resRows - i - 1][0])
def slimitCheck(self, sql): def slimitCheck(self, sql):
tdSql.checkEqual(tdSql.query(sql + " slimit 0"), 0) tdSql.checkEqual(tdSql.query(sql + " slimit 0"), 0)
tdSql.checkEqual(tdSql.query(sql + " slimit 1 soffset 0"), tdSql.query(sql + " slimit 0, 1")) tdSql.checkEqual(tdSql.query(sql + " slimit 1 soffset 0"), tdSql.query(sql + " slimit 0, 1"))
...@@ -307,7 +314,7 @@ class ElapsedCase: ...@@ -307,7 +314,7 @@ class ElapsedCase:
"select elapsed(ts) from st1 where ts > '2021-11-22 00:00:00' and ts < '2021-11-22 02:00:00' group by tbname") "select elapsed(ts) from st1 where ts > '2021-11-22 00:00:00' and ts < '2021-11-22 02:00:00' group by tbname")
self.unionAllCheck("select elapsed(ts) from st1 where ts > '2021-11-22 00:00:00' and ts < '2021-11-23 00:00:00' interval(1m) group by tbname", self.unionAllCheck("select elapsed(ts) from st1 where ts > '2021-11-22 00:00:00' and ts < '2021-11-23 00:00:00' interval(1m) group by tbname",
"select elapsed(ts) from st1 where ts > '2021-11-22 00:00:00' and ts < '2021-11-23 00:00:00' interval(222m) group by tbname") "select elapsed(ts) from st1 where ts > '2021-11-22 00:00:00' and ts < '2021-11-23 00:00:00' interval(222m) group by tbname")
# It has little to do with the elapsed function, so just simple test. # It has little to do with the elapsed function, so just simple test.
def continuousQueryTest(self): def continuousQueryTest(self):
tdSql.execute("use wxy_db") tdSql.execute("use wxy_db")
......
python3 ./test.py -f 1-insert/batchInsert.py #python3 ./test.py -f 1-insert/batchInsert.py
python3 ./test.py -f 0-others/json_tag.py python3 ./test.py -f 0-others/create_col_tag.py
python3 ./test.py -f 2-query/ts_hidden_column.py python3 ./test.py -f 2-query/TD-11389.py
python3 ./test.py -f 2-query/union-order.py python3 ./test.py -f 2-query/TD-11945_crash.py
\ No newline at end of file python3 ./test.py -f 2-query/TD-12340-12342.py
python3 ./test.py -f 2-query/TD-12344.py
\ No newline at end of file
python3 ./test.py -f 4-taosAdapter/taosAdapter_insert.py
python3 ./test.py -f 4-taosAdapter/taosAdapter_query.py
python3 ./test.py -f 5-taos-tools/basic.py
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册