From f2e2820c13eba65402f005a3b578e5190d6a645e Mon Sep 17 00:00:00 2001 From: xiaolei li <85657333+xleili@users.noreply.github.com> Date: Tue, 30 Nov 2021 13:51:58 +0800 Subject: [PATCH] Node schemaless/xiaolei/td 10842 nodejs support schemaless (#8597) * [TD-10842]:nodejs support schemaless * [TD-10842]: fix nodejs connector schemaless API error * [TD-10842]:nodejs support schemaless * fix require mistake * fix code style constant.js Co-authored-by: Huo Linhe --- src/connector/nodejs/nodetaos/cinterface.js | 49 +++++- src/connector/nodejs/nodetaos/constants.js | 140 ++++++++++-------- src/connector/nodejs/nodetaos/cursor.js | 20 ++- src/connector/nodejs/package.json | 3 +- src/connector/nodejs/tdengine.js | 11 +- .../nodejs/test/testSchemalessInsert.js | 84 +++++++++++ 6 files changed, 236 insertions(+), 71 deletions(-) create mode 100644 src/connector/nodejs/test/testSchemalessInsert.js diff --git a/src/connector/nodejs/nodetaos/cinterface.js b/src/connector/nodejs/nodetaos/cinterface.js index 3c395ec205..fa0eb20055 100644 --- a/src/connector/nodejs/nodetaos/cinterface.js +++ b/src/connector/nodejs/nodetaos/cinterface.js @@ -10,9 +10,8 @@ const ArrayType = require('ref-array-napi'); const Struct = require('ref-struct-napi'); const FieldTypes = require('./constants'); const errors = require('./error'); +const _ = require('lodash') const TaosObjects = require('./taosobjects'); -const { NULL_POINTER } = require('ref-napi'); -const { Console } = require('console'); module.exports = CTaosInterface; @@ -223,6 +222,8 @@ TaosField.fields.name.type.size = 65; TaosField.defineProperty('type', ref.types.char); TaosField.defineProperty('bytes', ref.types.short); +//define schemaless line array +var smlLine = ArrayType(ref.coerceType('char *')) /** * @@ -238,7 +239,6 @@ function CTaosInterface(config = null, pass = false) { ref.types.void_ptr2 = ref.refType(ref.types.void_ptr); /*Declare a bunch of functions first*/ /* Note, pointers to TAOS_RES, TAOS, are ref.types.void_ptr. The connection._conn buffer is supplied for pointers to TAOS * */ - if ('win32' == os.platform()) { taoslibname = 'taos'; } else { @@ -303,9 +303,15 @@ function CTaosInterface(config = null, pass = false) { // int64_t stime, void *param, void (*callback)(void *)); 'taos_open_stream': [ref.types.void_ptr, [ref.types.void_ptr, ref.types.char_ptr, ref.types.void_ptr, ref.types.int64, ref.types.void_ptr, ref.types.void_ptr]], //void taos_close_stream(TAOS_STREAM *tstr); - 'taos_close_stream': [ref.types.void, [ref.types.void_ptr]] + 'taos_close_stream': [ref.types.void, [ref.types.void_ptr]], + + //Schemaless insert + //TAOS_RES* taos_schemaless_insert(TAOS* taos, char* lines[], int numLines, int protocol,int precision) + // 'taos_schemaless_insert': [ref.types.void_ptr, [ref.types.void_ptr, ref.types.char_ptr, ref.types.int, ref.types.int, ref.types.int]] + 'taos_schemaless_insert': [ref.types.void_ptr, [ref.types.void_ptr, smlLine, 'int', 'int', 'int']] }); + if (pass == false) { if (config == null) { this._config = ref.alloc(ref.types.char_ptr, ref.NULL); @@ -664,3 +670,38 @@ CTaosInterface.prototype.closeStream = function closeStream(stream) { this.libtaos.taos_close_stream(stream); console.log("Closed stream"); } +//Schemaless insert API +/** + * TAOS* taos, char* lines[], int numLines, int protocol,int precision) + * using taos_errstr get error info, taos_errno get error code. Remmember + * to release taos_res, otherwile will lead memory leak. + * TAOS schemaless insert api + * @param {*} connection a valid database connection + * @param {*} lines string data, which statisfied with line proctocol + * @param {*} numLines number of rows in param lines. + * @param {*} protocal Line protocol, enum type (0,1,2,3),indicate different line protocol + * @param {*} precision timestamp precision in lines, enum type (0,1,2,3,4,5,6) + * @returns TAOS_RES + * + */ +CTaosInterface.prototype.schemalessInsert = function schemalessInsert(connection,lines, protocal, precision) { + let _numLines = null; + let _lines = null; + + if(_.isString(lines)){ + _numLines = 1; + _lines = Buffer.alloc(_numLines * ref.sizeof.pointer); + ref.set(_lines,0,ref.allocCString(lines),ref.types.char_ptr); + } + else if(_.isArray(lines)){ + _numLines = lines.length; + _lines = Buffer.alloc(_numLines * ref.sizeof.pointer); + for(let i = 0; i < _numLines ; i++){ + ref.set(_lines,i*ref.sizeof.pointer,ref.allocCString(lines[i]),ref.types.char_ptr) + } + } + else{ + throw new errors.InterfaceError("Unsupport lines input") + } + return this.libtaos.taos_schemaless_insert(connection, _lines, _numLines, protocal, precision); +} diff --git a/src/connector/nodejs/nodetaos/constants.js b/src/connector/nodejs/nodetaos/constants.js index 3a86631550..551cfce716 100644 --- a/src/connector/nodejs/nodetaos/constants.js +++ b/src/connector/nodejs/nodetaos/constants.js @@ -1,3 +1,45 @@ + +const SCHEMALESS_PROTOCOL = { + TSDB_SML_UNKNOWN_PROTOCOL: 0, + TSDB_SML_LINE_PROTOCOL: 1, + TSDB_SML_TELNET_PROTOCOL: 2, + TSDB_SML_JSON_PROTOCOL: 3 +} +const SCHEMALESS_PRECISION = { + TSDB_SML_TIMESTAMP_NOT_CONFIGURED : 0, + TSDB_SML_TIMESTAMP_HOURS : 1, + TSDB_SML_TIMESTAMP_MINUTES : 2, + TSDB_SML_TIMESTAMP_SECONDS : 3, + TSDB_SML_TIMESTAMP_MILLI_SECONDS : 4, + TSDB_SML_TIMESTAMP_MICRO_SECONDS : 5, + TSDB_SML_TIMESTAMP_NANO_SECONDS : 6 +} +const typeCodesToName = { + 0: 'Null', + 1: 'Boolean', + 2: 'Tiny Int', + 3: 'Small Int', + 4: 'Int', + 5: 'Big Int', + 6: 'Float', + 7: 'Double', + 8: 'Binary', + 9: 'Timestamp', + 10: 'Nchar', + 11: 'Tinyint Unsigned', + 12: 'Smallint Unsigned', + 13: 'Int Unsigned', + 14: 'Bigint Unsigned', +} + +/** + * @function + * @param {number} typecode - The code to get the name of the type for + * @return {string} Name of the field type + */ +function getType(typecode) { + return typeCodesToName[typecode]; +} /** * Contains the the definitions/values assigned to various field types * @module FieldTypes @@ -18,71 +60,45 @@ * @property {number} C_TIMESTAMP - Timestamp in format "YYYY:MM:DD HH:MM:SS.MMM". Measured in number of milliseconds passed after 1970-01-01 08:00:00.000 GMT. * @property {number} C_NCHAR - NChar field type encoded in ASCII, a wide string. - * - * - * + * * @property {number} C_TIMESTAMP_MILLI - The code for millisecond timestamps, as returned by libtaos.taos_result_precision(result). * @property {number} C_TIMESTAMP_MICRO - The code for microsecond timestamps, as returned by libtaos.taos_result_precision(result). */ module.exports = { - C_NULL : 0, - C_BOOL : 1, - C_TINYINT : 2, - C_SMALLINT : 3, - C_INT : 4, - C_BIGINT : 5, - C_FLOAT : 6, - C_DOUBLE : 7, - C_BINARY : 8, - C_TIMESTAMP : 9, - C_NCHAR : 10, - C_TINYINT_UNSIGNED : 11, - C_SMALLINT_UNSIGNED : 12, - C_INT_UNSIGNED : 13, - C_BIGINT_UNSIGNED : 14, - // NULL value definition - // NOTE: These values should change according to C definition in tsdb.h - C_BOOL_NULL : 2, - C_TINYINT_NULL : -128, - C_TINYINT_UNSIGNED_NULL : 255, - C_SMALLINT_NULL : -32768, - C_SMALLINT_UNSIGNED_NULL : 65535, - C_INT_NULL : -2147483648, - C_INT_UNSIGNED_NULL : 4294967295, - C_BIGINT_NULL : -9223372036854775808n, - C_BIGINT_UNSIGNED_NULL : 18446744073709551615n, - C_FLOAT_NULL : 2146435072, - C_DOUBLE_NULL : -9223370937343148032, - C_NCHAR_NULL : 4294967295, - C_BINARY_NULL : 255, - C_TIMESTAMP_MILLI : 0, - C_TIMESTAMP_MICRO : 1, - getType, -} - -const typeCodesToName = { - 0 : 'Null', - 1 : 'Boolean', - 2 : 'Tiny Int', - 3 : 'Small Int', - 4 : 'Int', - 5 : 'Big Int', - 6 : 'Float', - 7 : 'Double', - 8 : 'Binary', - 9 : 'Timestamp', - 10 : 'Nchar', - 11 : 'TINYINT_UNSIGNED', - 12 : 'SMALLINT_UNSIGNED', - 13 : 'INT_UNSIGNED', - 14 : 'BIGINT_UNSIGNED', + C_NULL: 0, + C_BOOL: 1, + C_TINYINT: 2, + C_SMALLINT: 3, + C_INT: 4, + C_BIGINT: 5, + C_FLOAT: 6, + C_DOUBLE: 7, + C_BINARY: 8, + C_TIMESTAMP: 9, + C_NCHAR: 10, + C_TINYINT_UNSIGNED: 11, + C_SMALLINT_UNSIGNED: 12, + C_INT_UNSIGNED: 13, + C_BIGINT_UNSIGNED: 14, + // NULL value definition + // NOTE: These values should change according to C definition in tsdb.h + C_BOOL_NULL: 2, + C_TINYINT_NULL: -128, + C_TINYINT_UNSIGNED_NULL: 255, + C_SMALLINT_NULL: -32768, + C_SMALLINT_UNSIGNED_NULL: 65535, + C_INT_NULL: -2147483648, + C_INT_UNSIGNED_NULL: 4294967295, + C_BIGINT_NULL: -9223372036854775808n, + C_BIGINT_UNSIGNED_NULL: 18446744073709551615n, + C_FLOAT_NULL: 2146435072, + C_DOUBLE_NULL: -9223370937343148032, + C_NCHAR_NULL: 4294967295, + C_BINARY_NULL: 255, + C_TIMESTAMP_MILLI: 0, + C_TIMESTAMP_MICRO: 1, + getType, + SCHEMALESS_PROTOCOL, + SCHEMALESS_PRECISION } -/** - * @function - * @param {number} typecode - The code to get the name of the type for - * @return {string} Name of the field type - */ -function getType(typecode) { - return typeCodesToName[typecode]; -} diff --git a/src/connector/nodejs/nodetaos/cursor.js b/src/connector/nodejs/nodetaos/cursor.js index f879d89d48..3c01dc51b4 100644 --- a/src/connector/nodejs/nodetaos/cursor.js +++ b/src/connector/nodejs/nodetaos/cursor.js @@ -211,7 +211,7 @@ TDengineCursor.prototype.fetchall = function fetchall(options, callback) { } } - + performance.mark('B'); performance.measure('query', 'A', 'B'); let response = this._createSetResponse(this._rowcount, time) @@ -474,3 +474,21 @@ TDengineCursor.prototype.openStream = function openStream(sql, callback, stime = TDengineCursor.prototype.closeStream = function closeStream(stream) { this._chandle.closeStream(stream); } +/** + * schemaless insert + * @param {*} connection a valid database connection + * @param {*} lines string data, which statisfied with line proctocol + * @param {*} protocal Line protocol, enum type (0,1,2,3),indicate different line protocol + * @param {*} precision timestamp precision in lines, enum type (0,1,2,3,4,5,6) + * @returns TAOS_RES + * + */ +TDengineCursor.prototype.schemalessInsert = function schemalessInsert(lines, protocol, precision) { + this._result = this._chandle.schemalessInsert(this._connection._conn, lines, protocol, precision); + let errorNo = this._chandle.errno(this._result); + if (errorNo != 0) { + throw new errors.InterfaceError(errorNo + ":" + this._chandle.errStr(this._result)); + this._chandle.freeResult(this._result); + } + this._chandle.freeResult(this._result); +} diff --git a/src/connector/nodejs/package.json b/src/connector/nodejs/package.json index 711db94b84..d7eba48a46 100644 --- a/src/connector/nodejs/package.json +++ b/src/connector/nodejs/package.json @@ -7,7 +7,7 @@ "test": "test" }, "scripts": { - "test": "node test/test.js && node test/testMicroseconds.js && node test/testNanoseconds.js && node test/testUnsignedType.js " + "test": "node test/test.js && node test/testMicroseconds.js && node test/testNanoseconds.js && node test/testUnsignedType.js && node test/testSchemalessInsert.js " }, "repository": { "type": "git", @@ -27,6 +27,7 @@ "homepage": "https://github.com/taosdata/tdengine#readme", "dependencies": { "ffi-napi": "^3.1.0", + "lodash": "^4.17.21", "ref-array-napi": "^1.2.1", "ref-napi": "^1.5.2", "ref-struct-napi": "^1.1.1" diff --git a/src/connector/nodejs/tdengine.js b/src/connector/nodejs/tdengine.js index 047c744a4f..ccc66b2c09 100644 --- a/src/connector/nodejs/tdengine.js +++ b/src/connector/nodejs/tdengine.js @@ -1,4 +1,9 @@ var TDengineConnection = require('./nodetaos/connection.js') -module.exports.connect = function (connection={}) { - return new TDengineConnection(connection); -} +const TDengineConstant = require('./nodetaos/constants.js') +module.exports = { + connect: function (connection = {}) { + return new TDengineConnection(connection); + }, + SCHEMALESS_PROTOCOL: TDengineConstant.SCHEMALESS_PROTOCOL, + SCHEMALESS_PRECISION: TDengineConstant.SCHEMALESS_PRECISION, +} \ No newline at end of file diff --git a/src/connector/nodejs/test/testSchemalessInsert.js b/src/connector/nodejs/test/testSchemalessInsert.js new file mode 100644 index 0000000000..16998425ec --- /dev/null +++ b/src/connector/nodejs/test/testSchemalessInsert.js @@ -0,0 +1,84 @@ +const _ = require('lodash'); +const taos = require('../tdengine'); + +var conn = taos.connect({ host: "127.0.0.1", user: "root", password: "taosdata", config: "/etc/taos", port: 10 }); +var c1 = conn.cursor(); +executeUpdate("drop database if exists nodedb;"); +executeUpdate("create database if not exists nodedb ;"); +executeUpdate("use nodedb;"); + +let tbname1 = "line_protocol_arr"; +let tbname2 = "json_protocol_arr"; +let tbname3 = "json_protocol_str"; +let tbname4 = "line_protocol_str"; + + +let line1 = [tbname1 + ",t1=3i64,t2=4f64,t3=\"t3\" c1=3i64,c3=L\"passit\",c2=false,c4=4f64 1626006833639000000", +tbname1 + ",t1=4i64,t3=\"t4\",t2=5f64,t4=5f64 c1=3i64,c3=L\"passitagin\",c2=true,c4=5f64,c5=5f64 1626006833641000000" +]; +let line2 = ["{" + + "\"metric\": \"" + tbname2 + "\"," + + "\"timestamp\": 1626006833," + + "\"value\": 10," + + "\"tags\": {" + + " \"t1\": true," + + "\"t2\": false," + + "\"t3\": 10," + + "\"t4\": \"123_abc_.!@#$%^&*:;,./?|+-=()[]{}<>\"" + + "}" + + "}" +]; + +let line3 = "{" + + "\"metric\": \"" + tbname3 + "\"," + + "\"timestamp\": 1626006833000," + + "\"value\": 10," + + "\"tags\": {" + + " \"t1\": true," + + "\"t2\": false," + + "\"t3\": 10," + + "\"t4\": \"123_abc_.!@#$%^&*:;,./?|+-=()[]{}<>\"" + + "}" + + "}"; + +let line4 = tbname4 + ",t1=3i64,t2=4f64,t3=\"t3\" c1=3i64,c3=L\"passit\",c2=false,c4=4f64 1626006833639"; + + +try { + + c1.schemalessInsert(line1, taos.SCHEMALESS_PROTOCOL.TSDB_SML_LINE_PROTOCOL, taos.SCHEMALESS_PRECISION.TSDB_SML_TIMESTAMP_NANO_SECONDS); + testSchemaless(tbname1, line1.length); + + c1.schemalessInsert(line2, taos.SCHEMALESS_PROTOCOL.TSDB_SML_JSON_PROTOCOL, taos.SCHEMALESS_PRECISION.TSDB_SML_TIMESTAMP_SECONDS); + testSchemaless(tbname2, line2.length); + + c1.schemalessInsert(line3, taos.SCHEMALESS_PROTOCOL.TSDB_SML_JSON_PROTOCOL, taos.SCHEMALESS_PRECISION.TSDB_SML_TIMESTAMP_MILLI_SECONDS); + testSchemaless(tbname3, 1); + + c1.schemalessInsert(line4, taos.SCHEMALESS_PROTOCOL.TSDB_SML_LINE_PROTOCOL, taos.SCHEMALESS_PRECISION.TSDB_SML_TIMESTAMP_MILLI_SECONDS); + testSchemaless(tbname4, 1); + +} catch (err) { + console.log(err) +} +function executeUpdate(sql) { + console.log(sql); + c1.execute(sql); +} + +function testSchemaless(tbname, numLines) { + let sql = "select count(*) from " + tbname + ";"; + executeUpdate(sql); + let affectRows = _.first(c1.fetchall()); + if (affectRows != numLines) { + console.log(1); + console.log(line2); + throw "protocol " + tbname + " schemaless insert success,but can't select as expect." + } + else { + console.log("protocol " + tbname + " schemaless insert success, can select as expect.") + } + console.log("===================") +} + +setTimeout(() => conn.close(), 2000); -- GitLab