提交 1f787905 编写于 作者: S shenglian zhou

Merge remote-tracking branch 'origin/develop' into szhou/feature/support-math-functions

......@@ -254,23 +254,25 @@ Query OK, 2 row(s) in set (0.001700s)
TDengine 提供了丰富的应用程序开发接口,其中包括C/C++、Java、Python、Go、Node.js、C# 、RESTful 等,便于用户快速开发应用:
- Java
- [Java](https://www.taosdata.com/cn/documentation/connector/java)
- C/C++
- [C/C++](https://www.taosdata.com/cn/documentation/connector#c-cpp)
- Python
- [Python](https://www.taosdata.com/cn/documentation/connector#python)
- Go
- [Go](https://www.taosdata.com/cn/documentation/connector#go)
- RESTful API
- [RESTful API](https://www.taosdata.com/cn/documentation/connector#restful)
- Node.js
- [Node.js](https://www.taosdata.com/cn/documentation/connector#nodejs)
- [Rust](https://www.taosdata.com/cn/documentation/connector/rust)
## 第三方连接器
TDengine 社区生态中也有一些非常友好的第三方连接器,可以通过以下链接访问它们的源码。
- [Rust Connector](https://github.com/taosdata/TDengine/tree/master/tests/examples/rust)
- [Rust Bindings](https://github.com/songtianyi/tdengine-rust-bindings/tree/master/examples)
- [.Net Core Connector](https://github.com/maikebing/Maikebing.EntityFrameworkCore.Taos)
- [Lua Connector](https://github.com/taosdata/TDengine/tree/develop/tests/examples/lua)
......
......@@ -281,18 +281,19 @@ drop database db;
TDengine provides abundant developing tools for users to develop on TDengine. Follow the links below to find your desired connectors and relevant documentation.
- [Java](https://www.taosdata.com/en/documentation/connector/#Java-Connector)
- [C/C++](https://www.taosdata.com/en/documentation/connector/#C/C++-Connector)
- [Python](https://www.taosdata.com/en/documentation/connector/#Python-Connector)
- [Go](https://www.taosdata.com/en/documentation/connector/#Go-Connector)
- [RESTful API](https://www.taosdata.com/en/documentation/connector/#RESTful-Connector)
- [Node.js](https://www.taosdata.com/en/documentation/connector/#Node.js-Connector)
- [Java](https://www.taosdata.com/en/documentation/connector/java)
- [C/C++](https://www.taosdata.com/en/documentation/connector#c-cpp)
- [Python](https://www.taosdata.com/en/documentation/connector#python)
- [Go](https://www.taosdata.com/en/documentation/connector#go)
- [RESTful API](https://www.taosdata.com/en/documentation/connector#restful)
- [Node.js](https://www.taosdata.com/en/documentation/connector#nodejs)
- [Rust](https://www.taosdata.com/en/documentation/connector/rust)
### Third Party Connectors
The TDengine community has also kindly built some of their own connectors! Follow the links below to find the source code for them.
- [Rust Connector](https://github.com/taosdata/TDengine/tree/master/tests/examples/rust)
- [Rust Bindings](https://github.com/songtianyi/tdengine-rust-bindings/tree/master/examples)
- [.Net Core Connector](https://github.com/maikebing/Maikebing.EntityFrameworkCore.Taos)
- [Lua Connector](https://github.com/taosdata/TDengine/tree/develop/tests/examples/lua)
......
......@@ -596,7 +596,7 @@ taosdemo支持3种功能的测试,包括插入、查询、订阅。但一个ta
"interlace_rows": 设置轮询插入每个单表数据的条目数,如果interlace_rows*childtable_count*supertable_num小于num_of_records_per_req时,则请求插入的数目以interlace_rows*childtable_count*supertable_num为准。可选项,缺省是0。
"num_of_records_per_req": 每条请求数据内容包含的插入数据记录数目,该数据组成的sql不能大于maxsqllen,如果过大,则取taood限制的1M长度(1048576)。可选项,缺省是INT64_MAX 32766(受服务端限制)。0代表不插入数据,建议配置大于0。
"num_of_records_per_req": 每条请求数据内容包含的插入数据记录数目,该数据组成的sql不能大于maxsqllen,如果过大,则取taosd限制的1M长度(1048576)。0代表不插入数据,建议配置大于0。
"databases": [{
......
......@@ -22,7 +22,7 @@ INSERT INTO d1001 VALUES (1538548685000, 10.3, 219, 0.31) (1538548695000, 12.6,
**Tips:**
- 要提高写入效率,需要批量写入。一批写入的记录条数越多,插入效率就越高。但一条记录不能超过16K,一条SQL语句总长度不能超过64K(可通过参数maxSQLLength配置,最大可配置为1M)
- 要提高写入效率,需要批量写入。一批写入的记录条数越多,插入效率就越高。但一条记录不能超过16K,一条SQL语句总长度不能超过1M
- TDengine支持多线程同时写入,要进一步提高写入速度,一个客户端需要打开20个以上的线程同时写。但线程数达到一定数量后,无法再提高,甚至还会下降,因为线程频繁切换,带来额外开销。
- 对同一张表,如果新插入记录的时间戳已经存在,默认情形下(UPDATE=0)新记录将被直接抛弃,也就是说,在一张表里,时间戳必须是唯一的。如果应用自动生成记录,很有可能生成的时间戳是一样的,这样,成功插入的记录条数会小于应用插入的记录条数。如果在创建数据库时使用了 UPDATE 1 选项,插入相同时间戳的新记录将覆盖原有记录。
- 写入的数据的时间戳必须大于当前时间减去配置参数keep的时间。如果keep配置为3650天,那么无法写入比3650天还早的数据。写入数据的时间戳也不能大于当前时间加配置参数days。如果days为2,那么无法写入比当前时间还晚2天的数据。
......
......@@ -12,7 +12,7 @@
默认情况下,[libtaos-rs] 使用 C 接口连接数据库,所以您需要:
- [TDengine] [客户端](https://www.taosdata.com/cn/getting-started/#%E9%80%9A%E8%BF%87%E5%AE%89%E8%A3%85%E5%8C%85%E5%AE%89%E8%A3%85)
- [TDengine客户端](https://www.taosdata.com/cn/getting-started/#%E9%80%9A%E8%BF%87%E5%AE%89%E8%A3%85%E5%8C%85%E5%AE%89%E8%A3%85)
- `clang`: `bindgen` 使用 `libclangAST` 来生成对应的Rust绑定。
## 特性列表
......@@ -62,7 +62,7 @@ libtaos = { version = "*", features = ["r2d2"] }
libtaos = { version = "*", features = ["rest"] }
```
本项目中提供一个 [示例程序]([examples/demo.rs](https://github.com/taosdata/libtaos-rs/blob/main/examples/demo.rs)) 如下:
本项目中提供一个 [示例程序](https://github.com/taosdata/libtaos-rs/blob/main/examples/demo.rs) 如下:
```rust
// ...
......
......@@ -166,7 +166,7 @@ taosd -C
| 48 | mqttPort | YES | **S** | | mqtt client name | | | 1883 |
| 49 | mqttTopic | YES | **S** | | | | | /test |
| 50 | compressMsgSize | | **S** | bytes | 客户端与服务器之间进行消息通讯过程中,对通讯的消息进行压缩的阈值。如果要压缩消息,建议设置为64330字节,即大于64330字节的消息体才进行压缩。 | `0 `表示对所有的消息均进行压缩 >0: 超过该值的消息才进行压缩 -1: 不压缩 | -1 | |
| 51 | maxSQLLength | | **C** | bytes | 单条SQL语句允许的最长限制 | 65480-1048576 | 65380 | |
| 51 | maxSQLLength | | **C** | bytes | 单条SQL语句允许的最长限制 | 65480-1048576 | 1048576 | |
| 52 | maxNumOfOrderedRes | | **SC** | | 支持超级表时间排序允许的最多记录数限制 | | 10万 | |
| 53 | timezone | | **SC** | | 时区 | | 从系统中动态获取当前的时区设置 | |
| 54 | locale | | **SC** | | 系统区位信息及编码格式 | | 系统中动态获取,如果自动获取失败,需要用户在配置文件设置或通过API设置 | |
......@@ -731,7 +731,7 @@ rmtaos
- 数据库名、表名、列名,都不能以数字开头,合法的可用字符集是“英文字符、数字和下划线”
- 表的列数:不能超过 1024 列,最少需要 2 列,第一列必须是时间戳(从 2.1.7.0 版本开始,改为最多支持 4096 列)
- 记录的最大长度:包括时间戳 8 byte,不能超过 16KB(每个 BINARY/NCHAR 类型的列还会额外占用 2 个 byte 的存储位置)
- 单条 SQL 语句默认最大字符串长度:65480 byte,但可通过系统配置参数 maxSQLLength 修改,最长可配置为 1048576 byte
- 单条 SQL 语句默认最大字符串长度:1048576 byte,但可通过系统配置参数 maxSQLLength 修改,取值范围 65480 ~ 1048576 byte
- 数据库副本数:不能超过 3
- 用户名:不能超过 23 个 byte
- 用户密码:不能超过 15 个 byte
......
......@@ -1589,7 +1589,7 @@ SELECT AVG(current), MAX(current), APERCENTILE(current, 50) FROM meters
- 表名最大长度为 192,每行数据最大长度 16k 个字符, 从 2.1.7.0 版本开始,每行数据最大长度 48k 个字符(注意:数据行内每个 BINARY/NCHAR 类型的列还会额外占用 2 个字节的存储位置)。
- 列名最大长度为 64,最多允许 1024 列,最少需要 2 列,第一列必须是时间戳。(从 2.1.7.0 版本开始,改为最多允许 4096 列)
- 标签名最大长度为 64,最多允许 128 个,可以 1 个,一个表中标签值的总长度不超过 16k 个字符。
- SQL 语句最大长度 65480 个字符,但可通过系统配置参数 maxSQLLength 修改,最长可配置为 1M
- SQL 语句最大长度 1048576 个字符,也可通过系统配置参数 maxSQLLength 修改,取值范围 65480 ~ 1048576
- SELECT 语句的查询结果,最多允许返回 1024 列(语句中的函数调用可能也会占用一些列空间),超限时需要显式指定较少的返回数据列,以避免语句执行报错。(从 2.1.7.0 版本开始,改为最多允许 4096 列)
- 库的数目,超级表的数目、表的数目,系统不做限制,仅受系统资源限制。
......
......@@ -12,7 +12,7 @@ Thanks [@songtianyi](https://github.com/songtianyi) for [libtdengine](https://gi
if you use the default features, it'll depend on:
- [TDengine] Client library and headers.
- [TDengine Client](https://www.taosdata.com/cn/getting-started/#%E9%80%9A%E8%BF%87%E5%AE%89%E8%A3%85%E5%8C%85%E5%AE%89%E8%A3%85) library and headers.
- clang because bindgen will requires the clang AST library.
## Fetures
......@@ -66,7 +66,7 @@ For REST client:
libtaos = { version = "*", features = ["rest"] }
```
There's a [demo app]([examples/demo.rs](https://github.com/taosdata/libtaos-rs/blob/main/examples/demo.rs)) in examples directory, looks like this:
There's a [demo app](https://github.com/taosdata/libtaos-rs/blob/main/examples/demo.rs) in examples directory, looks like this:
```rust
// ...
......
Subproject commit b8f76da4a708d158ec3cc4b844571dc4414e36b4
Subproject commit 25f8683ece07897fea12c347d369602b2235665f
......@@ -10,9 +10,8 @@ const ArrayType = require('ref-array-napi');
const Struct = require('ref-struct-napi');
const FieldTypes = require('./constants');
const errors = require('./error');
const _ = require('lodash')
const TaosObjects = require('./taosobjects');
const { NULL_POINTER } = require('ref-napi');
const { Console } = require('console');
module.exports = CTaosInterface;
......@@ -223,6 +222,8 @@ TaosField.fields.name.type.size = 65;
TaosField.defineProperty('type', ref.types.char);
TaosField.defineProperty('bytes', ref.types.short);
//define schemaless line array
var smlLine = ArrayType(ref.coerceType('char *'))
/**
*
......@@ -238,7 +239,6 @@ function CTaosInterface(config = null, pass = false) {
ref.types.void_ptr2 = ref.refType(ref.types.void_ptr);
/*Declare a bunch of functions first*/
/* Note, pointers to TAOS_RES, TAOS, are ref.types.void_ptr. The connection._conn buffer is supplied for pointers to TAOS * */
if ('win32' == os.platform()) {
taoslibname = 'taos';
} else {
......@@ -303,9 +303,15 @@ function CTaosInterface(config = null, pass = false) {
// int64_t stime, void *param, void (*callback)(void *));
'taos_open_stream': [ref.types.void_ptr, [ref.types.void_ptr, ref.types.char_ptr, ref.types.void_ptr, ref.types.int64, ref.types.void_ptr, ref.types.void_ptr]],
//void taos_close_stream(TAOS_STREAM *tstr);
'taos_close_stream': [ref.types.void, [ref.types.void_ptr]]
'taos_close_stream': [ref.types.void, [ref.types.void_ptr]],
//Schemaless insert
//TAOS_RES* taos_schemaless_insert(TAOS* taos, char* lines[], int numLines, int protocol,int precision)
// 'taos_schemaless_insert': [ref.types.void_ptr, [ref.types.void_ptr, ref.types.char_ptr, ref.types.int, ref.types.int, ref.types.int]]
'taos_schemaless_insert': [ref.types.void_ptr, [ref.types.void_ptr, smlLine, 'int', 'int', 'int']]
});
if (pass == false) {
if (config == null) {
this._config = ref.alloc(ref.types.char_ptr, ref.NULL);
......@@ -664,3 +670,38 @@ CTaosInterface.prototype.closeStream = function closeStream(stream) {
this.libtaos.taos_close_stream(stream);
console.log("Closed stream");
}
//Schemaless insert API
/**
* TAOS* taos, char* lines[], int numLines, int protocol,int precision)
* using taos_errstr get error info, taos_errno get error code. Remmember
* to release taos_res, otherwile will lead memory leak.
* TAOS schemaless insert api
* @param {*} connection a valid database connection
* @param {*} lines string data, which statisfied with line proctocol
* @param {*} numLines number of rows in param lines.
* @param {*} protocal Line protocol, enum type (0,1,2,3),indicate different line protocol
* @param {*} precision timestamp precision in lines, enum type (0,1,2,3,4,5,6)
* @returns TAOS_RES
*
*/
CTaosInterface.prototype.schemalessInsert = function schemalessInsert(connection,lines, protocal, precision) {
let _numLines = null;
let _lines = null;
if(_.isString(lines)){
_numLines = 1;
_lines = Buffer.alloc(_numLines * ref.sizeof.pointer);
ref.set(_lines,0,ref.allocCString(lines),ref.types.char_ptr);
}
else if(_.isArray(lines)){
_numLines = lines.length;
_lines = Buffer.alloc(_numLines * ref.sizeof.pointer);
for(let i = 0; i < _numLines ; i++){
ref.set(_lines,i*ref.sizeof.pointer,ref.allocCString(lines[i]),ref.types.char_ptr)
}
}
else{
throw new errors.InterfaceError("Unsupport lines input")
}
return this.libtaos.taos_schemaless_insert(connection, _lines, _numLines, protocal, precision);
}
const SCHEMALESS_PROTOCOL = {
TSDB_SML_UNKNOWN_PROTOCOL: 0,
TSDB_SML_LINE_PROTOCOL: 1,
TSDB_SML_TELNET_PROTOCOL: 2,
TSDB_SML_JSON_PROTOCOL: 3
}
const SCHEMALESS_PRECISION = {
TSDB_SML_TIMESTAMP_NOT_CONFIGURED : 0,
TSDB_SML_TIMESTAMP_HOURS : 1,
TSDB_SML_TIMESTAMP_MINUTES : 2,
TSDB_SML_TIMESTAMP_SECONDS : 3,
TSDB_SML_TIMESTAMP_MILLI_SECONDS : 4,
TSDB_SML_TIMESTAMP_MICRO_SECONDS : 5,
TSDB_SML_TIMESTAMP_NANO_SECONDS : 6
}
const typeCodesToName = {
0: 'Null',
1: 'Boolean',
2: 'Tiny Int',
3: 'Small Int',
4: 'Int',
5: 'Big Int',
6: 'Float',
7: 'Double',
8: 'Binary',
9: 'Timestamp',
10: 'Nchar',
11: 'Tinyint Unsigned',
12: 'Smallint Unsigned',
13: 'Int Unsigned',
14: 'Bigint Unsigned',
}
/**
* @function
* @param {number} typecode - The code to get the name of the type for
* @return {string} Name of the field type
*/
function getType(typecode) {
return typeCodesToName[typecode];
}
/**
* Contains the the definitions/values assigned to various field types
* @module FieldTypes
......@@ -18,71 +60,45 @@
* @property {number} C_TIMESTAMP - Timestamp in format "YYYY:MM:DD HH:MM:SS.MMM". Measured in number of milliseconds passed after
1970-01-01 08:00:00.000 GMT.
* @property {number} C_NCHAR - NChar field type encoded in ASCII, a wide string.
*
*
*
*
* @property {number} C_TIMESTAMP_MILLI - The code for millisecond timestamps, as returned by libtaos.taos_result_precision(result).
* @property {number} C_TIMESTAMP_MICRO - The code for microsecond timestamps, as returned by libtaos.taos_result_precision(result).
*/
module.exports = {
C_NULL : 0,
C_BOOL : 1,
C_TINYINT : 2,
C_SMALLINT : 3,
C_INT : 4,
C_BIGINT : 5,
C_FLOAT : 6,
C_DOUBLE : 7,
C_BINARY : 8,
C_TIMESTAMP : 9,
C_NCHAR : 10,
C_TINYINT_UNSIGNED : 11,
C_SMALLINT_UNSIGNED : 12,
C_INT_UNSIGNED : 13,
C_BIGINT_UNSIGNED : 14,
// NULL value definition
// NOTE: These values should change according to C definition in tsdb.h
C_BOOL_NULL : 2,
C_TINYINT_NULL : -128,
C_TINYINT_UNSIGNED_NULL : 255,
C_SMALLINT_NULL : -32768,
C_SMALLINT_UNSIGNED_NULL : 65535,
C_INT_NULL : -2147483648,
C_INT_UNSIGNED_NULL : 4294967295,
C_BIGINT_NULL : -9223372036854775808n,
C_BIGINT_UNSIGNED_NULL : 18446744073709551615n,
C_FLOAT_NULL : 2146435072,
C_DOUBLE_NULL : -9223370937343148032,
C_NCHAR_NULL : 4294967295,
C_BINARY_NULL : 255,
C_TIMESTAMP_MILLI : 0,
C_TIMESTAMP_MICRO : 1,
getType,
}
const typeCodesToName = {
0 : 'Null',
1 : 'Boolean',
2 : 'Tiny Int',
3 : 'Small Int',
4 : 'Int',
5 : 'Big Int',
6 : 'Float',
7 : 'Double',
8 : 'Binary',
9 : 'Timestamp',
10 : 'Nchar',
11 : 'TINYINT_UNSIGNED',
12 : 'SMALLINT_UNSIGNED',
13 : 'INT_UNSIGNED',
14 : 'BIGINT_UNSIGNED',
C_NULL: 0,
C_BOOL: 1,
C_TINYINT: 2,
C_SMALLINT: 3,
C_INT: 4,
C_BIGINT: 5,
C_FLOAT: 6,
C_DOUBLE: 7,
C_BINARY: 8,
C_TIMESTAMP: 9,
C_NCHAR: 10,
C_TINYINT_UNSIGNED: 11,
C_SMALLINT_UNSIGNED: 12,
C_INT_UNSIGNED: 13,
C_BIGINT_UNSIGNED: 14,
// NULL value definition
// NOTE: These values should change according to C definition in tsdb.h
C_BOOL_NULL: 2,
C_TINYINT_NULL: -128,
C_TINYINT_UNSIGNED_NULL: 255,
C_SMALLINT_NULL: -32768,
C_SMALLINT_UNSIGNED_NULL: 65535,
C_INT_NULL: -2147483648,
C_INT_UNSIGNED_NULL: 4294967295,
C_BIGINT_NULL: -9223372036854775808n,
C_BIGINT_UNSIGNED_NULL: 18446744073709551615n,
C_FLOAT_NULL: 2146435072,
C_DOUBLE_NULL: -9223370937343148032,
C_NCHAR_NULL: 4294967295,
C_BINARY_NULL: 255,
C_TIMESTAMP_MILLI: 0,
C_TIMESTAMP_MICRO: 1,
getType,
SCHEMALESS_PROTOCOL,
SCHEMALESS_PRECISION
}
/**
* @function
* @param {number} typecode - The code to get the name of the type for
* @return {string} Name of the field type
*/
function getType(typecode) {
return typeCodesToName[typecode];
}
......@@ -211,7 +211,7 @@ TDengineCursor.prototype.fetchall = function fetchall(options, callback) {
}
}
performance.mark('B');
performance.measure('query', 'A', 'B');
let response = this._createSetResponse(this._rowcount, time)
......@@ -474,3 +474,21 @@ TDengineCursor.prototype.openStream = function openStream(sql, callback, stime =
TDengineCursor.prototype.closeStream = function closeStream(stream) {
this._chandle.closeStream(stream);
}
/**
* schemaless insert
* @param {*} connection a valid database connection
* @param {*} lines string data, which statisfied with line proctocol
* @param {*} protocal Line protocol, enum type (0,1,2,3),indicate different line protocol
* @param {*} precision timestamp precision in lines, enum type (0,1,2,3,4,5,6)
* @returns TAOS_RES
*
*/
TDengineCursor.prototype.schemalessInsert = function schemalessInsert(lines, protocol, precision) {
this._result = this._chandle.schemalessInsert(this._connection._conn, lines, protocol, precision);
let errorNo = this._chandle.errno(this._result);
if (errorNo != 0) {
throw new errors.InterfaceError(errorNo + ":" + this._chandle.errStr(this._result));
this._chandle.freeResult(this._result);
}
this._chandle.freeResult(this._result);
}
......@@ -7,7 +7,7 @@
"test": "test"
},
"scripts": {
"test": "node test/test.js && node test/testMicroseconds.js && node test/testNanoseconds.js && node test/testUnsignedType.js "
"test": "node test/test.js && node test/testMicroseconds.js && node test/testNanoseconds.js && node test/testUnsignedType.js && node test/testSchemalessInsert.js "
},
"repository": {
"type": "git",
......@@ -27,6 +27,7 @@
"homepage": "https://github.com/taosdata/tdengine#readme",
"dependencies": {
"ffi-napi": "^3.1.0",
"lodash": "^4.17.21",
"ref-array-napi": "^1.2.1",
"ref-napi": "^1.5.2",
"ref-struct-napi": "^1.1.1"
......
var TDengineConnection = require('./nodetaos/connection.js')
module.exports.connect = function (connection={}) {
return new TDengineConnection(connection);
}
const TDengineConstant = require('./nodetaos/constants.js')
module.exports = {
connect: function (connection = {}) {
return new TDengineConnection(connection);
},
SCHEMALESS_PROTOCOL: TDengineConstant.SCHEMALESS_PROTOCOL,
SCHEMALESS_PRECISION: TDengineConstant.SCHEMALESS_PRECISION,
}
\ No newline at end of file
const _ = require('lodash');
const taos = require('../tdengine');
var conn = taos.connect({ host: "127.0.0.1", user: "root", password: "taosdata", config: "/etc/taos", port: 10 });
var c1 = conn.cursor();
executeUpdate("drop database if exists nodedb;");
executeUpdate("create database if not exists nodedb ;");
executeUpdate("use nodedb;");
let tbname1 = "line_protocol_arr";
let tbname2 = "json_protocol_arr";
let tbname3 = "json_protocol_str";
let tbname4 = "line_protocol_str";
let line1 = [tbname1 + ",t1=3i64,t2=4f64,t3=\"t3\" c1=3i64,c3=L\"passit\",c2=false,c4=4f64 1626006833639000000",
tbname1 + ",t1=4i64,t3=\"t4\",t2=5f64,t4=5f64 c1=3i64,c3=L\"passitagin\",c2=true,c4=5f64,c5=5f64 1626006833641000000"
];
let line2 = ["{"
+ "\"metric\": \"" + tbname2 + "\","
+ "\"timestamp\": 1626006833,"
+ "\"value\": 10,"
+ "\"tags\": {"
+ " \"t1\": true,"
+ "\"t2\": false,"
+ "\"t3\": 10,"
+ "\"t4\": \"123_abc_.!@#$%^&*:;,./?|+-=()[]{}<>\""
+ "}"
+ "}"
];
let line3 = "{"
+ "\"metric\": \"" + tbname3 + "\","
+ "\"timestamp\": 1626006833000,"
+ "\"value\": 10,"
+ "\"tags\": {"
+ " \"t1\": true,"
+ "\"t2\": false,"
+ "\"t3\": 10,"
+ "\"t4\": \"123_abc_.!@#$%^&*:;,./?|+-=()[]{}<>\""
+ "}"
+ "}";
let line4 = tbname4 + ",t1=3i64,t2=4f64,t3=\"t3\" c1=3i64,c3=L\"passit\",c2=false,c4=4f64 1626006833639";
try {
c1.schemalessInsert(line1, taos.SCHEMALESS_PROTOCOL.TSDB_SML_LINE_PROTOCOL, taos.SCHEMALESS_PRECISION.TSDB_SML_TIMESTAMP_NANO_SECONDS);
testSchemaless(tbname1, line1.length);
c1.schemalessInsert(line2, taos.SCHEMALESS_PROTOCOL.TSDB_SML_JSON_PROTOCOL, taos.SCHEMALESS_PRECISION.TSDB_SML_TIMESTAMP_SECONDS);
testSchemaless(tbname2, line2.length);
c1.schemalessInsert(line3, taos.SCHEMALESS_PROTOCOL.TSDB_SML_JSON_PROTOCOL, taos.SCHEMALESS_PRECISION.TSDB_SML_TIMESTAMP_MILLI_SECONDS);
testSchemaless(tbname3, 1);
c1.schemalessInsert(line4, taos.SCHEMALESS_PROTOCOL.TSDB_SML_LINE_PROTOCOL, taos.SCHEMALESS_PRECISION.TSDB_SML_TIMESTAMP_MILLI_SECONDS);
testSchemaless(tbname4, 1);
} catch (err) {
console.log(err)
}
function executeUpdate(sql) {
console.log(sql);
c1.execute(sql);
}
function testSchemaless(tbname, numLines) {
let sql = "select count(*) from " + tbname + ";";
executeUpdate(sql);
let affectRows = _.first(c1.fetchall());
if (affectRows != numLines) {
console.log(1);
console.log(line2);
throw "protocol " + tbname + " schemaless insert success,but can't select as expect."
}
else {
console.log("protocol " + tbname + " schemaless insert success, can select as expect.")
}
console.log("===================")
}
setTimeout(() => conn.close(), 2000);
......@@ -585,6 +585,7 @@ extern int64_t g_totalChildTables;
extern int64_t g_actualChildTables;
extern SQueryMetaInfo g_queryInfo;
extern FILE * g_fpOfInsertResult;
extern bool g_fail;
#define min(a, b) (((a) < (b)) ? (a) : (b))
......
......@@ -808,7 +808,8 @@ int createDatabasesAndStables(char *command) {
static void *createTable(void *sarg) {
threadInfo * pThreadInfo = (threadInfo *)sarg;
SSuperTable *stbInfo = pThreadInfo->stbInfo;
int32_t* code = calloc(1, sizeof(int32_t));
*code = -1;
setThreadName("createTable");
uint64_t lastPrintTime = taosGetTimestampMs();
......@@ -818,7 +819,7 @@ static void *createTable(void *sarg) {
pThreadInfo->buffer = calloc(1, buff_len);
if (NULL == pThreadInfo->buffer) {
errorPrint("%s", "failed to allocate memory\n");
return NULL;
goto create_table_end;
}
int len = 0;
......@@ -840,11 +841,10 @@ static void *createTable(void *sarg) {
batchNum++;
} else {
if (stbInfo == NULL) {
free(pThreadInfo->buffer);
errorPrint(
"%s() LN%d, use metric, but super table info is NULL\n",
__func__, __LINE__);
exit(EXIT_FAILURE);
goto create_table_end;
} else {
if (0 == len) {
batchNum = 0;
......@@ -856,14 +856,13 @@ static void *createTable(void *sarg) {
char *tagsValBuf = (char *)calloc(TSDB_MAX_SQL_LEN + 1, 1);
if (NULL == tagsValBuf) {
errorPrint("%s", "failed to allocate memory\n");
return NULL;
goto create_table_end;
}
if (0 == stbInfo->tagSource) {
if (generateTagValuesForStb(stbInfo, i, tagsValBuf)) {
tmfree(tagsValBuf);
tmfree(pThreadInfo->buffer);
exit(EXIT_FAILURE);
goto create_table_end;
}
} else {
snprintf(tagsValBuf, TSDB_MAX_SQL_LEN, "(%s)",
......@@ -895,7 +894,7 @@ static void *createTable(void *sarg) {
NO_INSERT_TYPE, false)) {
errorPrint("queryDbExec() failed. buffer:\n%s\n",
pThreadInfo->buffer);
free(pThreadInfo->buffer);
goto create_table_end;
return NULL;
}
pThreadInfo->tables_created += batchNum;
......@@ -913,11 +912,14 @@ static void *createTable(void *sarg) {
NO_INSERT_TYPE, false)) {
errorPrint("queryDbExec() failed. buffer:\n%s\n",
pThreadInfo->buffer);
goto create_table_end;
}
pThreadInfo->tables_created += batchNum;
}
free(pThreadInfo->buffer);
return NULL;
*code = 0;
create_table_end:
tmfree(pThreadInfo->buffer);
return code;
}
int startMultiThreadCreateChildTable(char *cols, int threads,
......@@ -976,7 +978,12 @@ int startMultiThreadCreateChildTable(char *cols, int threads,
}
for (int i = 0; i < threads; i++) {
pthread_join(pids[i], NULL);
void* result;
pthread_join(pids[i], &result);
if (*(int32_t*)result) {
g_fail = true;
}
tmfree(result);
}
for (int i = 0; i < threads; i++) {
......@@ -988,6 +995,9 @@ int startMultiThreadCreateChildTable(char *cols, int threads,
free(pids);
free(infos);
if (g_fail) {
return -1;
}
return 0;
}
......@@ -1579,7 +1589,8 @@ static void *syncWriteInterlaceStmtBatch(threadInfo *pThreadInfo,
uint32_t interlaceRows) {
debugPrint("[%d] %s() LN%d: ### stmt interlace write\n",
pThreadInfo->threadID, __func__, __LINE__);
int32_t* code = calloc(1, sizeof (int32_t));
*code = -1;
int64_t insertRows;
int64_t timeStampStep;
uint64_t insert_interval;
......@@ -1644,7 +1655,7 @@ static void *syncWriteInterlaceStmtBatch(threadInfo *pThreadInfo,
if (0 == strlen(tableName)) {
errorPrint("[%d] %s() LN%d, getTableName return null\n",
pThreadInfo->threadID, __func__, __LINE__);
return NULL;
goto free_of_interlace_stmt;
}
samplePos = pThreadInfo->samplePos;
......@@ -1777,16 +1788,17 @@ static void *syncWriteInterlaceStmtBatch(threadInfo *pThreadInfo,
}
if (percentComplete < 100)
printf("[%d]:%d%%\n", pThreadInfo->threadID, percentComplete);
free_of_interlace_stmt:
*code = 0;
printStatPerThread(pThreadInfo);
return NULL;
free_of_interlace_stmt:
return code;
}
void *syncWriteInterlace(threadInfo *pThreadInfo, uint32_t interlaceRows) {
debugPrint("[%d] %s() LN%d: ### interlace write\n", pThreadInfo->threadID,
__func__, __LINE__);
int32_t* code = calloc(1, sizeof (int32_t));
*code = -1;
int64_t insertRows;
uint64_t maxSqlLen;
int64_t timeStampStep;
......@@ -1824,7 +1836,7 @@ void *syncWriteInterlace(threadInfo *pThreadInfo, uint32_t interlaceRows) {
pThreadInfo->buffer = calloc(maxSqlLen, 1);
if (NULL == pThreadInfo->buffer) {
errorPrint("%s", "failed to allocate memory\n");
return NULL;
goto free_of_interlace;
}
pThreadInfo->totalInsertRows = 0;
......@@ -1874,8 +1886,7 @@ void *syncWriteInterlace(threadInfo *pThreadInfo, uint32_t interlaceRows) {
if (0 == strlen(tableName)) {
errorPrint("[%d] %s() LN%d, getTableName return null\n",
pThreadInfo->threadID, __func__, __LINE__);
free(pThreadInfo->buffer);
return NULL;
goto free_of_interlace;
}
uint64_t oldRemainderLen = remainderBufLen;
......@@ -2017,22 +2028,23 @@ void *syncWriteInterlace(threadInfo *pThreadInfo, uint32_t interlaceRows) {
}
if (percentComplete < 100)
printf("[%d]:%d%%\n", pThreadInfo->threadID, percentComplete);
*code = 0;
printStatPerThread(pThreadInfo);
free_of_interlace:
tmfree(pThreadInfo->buffer);
printStatPerThread(pThreadInfo);
return NULL;
return code;
}
static void *syncWriteInterlaceSml(threadInfo *pThreadInfo,
uint32_t interlaceRows) {
int32_t* code = calloc(1, sizeof (int32_t));
*code = -1;
debugPrint("[%d] %s() LN%d: ### interlace schemaless write\n",
pThreadInfo->threadID, __func__, __LINE__);
int64_t insertRows;
uint64_t maxSqlLen;
int64_t timeStampStep;
uint64_t insert_interval;
int32_t code = 0;
SSuperTable *stbInfo = pThreadInfo->stbInfo;
......@@ -2072,7 +2084,7 @@ static void *syncWriteInterlaceSml(threadInfo *pThreadInfo,
smlList = (char **)calloc(pThreadInfo->ntables, sizeof(char *));
if (NULL == smlList) {
errorPrint("%s", "failed to allocate memory\n");
return NULL;
goto free_of_interlace_sml;
}
for (int t = 0; t < pThreadInfo->ntables; t++) {
......@@ -2081,8 +2093,7 @@ static void *syncWriteInterlaceSml(threadInfo *pThreadInfo,
errorPrint("%s", "failed to allocate memory\n");
goto free_smlheadlist_interlace_sml;
}
code = generateSmlConstPart(sml, stbInfo, pThreadInfo, t);
if (code) {
if (generateSmlConstPart(sml, stbInfo, pThreadInfo, t)) {
goto free_smlheadlist_interlace_sml;
}
smlList[t] = sml;
......@@ -2105,8 +2116,7 @@ static void *syncWriteInterlaceSml(threadInfo *pThreadInfo,
jsonArray = cJSON_CreateArray();
tagsList = cJSON_CreateArray();
for (int t = 0; t < pThreadInfo->ntables; t++) {
code = generateSmlJsonTags(tagsList, stbInfo, pThreadInfo, t);
if (code) {
if (generateSmlJsonTags(tagsList, stbInfo, pThreadInfo, t)) {
goto free_json_interlace_sml;
}
}
......@@ -2156,17 +2166,15 @@ static void *syncWriteInterlaceSml(threadInfo *pThreadInfo,
tagsList,
(int)(tableSeq - pThreadInfo->start_table_from)),
true);
code = generateSmlJsonCols(jsonArray, tag, stbInfo,
pThreadInfo, timestamp);
if (code) {
if (generateSmlJsonCols(jsonArray, tag, stbInfo,
pThreadInfo, timestamp)) {
goto free_json_interlace_sml;
}
} else {
code = generateSmlMutablePart(
pThreadInfo->lines[j],
smlList[tableSeq - pThreadInfo->start_table_from],
stbInfo, pThreadInfo, timestamp);
if (code) {
if (generateSmlMutablePart(
pThreadInfo->lines[j],
smlList[tableSeq - pThreadInfo->start_table_from],
stbInfo, pThreadInfo, timestamp)) {
goto free_lines_interlace_sml;
}
}
......@@ -2302,7 +2310,9 @@ static void *syncWriteInterlaceSml(threadInfo *pThreadInfo,
if (percentComplete < 100)
printf("[%d]:%d%%\n", pThreadInfo->threadID, percentComplete);
*code = 0;
printStatPerThread(pThreadInfo);
free_of_interlace_sml:
if (stbInfo->lineProtocol == TSDB_SML_JSON_PROTOCOL) {
tmfree(pThreadInfo->lines);
free_json_interlace_sml:
......@@ -2324,12 +2334,13 @@ static void *syncWriteInterlaceSml(threadInfo *pThreadInfo,
}
tmfree(smlList);
}
return NULL;
return code;
}
void *syncWriteProgressiveStmt(threadInfo *pThreadInfo) {
debugPrint("%s() LN%d: ### stmt progressive write\n", __func__, __LINE__);
int32_t* code = calloc(1, sizeof (int32_t));
*code = -1;
SSuperTable *stbInfo = pThreadInfo->stbInfo;
int64_t timeStampStep =
stbInfo ? stbInfo->timeStampStep : g_args.timestamp_step;
......@@ -2362,7 +2373,7 @@ void *syncWriteProgressiveStmt(threadInfo *pThreadInfo) {
if (0 == strlen(tableName)) {
errorPrint("[%d] %s() LN%d, getTableName return null\n",
pThreadInfo->threadID, __func__, __LINE__);
return NULL;
goto free_of_stmt_progressive;
}
// measure prepare + insert
......@@ -2448,16 +2459,17 @@ void *syncWriteProgressiveStmt(threadInfo *pThreadInfo) {
if (percentComplete < 100) {
printf("[%d]:%d%%\n", pThreadInfo->threadID, percentComplete);
}
*code = 0;
printStatPerThread(pThreadInfo);
free_of_stmt_progressive:
tmfree(pThreadInfo->buffer);
printStatPerThread(pThreadInfo);
return NULL;
return code;
}
void *syncWriteProgressive(threadInfo *pThreadInfo) {
debugPrint("%s() LN%d: ### progressive write\n", __func__, __LINE__);
int32_t* code = calloc(1, sizeof (int32_t));
*code = -1;
SSuperTable *stbInfo = pThreadInfo->stbInfo;
uint64_t maxSqlLen = stbInfo ? stbInfo->maxSqlLen : g_args.max_sql_len;
int64_t timeStampStep =
......@@ -2469,7 +2481,7 @@ void *syncWriteProgressive(threadInfo *pThreadInfo) {
pThreadInfo->buffer = calloc(maxSqlLen, 1);
if (NULL == pThreadInfo->buffer) {
errorPrint("%s", "failed to allocate memory\n");
return NULL;
goto free_of_progressive;
}
uint64_t lastPrintTime = taosGetTimestampMs();
......@@ -2497,8 +2509,7 @@ void *syncWriteProgressive(threadInfo *pThreadInfo) {
if (0 == strlen(tableName)) {
errorPrint("[%d] %s() LN%d, getTableName return null\n",
pThreadInfo->threadID, __func__, __LINE__);
free(pThreadInfo->buffer);
return NULL;
goto free_of_progressive;
}
int64_t remainderBufLen = maxSqlLen - 2000;
......@@ -2609,16 +2620,17 @@ void *syncWriteProgressive(threadInfo *pThreadInfo) {
if (percentComplete < 100) {
printf("[%d]:%d%%\n", pThreadInfo->threadID, percentComplete);
}
*code = 0;
printStatPerThread(pThreadInfo);
free_of_progressive:
tmfree(pThreadInfo->buffer);
printStatPerThread(pThreadInfo);
return NULL;
return code;
}
void *syncWriteProgressiveSml(threadInfo *pThreadInfo) {
debugPrint("%s() LN%d: ### sml progressive write\n", __func__, __LINE__);
int32_t code = 0;
int32_t * code = calloc(1, sizeof (int32_t));
*code = -1;
SSuperTable *stbInfo = pThreadInfo->stbInfo;
int64_t timeStampStep = stbInfo->timeStampStep;
int64_t insertRows = stbInfo->insertRows;
......@@ -2645,7 +2657,7 @@ void *syncWriteProgressiveSml(threadInfo *pThreadInfo) {
smlList = (char **)calloc(pThreadInfo->ntables, sizeof(char *));
if (NULL == smlList) {
errorPrint("%s", "failed to allocate memory\n");
return NULL;
goto free_of_progressive_sml;
}
for (int t = 0; t < pThreadInfo->ntables; t++) {
char *sml = (char *)calloc(1, stbInfo->lenOfOneRow);
......@@ -2653,8 +2665,7 @@ void *syncWriteProgressiveSml(threadInfo *pThreadInfo) {
errorPrint("%s", "failed to allocate memory\n");
goto free_smlheadlist_progressive_sml;
}
code = generateSmlConstPart(sml, stbInfo, pThreadInfo, t);
if (code) {
if (generateSmlConstPart(sml, stbInfo, pThreadInfo, t)) {
goto free_smlheadlist_progressive_sml;
}
smlList[t] = sml;
......@@ -2677,8 +2688,7 @@ void *syncWriteProgressiveSml(threadInfo *pThreadInfo) {
jsonArray = cJSON_CreateArray();
tagsList = cJSON_CreateArray();
for (int t = 0; t < pThreadInfo->ntables; t++) {
code = generateSmlJsonTags(tagsList, stbInfo, pThreadInfo, t);
if (code) {
if (generateSmlJsonTags(tagsList, stbInfo, pThreadInfo, t)) {
goto free_json_progressive_sml;
}
}
......@@ -2699,16 +2709,14 @@ void *syncWriteProgressiveSml(threadInfo *pThreadInfo) {
if (stbInfo->lineProtocol == TSDB_SML_JSON_PROTOCOL) {
cJSON *tag = cJSON_Duplicate(
cJSON_GetArrayItem(tagsList, (int)i), true);
code = generateSmlJsonCols(jsonArray, tag, stbInfo,
pThreadInfo, timestamp);
if (code) {
if (generateSmlJsonCols(jsonArray, tag, stbInfo,
pThreadInfo, timestamp)) {
goto free_json_progressive_sml;
}
} else {
code = generateSmlMutablePart(pThreadInfo->lines[k],
smlList[i], stbInfo,
pThreadInfo, timestamp);
if (code) {
if (generateSmlMutablePart(pThreadInfo->lines[k],
smlList[i], stbInfo,
pThreadInfo, timestamp)) {
goto free_lines_progressive_sml;
}
}
......@@ -2770,6 +2778,8 @@ void *syncWriteProgressiveSml(threadInfo *pThreadInfo) {
}
}
*code = 0;
free_of_progressive_sml:
if (stbInfo->lineProtocol == TSDB_SML_JSON_PROTOCOL) {
tmfree(pThreadInfo->lines);
free_json_progressive_sml:
......@@ -2791,7 +2801,7 @@ void *syncWriteProgressiveSml(threadInfo *pThreadInfo) {
}
tmfree(smlList);
}
return NULL;
return code;
}
void *syncWrite(void *sarg) {
......@@ -3290,7 +3300,12 @@ int startMultiThreadInsertData(int threads, char *db_name, char *precision,
int64_t start = taosGetTimestampUs();
for (int i = 0; i < threads; i++) {
pthread_join(pids[i], NULL);
void* result;
pthread_join(pids[i], &result);
if (*(int32_t*)result){
g_fail = true;
}
tmfree(result);
}
uint64_t totalDelay = 0;
......@@ -3343,6 +3358,13 @@ int startMultiThreadInsertData(int threads, char *db_name, char *precision,
if (pThreadInfo->minDelay < minDelay) minDelay = pThreadInfo->minDelay;
}
free(pids);
free(infos);
if (g_fail){
return -1;
}
if (cntDelay == 0) cntDelay = 1;
avgDelay = (double)totalDelay / cntDelay;
......@@ -3404,8 +3426,6 @@ int startMultiThreadInsertData(int threads, char *db_name, char *precision,
// taos_close(taos);
free(pids);
free(infos);
return 0;
}
......
......@@ -20,6 +20,7 @@ FILE * g_fpOfInsertResult = NULL;
char * g_dupstr = NULL;
SDbs g_Dbs;
SQueryMetaInfo g_queryInfo;
bool g_fail = false;
SArguments g_args = {
DEFAULT_METAFILE, // metaFile
......
......@@ -44,7 +44,8 @@ void selectAndGetResult(threadInfo *pThreadInfo, char *command) {
void *specifiedTableQuery(void *sarg) {
threadInfo *pThreadInfo = (threadInfo *)sarg;
int32_t *code = calloc(1, sizeof (int32_t));
*code = -1;
setThreadName("specTableQuery");
if (pThreadInfo->taos == NULL) {
......@@ -54,7 +55,7 @@ void *specifiedTableQuery(void *sarg) {
if (taos == NULL) {
errorPrint("[%d] Failed to connect to TDengine, reason:%s\n",
pThreadInfo->threadID, taos_errstr(NULL));
return NULL;
goto end_of_specified_query;
} else {
pThreadInfo->taos = taos;
}
......@@ -65,7 +66,7 @@ void *specifiedTableQuery(void *sarg) {
if (0 != queryDbExec(pThreadInfo->taos, sqlStr, NO_INSERT_TYPE, false)) {
taos_close(pThreadInfo->taos);
errorPrint("use database %s failed!\n\n", g_queryInfo.dbName);
return NULL;
goto end_of_specified_query;
}
uint64_t st = 0;
......@@ -118,14 +119,18 @@ void *specifiedTableQuery(void *sarg) {
lastPrintTime = currentPrintTime;
}
}
return NULL;
*code = 0;
end_of_specified_query:
return code;
}
void *superTableQuery(void *sarg) {
int32_t * code = calloc(1, sizeof (int32_t));
*code = -1;
char *sqlstr = calloc(1, BUFFER_SIZE);
if (NULL == sqlstr) {
errorPrint("%s", "failed to allocate memory\n");
return NULL;
goto free_of_super_query;
}
threadInfo *pThreadInfo = (threadInfo *)sarg;
......@@ -139,8 +144,7 @@ void *superTableQuery(void *sarg) {
if (taos == NULL) {
errorPrint("[%d] Failed to connect to TDengine, reason:%s\n",
pThreadInfo->threadID, taos_errstr(NULL));
free(sqlstr);
return NULL;
goto free_of_super_query;
} else {
pThreadInfo->taos = taos;
}
......@@ -200,9 +204,10 @@ void *superTableQuery(void *sarg) {
taosGetSelfPthreadId(), pThreadInfo->start_table_from,
pThreadInfo->end_table_to, (double)(et - st) / 1000.0);
}
free(sqlstr);
return NULL;
*code = 0;
free_of_super_query:
tmfree(sqlstr);
return code;
}
int queryTestProcess() {
......@@ -398,7 +403,12 @@ int queryTestProcess() {
if ((nSqlCount > 0) && (nConcurrent > 0)) {
for (int i = 0; i < nConcurrent; i++) {
for (int j = 0; j < nSqlCount; j++) {
pthread_join(pids[i * nSqlCount + j], NULL);
void* result;
pthread_join(pids[i * nSqlCount + j], &result);
if (*(int32_t*)result) {
g_fail = true;
}
tmfree(result);
if (0 == strncasecmp(g_queryInfo.queryMode, "rest", 4)) {
threadInfo *pThreadInfo = infos + i * nSqlCount + j;
#ifdef WINDOWS
......@@ -416,7 +426,12 @@ int queryTestProcess() {
tmfree((char *)infos);
for (int i = 0; i < g_queryInfo.superQueryInfo.threadCnt; i++) {
pthread_join(pidsOfSub[i], NULL);
void* result;
pthread_join(pidsOfSub[i], &result);
if (*(int32_t*)result) {
g_fail = true;
}
tmfree(result);
if (0 == strncasecmp(g_queryInfo.queryMode, "rest", 4)) {
threadInfo *pThreadInfo = infosOfSub + i;
#ifdef WINDOWS
......@@ -431,6 +446,10 @@ int queryTestProcess() {
tmfree((char *)pidsOfSub);
tmfree((char *)infosOfSub);
if (g_fail) {
return -1;
}
// taos_close(taos);// workaround to use separate taos connection;
uint64_t endTs = taosGetTimestampMs();
......
......@@ -71,6 +71,8 @@ TAOS_SUB *subscribeImpl(QUERY_CLASS class, threadInfo *pThreadInfo, char *sql,
}
void *specifiedSubscribe(void *sarg) {
int32_t * code = calloc(1, sizeof (int32_t));
*code = -1;
threadInfo *pThreadInfo = (threadInfo *)sarg;
// TAOS_SUB* tsub = NULL;
......@@ -83,15 +85,14 @@ void *specifiedSubscribe(void *sarg) {
if (pThreadInfo->taos == NULL) {
errorPrint("[%d] Failed to connect to TDengine, reason:%s\n",
pThreadInfo->threadID, taos_errstr(NULL));
return NULL;
goto free_of_specified_subscribe;
}
}
char sqlStr[TSDB_DB_NAME_LEN + 5];
sprintf(sqlStr, "USE %s", g_queryInfo.dbName);
if (0 != queryDbExec(pThreadInfo->taos, sqlStr, NO_INSERT_TYPE, false)) {
taos_close(pThreadInfo->taos);
return NULL;
goto free_of_specified_subscribe;
}
sprintf(g_queryInfo.specifiedQueryInfo.topic[pThreadInfo->threadID],
......@@ -110,8 +111,7 @@ void *specifiedSubscribe(void *sarg) {
g_queryInfo.specifiedQueryInfo.subscribeRestart,
g_queryInfo.specifiedQueryInfo.subscribeInterval);
if (NULL == g_queryInfo.specifiedQueryInfo.tsub[pThreadInfo->threadID]) {
taos_close(pThreadInfo->taos);
return NULL;
goto free_of_specified_subscribe;
}
// start loop to consume result
......@@ -171,36 +171,37 @@ void *specifiedSubscribe(void *sarg) {
g_queryInfo.specifiedQueryInfo.subscribeInterval);
if (NULL == g_queryInfo.specifiedQueryInfo
.tsub[pThreadInfo->threadID]) {
taos_close(pThreadInfo->taos);
return NULL;
goto free_of_specified_subscribe;
}
}
}
}
*code = 0;
taos_free_result(g_queryInfo.specifiedQueryInfo.res[pThreadInfo->threadID]);
free_of_specified_subscribe:
taos_close(pThreadInfo->taos);
return NULL;
return code;
}
static void *superSubscribe(void *sarg) {
int32_t * code = calloc(1, sizeof (int32_t));
*code = -1;
threadInfo *pThreadInfo = (threadInfo *)sarg;
TAOS_SUB *tsub[MAX_QUERY_SQL_COUNT] = {0};
uint64_t tsubSeq;
char * subSqlStr = calloc(1, BUFFER_SIZE);
if (NULL == subSqlStr) {
errorPrint("%s", "failed to allocate memory\n");
goto free_of_super_subscribe;
}
TAOS_SUB *tsub[MAX_QUERY_SQL_COUNT] = {0};
uint64_t tsubSeq;
setThreadName("superSub");
if (pThreadInfo->ntables > MAX_QUERY_SQL_COUNT) {
free(subSqlStr);
errorPrint("The table number(%" PRId64
") of the thread is more than max query sql count: %d\n",
pThreadInfo->ntables, MAX_QUERY_SQL_COUNT);
exit(EXIT_FAILURE);
goto free_of_super_subscribe;
}
if (pThreadInfo->taos == NULL) {
......@@ -210,18 +211,15 @@ static void *superSubscribe(void *sarg) {
if (pThreadInfo->taos == NULL) {
errorPrint("[%d] Failed to connect to TDengine, reason:%s\n",
pThreadInfo->threadID, taos_errstr(NULL));
free(subSqlStr);
return NULL;
goto free_of_super_subscribe;
}
}
char sqlStr[TSDB_DB_NAME_LEN + 5];
sprintf(sqlStr, "USE %s", g_queryInfo.dbName);
if (0 != queryDbExec(pThreadInfo->taos, sqlStr, NO_INSERT_TYPE, false)) {
taos_close(pThreadInfo->taos);
errorPrint("use database %s failed!\n\n", g_queryInfo.dbName);
free(subSqlStr);
return NULL;
goto free_of_super_subscribe;
}
char topic[32] = {0};
......@@ -252,9 +250,7 @@ static void *superSubscribe(void *sarg) {
g_queryInfo.superQueryInfo.subscribeRestart,
g_queryInfo.superQueryInfo.subscribeInterval);
if (NULL == tsub[tsubSeq]) {
taos_close(pThreadInfo->taos);
free(subSqlStr);
return NULL;
goto free_of_super_subscribe;
}
}
......@@ -321,9 +317,7 @@ static void *superSubscribe(void *sarg) {
g_queryInfo.superQueryInfo.subscribeRestart,
g_queryInfo.superQueryInfo.subscribeInterval);
if (NULL == tsub[tsubSeq]) {
taos_close(pThreadInfo->taos);
free(subSqlStr);
return NULL;
goto free_of_super_subscribe;
}
}
}
......@@ -340,10 +334,11 @@ static void *superSubscribe(void *sarg) {
tsubSeq = i - pThreadInfo->start_table_from;
taos_unsubscribe(tsub[tsubSeq], 0);
}
*code = 0;
free_of_super_subscribe:
taos_close(pThreadInfo->taos);
free(subSqlStr);
return NULL;
tmfree(subSqlStr);
return code;
}
int subscribeTestProcess() {
......@@ -482,7 +477,12 @@ int subscribeTestProcess() {
for (int i = 0; i < g_queryInfo.superQueryInfo.sqlCount; i++) {
for (int j = 0; j < threads; j++) {
uint64_t seq = i * threads + j;
pthread_join(pidsOfStable[seq], NULL);
void* result;
pthread_join(pidsOfStable[seq], &result);
if (*(int32_t*)result) {
g_fail = true;
}
tmfree(result);
}
}
}
......@@ -491,7 +491,12 @@ int subscribeTestProcess() {
for (int i = 0; i < g_queryInfo.specifiedQueryInfo.sqlCount; i++) {
for (int j = 0; j < g_queryInfo.specifiedQueryInfo.concurrent; j++) {
uint64_t seq = i * g_queryInfo.specifiedQueryInfo.concurrent + j;
pthread_join(pids[seq], NULL);
void* result;
pthread_join(pids[seq], &result);
if (*(int32_t*)result) {
g_fail = true;
}
tmfree(result);
}
}
......@@ -501,5 +506,8 @@ int subscribeTestProcess() {
tmfree((char *)pidsOfStable);
tmfree((char *)infosOfStable);
// taos_close(taos);
if (g_fail) {
return -1;
}
return 0;
}
\ No newline at end of file
......@@ -714,13 +714,17 @@ static int tsdbScanAndConvertSubmitMsg(STsdbRepo *pRepo, SSubmitMsg *pMsg) {
static SMemRow tsdbInsertDupKeyMerge(SMemRow row1, SMemRow row2, STsdbRepo* pRepo,
STSchema **ppSchema1, STSchema **ppSchema2,
STable* pTable, int32_t* pPoints, SMemRow* pLastRow) {
//for compatiblity, duplicate key inserted when update=0 should be also calculated as affected rows!
if(row1 == NULL && row2 == NULL && pRepo->config.update == TD_ROW_DISCARD_UPDATE) {
(*pPoints)++;
return NULL;
}
tsdbTrace("vgId:%d a row is %s table %s tid %d uid %" PRIu64 " key %" PRIu64, REPO_ID(pRepo),
"updated in", TABLE_CHAR_NAME(pTable), TABLE_TID(pTable), TABLE_UID(pTable),
memRowKey(row1));
if(row2 == NULL || pRepo->config.update != TD_ROW_PARTIAL_UPDATE) {
void* pMem = tsdbAllocBytes(pRepo, memRowTLen(row1));
if(pMem == NULL) return NULL;
......@@ -841,7 +845,6 @@ static int tsdbInsertDataToTable(STsdbRepo* pRepo, SSubmitBlk* pBlock, int32_t *
int64_t dsize = SL_SIZE(pTableData->pData) - osize;
(*pAffectedRows) += points;
if(lastRow != NULL) {
TSKEY lastRowKey = memRowKey(lastRow);
if (pMemTable->keyFirst > firstRowKey) pMemTable->keyFirst = firstRowKey;
......
......@@ -11,7 +11,7 @@
4. pip install ../src/connector/python ; pip3 install
../src/connector/python
5. pip install numpy; pip3 install numpy (numpy is required only if you need to run querySort.py)
5. pip install numpy; pip3 install numpy fabric2 psutil pandas(numpy is required only if you need to run querySort.py)
> Note: Both Python2 and Python3 are currently supported by the Python test
> framework. Since Python2 is no longer officially supported by Python Software
......
......@@ -99,26 +99,41 @@ class TDTestCase:
else:
shutil.rmtree("./dumpdata")
os.mkdir("./dumpdata")
os.system(build_path + "/" + "taosdump -D test -o ./dumpdata")
sleep(2)
os.system("cd ./dumpdata && mv dbs.sql tables.sql")
os.system('sed -i "s/test/dumptest/g" `grep test -rl ./dumpdata`')
os.system(build_path + "/" + "taos -D ./dumpdata")
tdSql.query("select count(*) from dumptest.st")
tdSql.checkData(0, 0, 50)
tdLog.info("========test other file name about tables.sql========")
os.system("rm -rf ./dumpdata/*")
os.system(build_path + "/" + "taosdump -D test -o ./dumpdata")
sleep(2)
os.system("cd ./dumpdata && mv dbs.sql table.sql")
os.system('sed -i "s/test/tt/g" `grep test -rl ./dumpdata`')
# write data into sqls file
tables = ["CREATE DATABASE IF NOT EXISTS opendbtest REPLICA 1 QUORUM 1 DAYS\
10 KEEP 3650 CACHE 16 BLOCKS 16 MINROWS 100 MAXROWS 4096 FSYNC 3000 CACHELAST 0 COMP 2 PRECISION 'ms' UPDATE 0;",
"CREATE TABLE IF NOT EXISTS opendbtest.cpus (ts TIMESTAMP, value DOUBLE) TAGS (author NCHAR(2), \
department NCHAR(7), env NCHAR(4), hostname NCHAR(5), os NCHAR(6), production NCHAR(8), \
team NCHAR(7), type NCHAR(10), useage NCHAR(7), workflow NCHAR(4));"]
with open("./dumpdata/tables.sql" ,"a") as f :
for item in tables:
f.write(item)
f.write("\n")
f.close()
records = [ "CREATE TABLE IF NOT EXISTS opendbtest.tb USING opendbtest.cpus TAGS ('dd', 'Beijing', 'test', 'vm_7', 'ubuntu', 'taosdata', 'develop', 'usage_user', 'monitor', 'TIMI');",
"INSERT INTO opendbtest.tb VALUES (1420070400000, 59.078475);",
"INSERT INTO opendbtest.tb VALUES (1420070410000, 44.844490);",
"INSERT INTO opendbtest.tb VALUES (1420070420000, 34.796703);",
"INSERT INTO opendbtest.tb VALUES (1420070430000, 35.758099);",
"INSERT INTO opendbtest.tb VALUES (1420070440000, 51.502387);"]
with open("./dumpdata/opendbtest.0.sql" ,"a") as f :
for item in records:
f.write(item)
f.write("\n")
f.close()
cmd = build_path + "/" + "taos -D ./dumpdata"
out = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE,stderr=subprocess.PIPE).stderr.read().decode("utf-8")
if out.find("error:") >=0:
print("===========expected error occured======")
tdSql.query("select value from opendbtest.tb")
tdSql.checkRows(5)
tdLog.info("====== check taos shell params ========")
......@@ -162,6 +177,7 @@ class TDTestCase:
continue
else:
cmd = build_path + "/" + "taos -s \" insert into dbst.tb2 values(now ,2,2.0,'"+code+"','汉字"+code+"\')\""
print(cmd)
self.execute_cmd(cmd)
......@@ -192,6 +208,7 @@ class TDTestCase:
for query in querys:
cmd = build_path + "/" + "taos -s \""+query+"\""
self.execute_cmd(cmd)
print(cmd)
def stop(self):
tdSql.close()
......
......@@ -182,7 +182,7 @@ python3 test.py -f tools/taosdemoAllTest/NanoTestCase/taosdemoTestSupportNanoIns
python3 test.py -f tools/taosdemoAllTest/NanoTestCase/taosdemoTestSupportNanoQuery.py
python3 test.py -f tools/taosdemoAllTest/NanoTestCase/taosdemoTestSupportNanosubscribe.py
python3 test.py -f tools/taosdemoAllTest/NanoTestCase/taosdemoTestInsertTime_step.py
# disable due to taosdump don't support sql format. python3 test.py -f tools/taosdumpTestNanoSupport.py
python3 test.py -f tools/taosdumpTestNanoSupport.py
#
python3 ./test.py -f tsdb/tsdbComp.py
......@@ -305,7 +305,7 @@ python3 ./test.py -f client/client.py
python3 ./test.py -f client/version.py
python3 ./test.py -f client/alterDatabase.py
python3 ./test.py -f client/noConnectionErrorTest.py
# disable due to taosdump don't support sql format. python3 ./test.py -f client/taoshellCheckCase.py
python3 ./test.py -f client/taoshellCheckCase.py
# python3 test.py -f client/change_time_1_1.py
# python3 test.py -f client/change_time_1_2.py
......
......@@ -171,8 +171,8 @@ class TDTestCase:
#print("==============taosdemo——json_no,#create stable,table; insert table; show table; select table; drop table")
assert os.system("%staosdemo -f tools/taosdemoAllTest/TD-10539/create_taosdemo_no.json -y " % binPath) == 0
tdSql.query("show dbno.tables ")
os.system("%staosdemo -f tools/taosdemoAllTest/TD-10539/create_taosdemo_no.json -y " % binPath)
tdSql.query("show dbno.tables;")
tdSql.checkRows(0)
......
......@@ -143,59 +143,41 @@ class TDTestCase:
'%staosdump --databases timedb1 -S 1625068810000000000 -o ./taosdumptest/dumptmp3 ' %
binPath)
# replace strings to dump in databases
os.system(
"sed -i \"s/timedb1/dumptmp1/g\" `grep timedb1 -rl ./taosdumptest/dumptmp1`")
os.system(
"sed -i \"s/timedb1/dumptmp2/g\" `grep timedb1 -rl ./taosdumptest/dumptmp2`")
os.system(
"sed -i \"s/timedb1/dumptmp3/g\" `grep timedb1 -rl ./taosdumptest/dumptmp3`")
os.system("%staosdump -i ./taosdumptest/dumptmp1" % binPath)
tdSql.execute("drop database timedb1")
os.system("%staosdump -i ./taosdumptest/dumptmp2" % binPath)
os.system("%staosdump -i ./taosdumptest/dumptmp3" % binPath)
# dump data and check for taosdump
tdSql.query("select count(*) from dumptmp1.st")
tdSql.checkData(0, 0, 1000)
tdSql.query("select count(*) from dumptmp2.st")
# dump data and check for taosdump
tdSql.query("select count(*) from timedb1.st")
tdSql.checkData(0, 0, 510)
tdSql.query("select count(*) from dumptmp3.st")
tdSql.execute("drop database timedb1")
os.system("%staosdump -i ./taosdumptest/dumptmp3" % binPath)
# dump data and check for taosdump
tdSql.query("select count(*) from timedb1.st")
tdSql.checkData(0, 0, 900)
tdSql.execute("drop database timedb1")
os.system("%staosdump -i ./taosdumptest/dumptmp1" % binPath)
# dump data and check for taosdump
tdSql.query("select count(*) from timedb1.st")
tdSql.checkData(0, 0, 1000)
# check data
origin_res = tdSql.getResult("select * from timedb1.st")
dump_res = tdSql.getResult("select * from dumptmp1.st")
tdSql.execute("drop database timedb1")
os.system("%staosdump -i ./taosdumptest/dumptmp1" % binPath)
# dump data and check for taosdump
dump_res = tdSql.getResult("select * from timedb1.st")
if origin_res == dump_res:
tdLog.info("test nano second : dump check data pass for all data!")
else:
tdLog.info(
"test nano second : dump check data failed for all data!")
origin_res = tdSql.getResult(
"select * from timedb1.st where ts >=1625068810000000000 and ts <= 1625068860000000000")
dump_res = tdSql.getResult("select * from dumptmp2.st")
if origin_res == dump_res:
tdLog.info(" test nano second : dump check data pass for data! ")
else:
tdLog.info(" test nano second : dump check data failed for data !")
origin_res = tdSql.getResult(
"select * from timedb1.st where ts >=1625068810000000000 ")
dump_res = tdSql.getResult("select * from dumptmp3.st")
if origin_res == dump_res:
tdLog.info(" test nano second : dump check data pass for data! ")
else:
tdLog.info(" test nano second : dump check data failed for data !")
# us second support test case
os.system("rm -rf ./taosdumptest/")
tdSql.execute("drop database if exists dumptmp1")
tdSql.execute("drop database if exists dumptmp2")
tdSql.execute("drop database if exists dumptmp3")
tdSql.execute("drop database if exists timedb1")
if not os.path.exists("./taosdumptest/tmp1"):
os.makedirs("./taosdumptest/dumptmp1")
......@@ -228,55 +210,44 @@ class TDTestCase:
'%staosdump --databases timedb1 -S 1625068810000000 -o ./taosdumptest/dumptmp3 ' %
binPath)
os.system(
"sed -i \"s/timedb1/dumptmp1/g\" `grep timedb1 -rl ./taosdumptest/dumptmp1`")
os.system(
"sed -i \"s/timedb1/dumptmp2/g\" `grep timedb1 -rl ./taosdumptest/dumptmp2`")
os.system(
"sed -i \"s/timedb1/dumptmp3/g\" `grep timedb1 -rl ./taosdumptest/dumptmp3`")
os.system("%staosdump -i ./taosdumptest/dumptmp1" % binPath)
os.system("%staosdump -i ./taosdumptest/dumptmp2" % binPath)
os.system("%staosdump -i ./taosdumptest/dumptmp3" % binPath)
tdSql.query("select count(*) from dumptmp1.st")
tdSql.checkData(0, 0, 1000)
tdSql.query("select count(*) from dumptmp2.st")
tdSql.execute("drop database timedb1")
os.system("%staosdump -i ./taosdumptest/dumptmp2" % binPath)
# dump data and check for taosdump
tdSql.query("select count(*) from timedb1.st")
tdSql.checkData(0, 0, 510)
tdSql.query("select count(*) from dumptmp3.st")
tdSql.execute("drop database timedb1")
os.system("%staosdump -i ./taosdumptest/dumptmp3" % binPath)
# dump data and check for taosdump
tdSql.query("select count(*) from timedb1.st")
tdSql.checkData(0, 0, 900)
origin_res = tdSql.getResult("select * from timedb1.st")
dump_res = tdSql.getResult("select * from dumptmp1.st")
if origin_res == dump_res:
tdLog.info("test us second : dump check data pass for all data!")
else:
tdLog.info("test us second : dump check data failed for all data!")
origin_res = tdSql.getResult(
"select * from timedb1.st where ts >=1625068810000000 and ts <= 1625068860000000")
dump_res = tdSql.getResult("select * from dumptmp2.st")
if origin_res == dump_res:
tdLog.info(" test us second : dump check data pass for data! ")
else:
tdLog.info(" test us second : dump check data failed for data!")
tdSql.execute("drop database timedb1")
os.system("%staosdump -i ./taosdumptest/dumptmp1" % binPath)
# dump data and check for taosdump
tdSql.query("select count(*) from timedb1.st")
tdSql.checkData(0, 0, 1000)
origin_res = tdSql.getResult(
"select * from timedb1.st where ts >=1625068810000000 ")
dump_res = tdSql.getResult("select * from dumptmp3.st")
# check data
origin_res = tdSql.getResult("select * from timedb1.st")
tdSql.execute("drop database timedb1")
os.system("%staosdump -i ./taosdumptest/dumptmp1" % binPath)
# dump data and check for taosdump
dump_res = tdSql.getResult("select * from timedb1.st")
if origin_res == dump_res:
tdLog.info(" test us second : dump check data pass for data! ")
tdLog.info("test micro second : dump check data pass for all data!")
else:
tdLog.info(" test us second : dump check data failed for data! ")
tdLog.info(
"test micro second : dump check data failed for all data!")
# ms second support test case
os.system("rm -rf ./taosdumptest/")
tdSql.execute("drop database if exists dumptmp1")
tdSql.execute("drop database if exists dumptmp2")
tdSql.execute("drop database if exists dumptmp3")
tdSql.execute("drop database if exists timedb1")
if not os.path.exists("./taosdumptest/tmp1"):
os.makedirs("./taosdumptest/dumptmp1")
......@@ -309,48 +280,39 @@ class TDTestCase:
'%staosdump --databases timedb1 -S 1625068810000 -o ./taosdumptest/dumptmp3 ' %
binPath)
os.system(
"sed -i \"s/timedb1/dumptmp1/g\" `grep timedb1 -rl ./taosdumptest/dumptmp1`")
os.system(
"sed -i \"s/timedb1/dumptmp2/g\" `grep timedb1 -rl ./taosdumptest/dumptmp2`")
os.system(
"sed -i \"s/timedb1/dumptmp3/g\" `grep timedb1 -rl ./taosdumptest/dumptmp3`")
os.system("%staosdump -i ./taosdumptest/dumptmp1" % binPath)
os.system("%staosdump -i ./taosdumptest/dumptmp2" % binPath)
os.system("%staosdump -i ./taosdumptest/dumptmp3" % binPath)
tdSql.query("select count(*) from dumptmp1.st")
tdSql.checkData(0, 0, 1000)
tdSql.query("select count(*) from dumptmp2.st")
tdSql.execute("drop database timedb1")
os.system("%staosdump -i ./taosdumptest/dumptmp2" % binPath)
# dump data and check for taosdump
tdSql.query("select count(*) from timedb1.st")
tdSql.checkData(0, 0, 510)
tdSql.query("select count(*) from dumptmp3.st")
tdSql.execute("drop database timedb1")
os.system("%staosdump -i ./taosdumptest/dumptmp3" % binPath)
# dump data and check for taosdump
tdSql.query("select count(*) from timedb1.st")
tdSql.checkData(0, 0, 900)
origin_res = tdSql.getResult("select * from timedb1.st")
dump_res = tdSql.getResult("select * from dumptmp1.st")
if origin_res == dump_res:
tdLog.info("test ms second : dump check data pass for all data!")
else:
tdLog.info("test ms second : dump check data failed for all data!")
origin_res = tdSql.getResult(
"select * from timedb1.st where ts >=1625068810000 and ts <= 1625068860000")
dump_res = tdSql.getResult("select * from dumptmp2.st")
if origin_res == dump_res:
tdLog.info(" test ms second : dump check data pass for data! ")
else:
tdLog.info(" test ms second : dump check data failed for data!")
tdSql.execute("drop database timedb1")
os.system("%staosdump -i ./taosdumptest/dumptmp1" % binPath)
# dump data and check for taosdump
tdSql.query("select count(*) from timedb1.st")
tdSql.checkData(0, 0, 1000)
origin_res = tdSql.getResult(
"select * from timedb1.st where ts >=1625068810000 ")
dump_res = tdSql.getResult("select * from dumptmp3.st")
# check data
origin_res = tdSql.getResult("select * from timedb1.st")
tdSql.execute("drop database timedb1")
os.system("%staosdump -i ./taosdumptest/dumptmp1" % binPath)
# dump data and check for taosdump
dump_res = tdSql.getResult("select * from timedb1.st")
if origin_res == dump_res:
tdLog.info(" test ms second : dump check data pass for data! ")
tdLog.info("test million second : dump check data pass for all data!")
else:
tdLog.info(" test ms second : dump check data failed for data! ")
tdLog.info(
"test million second : dump check data failed for all data!")
os.system("rm -rf ./taosdumptest/")
os.system("rm -rf ./dump_result.txt")
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册