From 1741d9abebec4c8c45c7be2153cfd30a356627e4 Mon Sep 17 00:00:00 2001 From: liu0x54 Date: Tue, 28 Jul 2020 14:08:05 +0000 Subject: [PATCH] [TD-1001] adapt nodejs connector to TD 2.0 --- src/connector/nodejs/nodetaos/cinterface.js | 140 +++++++++++++------- src/connector/nodejs/nodetaos/connection.js | 4 +- src/connector/nodejs/nodetaos/cursor.js | 30 +++-- src/connector/nodejs/package-lock.json | 4 +- src/connector/nodejs/package.json | 4 +- src/connector/nodejs/test/test.js | 13 +- 6 files changed, 125 insertions(+), 70 deletions(-) diff --git a/src/connector/nodejs/nodetaos/cinterface.js b/src/connector/nodejs/nodetaos/cinterface.js index d076beb8c0..15502205f4 100644 --- a/src/connector/nodejs/nodetaos/cinterface.js +++ b/src/connector/nodejs/nodetaos/cinterface.js @@ -10,6 +10,7 @@ const Struct = require('ref-struct'); const FieldTypes = require('./constants'); const errors = require ('./error'); const TaosObjects = require('./taosobjects'); +const { NULL_POINTER } = require('ref'); module.exports = CTaosInterface; @@ -25,7 +26,7 @@ function convertTimestamp(data, num_of_rows, nbytes = 0, offset = 0, micro=false if (micro == true) { timestampConverter = convertMicrosecondsToDatetime; } - data = ref.reinterpret(data.deref(), nbytes * num_of_rows, offset); + data = ref.reinterpret(data, nbytes * num_of_rows, offset); let res = []; let currOffset = 0; while (currOffset < data.length) { @@ -43,7 +44,7 @@ function convertTimestamp(data, num_of_rows, nbytes = 0, offset = 0, micro=false return res; } function convertBool(data, num_of_rows, nbytes = 0, offset = 0, micro=false) { - data = ref.reinterpret(data.deref(), nbytes * num_of_rows, offset); + data = ref.reinterpret(data, nbytes * num_of_rows, offset); let res = new Array(data.length); for (let i = 0; i < data.length; i++) { if (data[i] == 0) { @@ -59,7 +60,7 @@ function convertBool(data, num_of_rows, nbytes = 0, offset = 0, micro=false) { return res; } function convertTinyint(data, num_of_rows, nbytes = 0, offset = 0, micro=false) { - data = ref.reinterpret(data.deref(), nbytes * num_of_rows, offset); + data = ref.reinterpret(data, nbytes * num_of_rows, offset); let res = []; let currOffset = 0; while (currOffset < data.length) { @@ -70,7 +71,7 @@ function convertTinyint(data, num_of_rows, nbytes = 0, offset = 0, micro=false) return res; } function convertSmallint(data, num_of_rows, nbytes = 0, offset = 0, micro=false) { - data = ref.reinterpret(data.deref(), nbytes * num_of_rows, offset); + data = ref.reinterpret(data, nbytes * num_of_rows, offset); let res = []; let currOffset = 0; while (currOffset < data.length) { @@ -81,7 +82,7 @@ function convertSmallint(data, num_of_rows, nbytes = 0, offset = 0, micro=false) return res; } function convertInt(data, num_of_rows, nbytes = 0, offset = 0, micro=false) { - data = ref.reinterpret(data.deref(), nbytes * num_of_rows, offset); + data = ref.reinterpret(data, nbytes * num_of_rows, offset); let res = []; let currOffset = 0; while (currOffset < data.length) { @@ -92,18 +93,18 @@ function convertInt(data, num_of_rows, nbytes = 0, offset = 0, micro=false) { return res; } function convertBigint(data, num_of_rows, nbytes = 0, offset = 0, micro=false) { - data = ref.reinterpret(data.deref(), nbytes * num_of_rows, offset); + data = ref.reinterpret(data, nbytes * num_of_rows, offset); let res = []; let currOffset = 0; while (currOffset < data.length) { let d = data.readInt64LE(currOffset); - res.push(d == FieldTypes.C_BIGINT_NULL ? null : BigInt(d)); + res.push(d == FieldTypes.C_BIGINT_NULL ? null : d); currOffset += nbytes; } return res; } function convertFloat(data, num_of_rows, nbytes = 0, offset = 0, micro=false) { - data = ref.reinterpret(data.deref(), nbytes * num_of_rows, offset); + data = ref.reinterpret(data, nbytes * num_of_rows, offset); let res = []; let currOffset = 0; while (currOffset < data.length) { @@ -114,7 +115,7 @@ function convertFloat(data, num_of_rows, nbytes = 0, offset = 0, micro=false) { return res; } function convertDouble(data, num_of_rows, nbytes = 0, offset = 0, micro=false) { - data = ref.reinterpret(data.deref(), nbytes * num_of_rows, offset); + data = ref.reinterpret(data, nbytes * num_of_rows, offset); let res = []; let currOffset = 0; while (currOffset < data.length) { @@ -125,7 +126,7 @@ function convertDouble(data, num_of_rows, nbytes = 0, offset = 0, micro=false) { return res; } function convertBinary(data, num_of_rows, nbytes = 0, offset = 0, micro=false) { - data = ref.reinterpret(data.deref(), nbytes * num_of_rows, offset); + data = ref.reinterpret(data, nbytes * num_of_rows, offset); let res = []; let currOffset = 0; while (currOffset < data.length) { @@ -141,7 +142,7 @@ function convertBinary(data, num_of_rows, nbytes = 0, offset = 0, micro=false) { return res; } function convertNchar(data, num_of_rows, nbytes = 0, offset = 0, micro=false) { - data = ref.reinterpret(data.deref(), nbytes * num_of_rows, offset); + data = ref.reinterpret(data, nbytes * num_of_rows, offset); let res = []; let currOffset = 0; // every 4 bytes, a character is encoded; @@ -177,9 +178,10 @@ var char_arr = ArrayType(ref.types.char); var TaosField = Struct({ 'name': char_arr, }); -TaosField.fields.name.type.size = 64; -TaosField.defineProperty('bytes', ref.types.short); +TaosField.fields.name.type.size = 65; TaosField.defineProperty('type', ref.types.char); +TaosField.defineProperty('bytes', ref.types.short); + /** * @@ -202,14 +204,14 @@ function CTaosInterface (config = null, pass = false) { 'taos_connect': [ ref.types.void_ptr, [ ref.types.char_ptr, ref.types.char_ptr, ref.types.char_ptr, ref.types.char_ptr, ref.types.int ] ], //void taos_close(TAOS *taos) 'taos_close': [ ref.types.void, [ ref.types.void_ptr ] ], - //TAOS_RES *taos_use_result(TAOS *taos); - 'taos_use_result': [ ref.types.void_ptr, [ ref.types.void_ptr ] ], + //int *taos_fetch_lengths(TAOS_RES *taos); + 'taos_fetch_lengths': [ ref.types.void_ptr, [ ref.types.void_ptr ] ], //int taos_query(TAOS *taos, char *sqlstr) - 'taos_query': [ ref.types.int, [ ref.types.void_ptr, ref.types.char_ptr ] ], + 'taos_query': [ ref.types.void_ptr, [ ref.types.void_ptr, ref.types.char_ptr ] ], //int taos_affected_rows(TAOS *taos) 'taos_affected_rows': [ ref.types.int, [ ref.types.void_ptr] ], //int taos_fetch_block(TAOS_RES *res, TAOS_ROW *rows) - 'taos_fetch_block': [ ref.types.int, [ ref.types.void_ptr, ref.types.void_ptr] ], + 'taos_fetch_block': [ ref.types.int, [ ref.types.void_ptr, ref.types.void_ptr2] ], //int taos_num_fields(TAOS_RES *res); 'taos_num_fields': [ ref.types.int, [ ref.types.void_ptr] ], //TAOS_ROW taos_fetch_row(TAOS_RES *res) @@ -329,44 +331,66 @@ CTaosInterface.prototype.query = function query(connection, sql) { CTaosInterface.prototype.affectedRows = function affectedRows(connection) { return this.libtaos.taos_affected_rows(connection); } -CTaosInterface.prototype.useResult = function useResult(connection) { - let result = this.libtaos.taos_use_result(connection); +CTaosInterface.prototype.useResult = function useResult(result) { + let fields = []; let pfields = this.fetchFields(result); if (ref.isNull(pfields) == false) { - pfields = ref.reinterpret(pfields, this.fieldsCount(connection) * 68, 0); + pfields = ref.reinterpret(pfields, this.fieldsCount(result) * 68, 0); for (let i = 0; i < pfields.length; i += 68) { //0 - 63 = name //64 - 65 = bytes, 66 - 67 = type fields.push( { - name: ref.readCString(ref.reinterpret(pfields,64,i)), - bytes: pfields[i + 64], - type: pfields[i + 66] + name: ref.readCString(ref.reinterpret(pfields,65,i)), + type: pfields[i + 65], + bytes: pfields[i + 66] }) } } - return {result:result, fields:fields} + return {fields:fields} } CTaosInterface.prototype.fetchBlock = function fetchBlock(result, fields) { - let pblock = ref.ref(ref.ref(ref.NULL)); // equal to our raw data - let num_of_rows = this.libtaos.taos_fetch_block(result, pblock) - if (num_of_rows == 0) { + let pblock = ref.ref(ref.NULL); // equal to our raw data + pblock = this.libtaos.taos_fetch_row(result); + + if (pblock == 0) { return {block:null, num_of_rows:0}; } let isMicro = (this.libtaos.taos_result_precision(result) == FieldTypes.C_TIMESTAMP_MICRO) - let blocks = new Array(fields.length); - blocks.fill(null); - num_of_rows = Math.abs(num_of_rows); + + + //num_of_rows = Math.abs(num_of_rows); let offset = 0; - pblock = pblock.deref(); - for (let i = 0; i < fields.length; i++) { + //pblock = pblock.deref(); + + var fieldL = this.libtaos.taos_fetch_lengths(result); + var numoffields = this.libtaos.taos_field_count(result); - if (!convertFunctions[fields[i]['type']] ) { + let blocks = new Array(numoffields); + blocks.fill(null); + var fieldlens = []; + + if (ref.isNull(fieldL) == false) { + + for (let i = 0; i < numoffields; i ++) { + let plen = ref.reinterpret(fieldL, 4, i*4); + //plen = ref.readPointer(plen,0,ref.types.int); + let len = plen.readInt32LE(0); + fieldlens.push(len); + //console.log(len); + } + } + for (let i = 0; i < numoffields; i++) { + if (!convertFunctions[fields['fields'][i]['type']] ) { throw new errors.DatabaseError("Invalid data type returned from database"); } - blocks[i] = convertFunctions[fields[i]['type']](pblock, num_of_rows, fields[i]['bytes'], offset, isMicro); - offset += fields[i]['bytes'] * num_of_rows; + prow = ref.reinterpret(pblock,8,i*8); + console.log(fieldlens[i]); + blocks[i] = convertFunctions[fields['fields'][i]['type']](prow, 1, fieldlens[i], 0, isMicro); + console.log('******************************'); + console.log(blocks[i]); + //offset += fields[i]['bytes'] * num_of_rows; } - return {blocks: blocks, num_of_rows:Math.abs(num_of_rows)} + return {blocks: blocks, num_of_rows:1} } CTaosInterface.prototype.fetchRow = function fetchRow(result, fields) { let row = this.libtaos.taos_fetch_row(result); @@ -381,17 +405,17 @@ CTaosInterface.prototype.numFields = function numFields(result) { return this.libtaos.taos_num_fields(result); } // Fetch fields count by connection, the latest query -CTaosInterface.prototype.fieldsCount = function fieldsCount(connection) { - return this.libtaos.taos_field_count(connection); +CTaosInterface.prototype.fieldsCount = function fieldsCount(result) { + return this.libtaos.taos_field_count(result); } CTaosInterface.prototype.fetchFields = function fetchFields(result) { return this.libtaos.taos_fetch_fields(result); } -CTaosInterface.prototype.errno = function errno(connection) { - return this.libtaos.taos_errno(connection); +CTaosInterface.prototype.errno = function errno(result) { + return this.libtaos.taos_errno(result); } -CTaosInterface.prototype.errStr = function errStr(connection) { - return ref.readCString(this.libtaos.taos_errstr(connection)); +CTaosInterface.prototype.errStr = function errStr(result) { + return ref.readCString(this.libtaos.taos_errstr(result)); } // Async CTaosInterface.prototype.query_a = function query_a(connection, sql, callback, param = ref.ref(ref.NULL)) { @@ -411,19 +435,39 @@ CTaosInterface.prototype.fetch_rows_a = function fetch_rows_a(result, callback, let asyncCallbackWrapper = function (param2, result2, numOfRows2) { // Data preparation to pass to cursor. Could be bottleneck in query execution callback times. let row = cti.libtaos.taos_fetch_row(result2); + console.log(row); let fields = cti.fetchFields_a(result2); + + let isMicro = (cti.libtaos.taos_result_precision(result2) == FieldTypes.C_TIMESTAMP_MICRO); let blocks = new Array(fields.length); blocks.fill(null); numOfRows2 = Math.abs(numOfRows2); let offset = 0; + var fieldL = cti.libtaos.taos_fetch_lengths(result); + var fieldlens = []; + if (ref.isNull(fieldL) == false) { + + for (let i = 0; i < fields.length; i ++) { + let plen = ref.reinterpret(fieldL, 8, i*8); + let len = ref.get(plen,0,ref.types.int32); + fieldlens.push(len); + console.log('11111111111111111111'); + console.log(fields.length); + console.log(len); + } + } + if (numOfRows2 > 0){ for (let i = 0; i < fields.length; i++) { if (!convertFunctions[fields[i]['type']] ) { throw new errors.DatabaseError("Invalid data type returned from database"); } - blocks[i] = convertFunctions[fields[i]['type']](row, numOfRows2, fields[i]['bytes'], offset, isMicro); - offset += fields[i]['bytes'] * numOfRows2; + let prow = ref.reinterpret(row,8,i*8); + //blocks[i] = convertFunctions[fields[i]['type']](ref.get(prow,0,ref.types.void_ptr), numOfRows2, fieldlens[i], 0, isMicro); + console.log(prow); + blocks[i] = convertFunctions[fields[i]['type']](ref.readPointer(prow), numOfRows2, fieldlens[i], 0, isMicro); + //offset += fields[i]['bytes'] * numOfRows2; } } callback(param2, result2, numOfRows2, blocks); @@ -440,11 +484,11 @@ CTaosInterface.prototype.fetchFields_a = function fetchFields_a (result) { if (ref.isNull(pfields) == false) { pfields = ref.reinterpret(pfields, 68 * pfieldscount , 0); for (let i = 0; i < pfields.length; i += 68) { - //0 - 63 = name //64 - 65 = bytes, 66 - 67 = type + //0 - 64 = name //65 = type, 66 - 67 = bytes fields.push( { - name: ref.readCString(ref.reinterpret(pfields,64,i)), - bytes: pfields[i + 64], - type: pfields[i + 66] + name: ref.readCString(ref.reinterpret(pfields,65,i)), + type: pfields[i + 65], + bytes: pfields[i + 66] }) } } diff --git a/src/connector/nodejs/nodetaos/connection.js b/src/connector/nodejs/nodetaos/connection.js index bb7651544a..08186f8705 100644 --- a/src/connector/nodejs/nodetaos/connection.js +++ b/src/connector/nodejs/nodetaos/connection.js @@ -74,9 +74,11 @@ TDengineConnection.prototype.rollback = function rollback() { * Clear the results from connector * @private */ -TDengineConnection.prototype._clearResultSet = function _clearResultSet() { +/* + TDengineConnection.prototype._clearResultSet = function _clearResultSet() { var result = this._chandle.useResult(this._conn).result; if (result) { this._chandle.freeResult(result) } } +*/ diff --git a/src/connector/nodejs/nodetaos/cursor.js b/src/connector/nodejs/nodetaos/cursor.js index acfe96dfbc..3f01b2b518 100644 --- a/src/connector/nodejs/nodetaos/cursor.js +++ b/src/connector/nodejs/nodetaos/cursor.js @@ -98,7 +98,7 @@ TDengineCursor.prototype.execute = function execute(operation, options, callback if (this._connection == null) { throw new errors.ProgrammingError('Cursor is not connected'); } - this._connection._clearResultSet(); + this._reset_result(); let stmt = operation; @@ -111,18 +111,18 @@ TDengineCursor.prototype.execute = function execute(operation, options, callback }); obs.observe({ entryTypes: ['measure'] }); performance.mark('A'); - res = this._chandle.query(this._connection._conn, stmt); + this._result = this._chandle.query(this._connection._conn, stmt); performance.mark('B'); performance.measure('query', 'A', 'B'); } else { - res = this._chandle.query(this._connection._conn, stmt); + this._result = this._chandle.query(this._connection._conn, stmt); } - + res = this._chandle.errno(this._result); if (res == 0) { - let fieldCount = this._chandle.fieldsCount(this._connection._conn); + let fieldCount = this._chandle.fieldsCount(this._result); if (fieldCount == 0) { - let affectedRowCount = this._chandle.affectedRows(this._connection._conn); + let affectedRowCount = this._chandle.affectedRows(this._result); let response = this._createAffectedResponse(affectedRowCount, time) if (options['quiet'] != true) { console.log(response); @@ -131,16 +131,17 @@ TDengineCursor.prototype.execute = function execute(operation, options, callback return affectedRowCount; //return num of affected rows, common with insert, use statements } else { - let resAndField = this._chandle.useResult(this._connection._conn, fieldCount) - this._result = resAndField.result; - this._fields = resAndField.fields; - this.fields = resAndField.fields; + this._fields = this._chandle.useResult(this._result); + this.fields = this._fields; + console.log('++++++++++++++++++++++++++'); + console.log(this._result); wrapCB(callback); + return this._result; //return a pointer to the result } } else { - throw new errors.ProgrammingError(this._chandle.errStr(this._connection._conn)) + throw new errors.ProgrammingError(this._chandle.errStr(this._result)) } } @@ -195,6 +196,8 @@ TDengineCursor.prototype.fetchall = function fetchall(options, callback) { */ obs.observe({ entryTypes: ['measure'] }); performance.mark('A'); + console.log('fetchall ----------------'); + while(true) { let blockAndRows = this._chandle.fetchBlock(this._result, this._fields); @@ -221,7 +224,7 @@ TDengineCursor.prototype.fetchall = function fetchall(options, callback) { let response = this._createSetResponse(this._rowcount, time) console.log(response); - this._connection._clearResultSet(); + // this._connection._clearResultSet(); let fields = this.fields; this._reset_result(); this.data = data; @@ -381,6 +384,9 @@ TDengineCursor.prototype.stopQuery = function stopQuery(result) { } TDengineCursor.prototype._reset_result = function _reset_result() { this._rowcount = -1; + if (this._result != null) { + this._chandle.freeResult(this._result); + } this._result = null; this._fields = null; this.data = []; diff --git a/src/connector/nodejs/package-lock.json b/src/connector/nodejs/package-lock.json index 1137e35106..5b10e7381b 100644 --- a/src/connector/nodejs/package-lock.json +++ b/src/connector/nodejs/package-lock.json @@ -1,6 +1,6 @@ { - "name": "td-connector", - "version": "1.6.1", + "name": "td2.0-connector", + "version": "0.0.1", "lockfileVersion": 1, "requires": true, "dependencies": { diff --git a/src/connector/nodejs/package.json b/src/connector/nodejs/package.json index 2bc4a2453d..f5560742b2 100644 --- a/src/connector/nodejs/package.json +++ b/src/connector/nodejs/package.json @@ -1,6 +1,6 @@ { - "name": "td-connector", - "version": "2.0.0", + "name": "td2.0-connector", + "version": "0.0.1", "description": "A Node.js connector for TDengine.", "main": "tdengine.js", "scripts": { diff --git a/src/connector/nodejs/test/test.js b/src/connector/nodejs/test/test.js index 5d96e798d8..b8a8bcd151 100644 --- a/src/connector/nodejs/test/test.js +++ b/src/connector/nodejs/test/test.js @@ -19,7 +19,7 @@ function randomBool() { } // Initialize - +//c1.execute('drop database td_connector_test;'); c1.execute('create database if not exists td_connector_test;'); c1.execute('use td_connector_test;') c1.execute('create table if not exists all_types (ts timestamp, _int int, _bigint bigint, _float float, _double double, _binary binary(40), _smallint smallint, _tinyint tinyint, _bool bool, _nchar nchar(40));'); @@ -28,7 +28,7 @@ c1.execute('create table if not exists stabletest (ts timestamp, v1 int, v2 int, // Shell Test : The following uses the cursor to imitate the taos shell // Insert -for (let i = 0; i < 10000; i++) { +for (let i = 0; i < 100; i++) { let insertData = ["now+" + i + "s", // Timestamp parseInt( R(-Math.pow(2,31) + 1 , Math.pow(2,31) - 1) ), // Int parseInt( R(-Math.pow(2,31) + 1 , Math.pow(2,31) - 1) ), // BigInt @@ -40,18 +40,20 @@ for (let i = 0; i < 10000; i++) { randomBool(), "\"Nchars\""]; // Bool c1.execute('insert into td_connector_test.all_types values(' + insertData.join(',') + ' );', {quiet:true}); - if (i % 1000 == 0) { + if (i % 10 == 0) { console.log("Insert # " , i); } } // Select -c1.execute('select * from td_connector_test.all_types limit 10 offset 1000;'); +console.log('select * from td_connector_test.all_types limit 3 offset 100;'); +c1.execute('select * from td_connector_test.all_types limit 1 offset 100;'); var d = c1.fetchall(); console.log(c1.fields); console.log(d); - +/* // Functions +console.log('select count(*), avg(_int), sum(_float), max(_bigint), min(_double) from td_connector_test.all_types;') c1.execute('select count(*), avg(_int), sum(_float), max(_bigint), min(_double) from td_connector_test.all_types;'); var d = c1.fetchall(); console.log(c1.fields); @@ -134,3 +136,4 @@ setTimeout(function(){ c1.query('drop database td_connector_test;'); },2000); conn.close(); +*/ \ No newline at end of file -- GitLab