未验证 提交 f2e2820c 编写于 作者: X xiaolei li 提交者: GitHub

Node schemaless/xiaolei/td 10842 nodejs support schemaless (#8597)

* [TD-10842]<feature>:nodejs support schemaless

* [TD-10842]<fix>: fix nodejs connector schemaless API error

* [TD-10842]<feature>:nodejs support schemaless

* fix require mistake

* fix code style constant.js
Co-authored-by: NHuo Linhe <linhehuo@gmail.com>
上级 c468edcc
......@@ -10,9 +10,8 @@ const ArrayType = require('ref-array-napi');
const Struct = require('ref-struct-napi');
const FieldTypes = require('./constants');
const errors = require('./error');
const _ = require('lodash')
const TaosObjects = require('./taosobjects');
const { NULL_POINTER } = require('ref-napi');
const { Console } = require('console');
module.exports = CTaosInterface;
......@@ -223,6 +222,8 @@ TaosField.fields.name.type.size = 65;
TaosField.defineProperty('type', ref.types.char);
TaosField.defineProperty('bytes', ref.types.short);
//define schemaless line array
var smlLine = ArrayType(ref.coerceType('char *'))
/**
*
......@@ -238,7 +239,6 @@ function CTaosInterface(config = null, pass = false) {
ref.types.void_ptr2 = ref.refType(ref.types.void_ptr);
/*Declare a bunch of functions first*/
/* Note, pointers to TAOS_RES, TAOS, are ref.types.void_ptr. The connection._conn buffer is supplied for pointers to TAOS * */
if ('win32' == os.platform()) {
taoslibname = 'taos';
} else {
......@@ -303,9 +303,15 @@ function CTaosInterface(config = null, pass = false) {
// int64_t stime, void *param, void (*callback)(void *));
'taos_open_stream': [ref.types.void_ptr, [ref.types.void_ptr, ref.types.char_ptr, ref.types.void_ptr, ref.types.int64, ref.types.void_ptr, ref.types.void_ptr]],
//void taos_close_stream(TAOS_STREAM *tstr);
'taos_close_stream': [ref.types.void, [ref.types.void_ptr]]
'taos_close_stream': [ref.types.void, [ref.types.void_ptr]],
//Schemaless insert
//TAOS_RES* taos_schemaless_insert(TAOS* taos, char* lines[], int numLines, int protocol,int precision)
// 'taos_schemaless_insert': [ref.types.void_ptr, [ref.types.void_ptr, ref.types.char_ptr, ref.types.int, ref.types.int, ref.types.int]]
'taos_schemaless_insert': [ref.types.void_ptr, [ref.types.void_ptr, smlLine, 'int', 'int', 'int']]
});
if (pass == false) {
if (config == null) {
this._config = ref.alloc(ref.types.char_ptr, ref.NULL);
......@@ -664,3 +670,38 @@ CTaosInterface.prototype.closeStream = function closeStream(stream) {
this.libtaos.taos_close_stream(stream);
console.log("Closed stream");
}
//Schemaless insert API
/**
* TAOS* taos, char* lines[], int numLines, int protocol,int precision)
* using taos_errstr get error info, taos_errno get error code. Remmember
* to release taos_res, otherwile will lead memory leak.
* TAOS schemaless insert api
* @param {*} connection a valid database connection
* @param {*} lines string data, which statisfied with line proctocol
* @param {*} numLines number of rows in param lines.
* @param {*} protocal Line protocol, enum type (0,1,2,3),indicate different line protocol
* @param {*} precision timestamp precision in lines, enum type (0,1,2,3,4,5,6)
* @returns TAOS_RES
*
*/
CTaosInterface.prototype.schemalessInsert = function schemalessInsert(connection,lines, protocal, precision) {
let _numLines = null;
let _lines = null;
if(_.isString(lines)){
_numLines = 1;
_lines = Buffer.alloc(_numLines * ref.sizeof.pointer);
ref.set(_lines,0,ref.allocCString(lines),ref.types.char_ptr);
}
else if(_.isArray(lines)){
_numLines = lines.length;
_lines = Buffer.alloc(_numLines * ref.sizeof.pointer);
for(let i = 0; i < _numLines ; i++){
ref.set(_lines,i*ref.sizeof.pointer,ref.allocCString(lines[i]),ref.types.char_ptr)
}
}
else{
throw new errors.InterfaceError("Unsupport lines input")
}
return this.libtaos.taos_schemaless_insert(connection, _lines, _numLines, protocal, precision);
}
const SCHEMALESS_PROTOCOL = {
TSDB_SML_UNKNOWN_PROTOCOL: 0,
TSDB_SML_LINE_PROTOCOL: 1,
TSDB_SML_TELNET_PROTOCOL: 2,
TSDB_SML_JSON_PROTOCOL: 3
}
const SCHEMALESS_PRECISION = {
TSDB_SML_TIMESTAMP_NOT_CONFIGURED : 0,
TSDB_SML_TIMESTAMP_HOURS : 1,
TSDB_SML_TIMESTAMP_MINUTES : 2,
TSDB_SML_TIMESTAMP_SECONDS : 3,
TSDB_SML_TIMESTAMP_MILLI_SECONDS : 4,
TSDB_SML_TIMESTAMP_MICRO_SECONDS : 5,
TSDB_SML_TIMESTAMP_NANO_SECONDS : 6
}
const typeCodesToName = {
0: 'Null',
1: 'Boolean',
2: 'Tiny Int',
3: 'Small Int',
4: 'Int',
5: 'Big Int',
6: 'Float',
7: 'Double',
8: 'Binary',
9: 'Timestamp',
10: 'Nchar',
11: 'Tinyint Unsigned',
12: 'Smallint Unsigned',
13: 'Int Unsigned',
14: 'Bigint Unsigned',
}
/**
* @function
* @param {number} typecode - The code to get the name of the type for
* @return {string} Name of the field type
*/
function getType(typecode) {
return typeCodesToName[typecode];
}
/**
* Contains the the definitions/values assigned to various field types
* @module FieldTypes
......@@ -19,70 +61,44 @@
1970-01-01 08:00:00.000 GMT.
* @property {number} C_NCHAR - NChar field type encoded in ASCII, a wide string.
*
*
*
* @property {number} C_TIMESTAMP_MILLI - The code for millisecond timestamps, as returned by libtaos.taos_result_precision(result).
* @property {number} C_TIMESTAMP_MICRO - The code for microsecond timestamps, as returned by libtaos.taos_result_precision(result).
*/
module.exports = {
C_NULL : 0,
C_BOOL : 1,
C_TINYINT : 2,
C_SMALLINT : 3,
C_INT : 4,
C_BIGINT : 5,
C_FLOAT : 6,
C_DOUBLE : 7,
C_BINARY : 8,
C_TIMESTAMP : 9,
C_NCHAR : 10,
C_TINYINT_UNSIGNED : 11,
C_SMALLINT_UNSIGNED : 12,
C_INT_UNSIGNED : 13,
C_BIGINT_UNSIGNED : 14,
C_NULL: 0,
C_BOOL: 1,
C_TINYINT: 2,
C_SMALLINT: 3,
C_INT: 4,
C_BIGINT: 5,
C_FLOAT: 6,
C_DOUBLE: 7,
C_BINARY: 8,
C_TIMESTAMP: 9,
C_NCHAR: 10,
C_TINYINT_UNSIGNED: 11,
C_SMALLINT_UNSIGNED: 12,
C_INT_UNSIGNED: 13,
C_BIGINT_UNSIGNED: 14,
// NULL value definition
// NOTE: These values should change according to C definition in tsdb.h
C_BOOL_NULL : 2,
C_TINYINT_NULL : -128,
C_TINYINT_UNSIGNED_NULL : 255,
C_SMALLINT_NULL : -32768,
C_SMALLINT_UNSIGNED_NULL : 65535,
C_INT_NULL : -2147483648,
C_INT_UNSIGNED_NULL : 4294967295,
C_BIGINT_NULL : -9223372036854775808n,
C_BIGINT_UNSIGNED_NULL : 18446744073709551615n,
C_FLOAT_NULL : 2146435072,
C_DOUBLE_NULL : -9223370937343148032,
C_NCHAR_NULL : 4294967295,
C_BINARY_NULL : 255,
C_TIMESTAMP_MILLI : 0,
C_TIMESTAMP_MICRO : 1,
C_BOOL_NULL: 2,
C_TINYINT_NULL: -128,
C_TINYINT_UNSIGNED_NULL: 255,
C_SMALLINT_NULL: -32768,
C_SMALLINT_UNSIGNED_NULL: 65535,
C_INT_NULL: -2147483648,
C_INT_UNSIGNED_NULL: 4294967295,
C_BIGINT_NULL: -9223372036854775808n,
C_BIGINT_UNSIGNED_NULL: 18446744073709551615n,
C_FLOAT_NULL: 2146435072,
C_DOUBLE_NULL: -9223370937343148032,
C_NCHAR_NULL: 4294967295,
C_BINARY_NULL: 255,
C_TIMESTAMP_MILLI: 0,
C_TIMESTAMP_MICRO: 1,
getType,
SCHEMALESS_PROTOCOL,
SCHEMALESS_PRECISION
}
const typeCodesToName = {
0 : 'Null',
1 : 'Boolean',
2 : 'Tiny Int',
3 : 'Small Int',
4 : 'Int',
5 : 'Big Int',
6 : 'Float',
7 : 'Double',
8 : 'Binary',
9 : 'Timestamp',
10 : 'Nchar',
11 : 'TINYINT_UNSIGNED',
12 : 'SMALLINT_UNSIGNED',
13 : 'INT_UNSIGNED',
14 : 'BIGINT_UNSIGNED',
}
/**
* @function
* @param {number} typecode - The code to get the name of the type for
* @return {string} Name of the field type
*/
function getType(typecode) {
return typeCodesToName[typecode];
}
......@@ -474,3 +474,21 @@ TDengineCursor.prototype.openStream = function openStream(sql, callback, stime =
TDengineCursor.prototype.closeStream = function closeStream(stream) {
this._chandle.closeStream(stream);
}
/**
* schemaless insert
* @param {*} connection a valid database connection
* @param {*} lines string data, which statisfied with line proctocol
* @param {*} protocal Line protocol, enum type (0,1,2,3),indicate different line protocol
* @param {*} precision timestamp precision in lines, enum type (0,1,2,3,4,5,6)
* @returns TAOS_RES
*
*/
TDengineCursor.prototype.schemalessInsert = function schemalessInsert(lines, protocol, precision) {
this._result = this._chandle.schemalessInsert(this._connection._conn, lines, protocol, precision);
let errorNo = this._chandle.errno(this._result);
if (errorNo != 0) {
throw new errors.InterfaceError(errorNo + ":" + this._chandle.errStr(this._result));
this._chandle.freeResult(this._result);
}
this._chandle.freeResult(this._result);
}
......@@ -7,7 +7,7 @@
"test": "test"
},
"scripts": {
"test": "node test/test.js && node test/testMicroseconds.js && node test/testNanoseconds.js && node test/testUnsignedType.js "
"test": "node test/test.js && node test/testMicroseconds.js && node test/testNanoseconds.js && node test/testUnsignedType.js && node test/testSchemalessInsert.js "
},
"repository": {
"type": "git",
......@@ -27,6 +27,7 @@
"homepage": "https://github.com/taosdata/tdengine#readme",
"dependencies": {
"ffi-napi": "^3.1.0",
"lodash": "^4.17.21",
"ref-array-napi": "^1.2.1",
"ref-napi": "^1.5.2",
"ref-struct-napi": "^1.1.1"
......
var TDengineConnection = require('./nodetaos/connection.js')
module.exports.connect = function (connection={}) {
const TDengineConstant = require('./nodetaos/constants.js')
module.exports = {
connect: function (connection = {}) {
return new TDengineConnection(connection);
},
SCHEMALESS_PROTOCOL: TDengineConstant.SCHEMALESS_PROTOCOL,
SCHEMALESS_PRECISION: TDengineConstant.SCHEMALESS_PRECISION,
}
\ No newline at end of file
const _ = require('lodash');
const taos = require('../tdengine');
var conn = taos.connect({ host: "127.0.0.1", user: "root", password: "taosdata", config: "/etc/taos", port: 10 });
var c1 = conn.cursor();
executeUpdate("drop database if exists nodedb;");
executeUpdate("create database if not exists nodedb ;");
executeUpdate("use nodedb;");
let tbname1 = "line_protocol_arr";
let tbname2 = "json_protocol_arr";
let tbname3 = "json_protocol_str";
let tbname4 = "line_protocol_str";
let line1 = [tbname1 + ",t1=3i64,t2=4f64,t3=\"t3\" c1=3i64,c3=L\"passit\",c2=false,c4=4f64 1626006833639000000",
tbname1 + ",t1=4i64,t3=\"t4\",t2=5f64,t4=5f64 c1=3i64,c3=L\"passitagin\",c2=true,c4=5f64,c5=5f64 1626006833641000000"
];
let line2 = ["{"
+ "\"metric\": \"" + tbname2 + "\","
+ "\"timestamp\": 1626006833,"
+ "\"value\": 10,"
+ "\"tags\": {"
+ " \"t1\": true,"
+ "\"t2\": false,"
+ "\"t3\": 10,"
+ "\"t4\": \"123_abc_.!@#$%^&*:;,./?|+-=()[]{}<>\""
+ "}"
+ "}"
];
let line3 = "{"
+ "\"metric\": \"" + tbname3 + "\","
+ "\"timestamp\": 1626006833000,"
+ "\"value\": 10,"
+ "\"tags\": {"
+ " \"t1\": true,"
+ "\"t2\": false,"
+ "\"t3\": 10,"
+ "\"t4\": \"123_abc_.!@#$%^&*:;,./?|+-=()[]{}<>\""
+ "}"
+ "}";
let line4 = tbname4 + ",t1=3i64,t2=4f64,t3=\"t3\" c1=3i64,c3=L\"passit\",c2=false,c4=4f64 1626006833639";
try {
c1.schemalessInsert(line1, taos.SCHEMALESS_PROTOCOL.TSDB_SML_LINE_PROTOCOL, taos.SCHEMALESS_PRECISION.TSDB_SML_TIMESTAMP_NANO_SECONDS);
testSchemaless(tbname1, line1.length);
c1.schemalessInsert(line2, taos.SCHEMALESS_PROTOCOL.TSDB_SML_JSON_PROTOCOL, taos.SCHEMALESS_PRECISION.TSDB_SML_TIMESTAMP_SECONDS);
testSchemaless(tbname2, line2.length);
c1.schemalessInsert(line3, taos.SCHEMALESS_PROTOCOL.TSDB_SML_JSON_PROTOCOL, taos.SCHEMALESS_PRECISION.TSDB_SML_TIMESTAMP_MILLI_SECONDS);
testSchemaless(tbname3, 1);
c1.schemalessInsert(line4, taos.SCHEMALESS_PROTOCOL.TSDB_SML_LINE_PROTOCOL, taos.SCHEMALESS_PRECISION.TSDB_SML_TIMESTAMP_MILLI_SECONDS);
testSchemaless(tbname4, 1);
} catch (err) {
console.log(err)
}
function executeUpdate(sql) {
console.log(sql);
c1.execute(sql);
}
function testSchemaless(tbname, numLines) {
let sql = "select count(*) from " + tbname + ";";
executeUpdate(sql);
let affectRows = _.first(c1.fetchall());
if (affectRows != numLines) {
console.log(1);
console.log(line2);
throw "protocol " + tbname + " schemaless insert success,but can't select as expect."
}
else {
console.log("protocol " + tbname + " schemaless insert success, can select as expect.")
}
console.log("===================")
}
setTimeout(() => conn.close(), 2000);
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册