未验证 提交 f0b77fa3 编写于 作者: H Huo Linhe 提交者: GitHub

[TD-4258]<fix>: speedup nodejs fetchall() with taos_fetch_block (#6165)

上级 4697cdfd
......@@ -9,7 +9,7 @@ const ffi = require('ffi-napi');
const ArrayType = require('ref-array-napi');
const Struct = require('ref-struct-napi');
const FieldTypes = require('./constants');
const errors = require ('./error');
const errors = require('./error');
const TaosObjects = require('./taosobjects');
const { NULL_POINTER } = require('ref-napi');
......@@ -22,7 +22,7 @@ function convertMicrosecondsToDatetime(time) {
return new TaosObjects.TaosTimestamp(time * 0.001, true);
}
function convertTimestamp(data, num_of_rows, nbytes = 0, offset = 0, micro=false) {
function convertTimestamp(data, num_of_rows, nbytes = 0, offset = 0, micro = false) {
timestampConverter = convertMillisecondsToDatetime;
if (micro == true) {
timestampConverter = convertMicrosecondsToDatetime;
......@@ -44,14 +44,14 @@ function convertTimestamp(data, num_of_rows, nbytes = 0, offset = 0, micro=false
}
return res;
}
function convertBool(data, num_of_rows, nbytes = 0, offset = 0, micro=false) {
function convertBool(data, num_of_rows, nbytes = 0, offset = 0, micro = false) {
data = ref.reinterpret(data.deref(), nbytes * num_of_rows, offset);
let res = new Array(data.length);
for (let i = 0; i < data.length; i++) {
if (data[i] == 0) {
res[i] = false;
}
else if (data[i] == 1){
else if (data[i] == 1) {
res[i] = true;
}
else if (data[i] == FieldTypes.C_BOOL_NULL) {
......@@ -60,29 +60,29 @@ function convertBool(data, num_of_rows, nbytes = 0, offset = 0, micro=false) {
}
return res;
}
function convertTinyint(data, num_of_rows, nbytes = 0, offset = 0, micro=false) {
function convertTinyint(data, num_of_rows, nbytes = 0, offset = 0, micro = false) {
data = ref.reinterpret(data.deref(), nbytes * num_of_rows, offset);
let res = [];
let currOffset = 0;
while (currOffset < data.length) {
let d = data.readIntLE(currOffset,1);
let d = data.readIntLE(currOffset, 1);
res.push(d == FieldTypes.C_TINYINT_NULL ? null : d);
currOffset += nbytes;
}
return res;
}
function convertSmallint(data, num_of_rows, nbytes = 0, offset = 0, micro=false) {
function convertSmallint(data, num_of_rows, nbytes = 0, offset = 0, micro = false) {
data = ref.reinterpret(data.deref(), nbytes * num_of_rows, offset);
let res = [];
let currOffset = 0;
while (currOffset < data.length) {
let d = data.readIntLE(currOffset,2);
let d = data.readIntLE(currOffset, 2);
res.push(d == FieldTypes.C_SMALLINT_NULL ? null : d);
currOffset += nbytes;
}
return res;
}
function convertInt(data, num_of_rows, nbytes = 0, offset = 0, micro=false) {
function convertInt(data, num_of_rows, nbytes = 0, offset = 0, micro = false) {
data = ref.reinterpret(data.deref(), nbytes * num_of_rows, offset);
let res = [];
let currOffset = 0;
......@@ -93,7 +93,7 @@ function convertInt(data, num_of_rows, nbytes = 0, offset = 0, micro=false) {
}
return res;
}
function convertBigint(data, num_of_rows, nbytes = 0, offset = 0, micro=false) {
function convertBigint(data, num_of_rows, nbytes = 0, offset = 0, micro = false) {
data = ref.reinterpret(data.deref(), nbytes * num_of_rows, offset);
let res = [];
let currOffset = 0;
......@@ -104,7 +104,7 @@ function convertBigint(data, num_of_rows, nbytes = 0, offset = 0, micro=false) {
}
return res;
}
function convertFloat(data, num_of_rows, nbytes = 0, offset = 0, micro=false) {
function convertFloat(data, num_of_rows, nbytes = 0, offset = 0, micro = false) {
data = ref.reinterpret(data.deref(), nbytes * num_of_rows, offset);
let res = [];
let currOffset = 0;
......@@ -115,7 +115,7 @@ function convertFloat(data, num_of_rows, nbytes = 0, offset = 0, micro=false) {
}
return res;
}
function convertDouble(data, num_of_rows, nbytes = 0, offset = 0, micro=false) {
function convertDouble(data, num_of_rows, nbytes = 0, offset = 0, micro = false) {
data = ref.reinterpret(data.deref(), nbytes * num_of_rows, offset);
let res = [];
let currOffset = 0;
......@@ -126,7 +126,7 @@ function convertDouble(data, num_of_rows, nbytes = 0, offset = 0, micro=false) {
}
return res;
}
function convertBinary(data, num_of_rows, nbytes = 0, offset = 0, micro=false) {
function convertBinary(data, num_of_rows, nbytes = 0, offset = 0, micro = false) {
data = ref.reinterpret(data.deref(), nbytes * num_of_rows, offset);
let res = [];
let currOffset = 0;
......@@ -142,7 +142,7 @@ function convertBinary(data, num_of_rows, nbytes = 0, offset = 0, micro=false) {
}
return res;
}
function convertNchar(data, num_of_rows, nbytes = 0, offset = 0, micro=false) {
function convertNchar(data, num_of_rows, nbytes = 0, offset = 0, micro = false) {
data = ref.reinterpret(data.deref(), nbytes * num_of_rows, offset);
let res = [];
let dataEntry = data.slice(0, nbytes); //one entry in a row under a column;
......@@ -153,23 +153,23 @@ function convertNchar(data, num_of_rows, nbytes = 0, offset = 0, micro=false) {
// Object with all the relevant converters from pblock data to javascript readable data
let convertFunctions = {
[FieldTypes.C_BOOL] : convertBool,
[FieldTypes.C_TINYINT] : convertTinyint,
[FieldTypes.C_SMALLINT] : convertSmallint,
[FieldTypes.C_INT] : convertInt,
[FieldTypes.C_BIGINT] : convertBigint,
[FieldTypes.C_FLOAT] : convertFloat,
[FieldTypes.C_DOUBLE] : convertDouble,
[FieldTypes.C_BINARY] : convertBinary,
[FieldTypes.C_TIMESTAMP] : convertTimestamp,
[FieldTypes.C_NCHAR] : convertNchar
[FieldTypes.C_BOOL]: convertBool,
[FieldTypes.C_TINYINT]: convertTinyint,
[FieldTypes.C_SMALLINT]: convertSmallint,
[FieldTypes.C_INT]: convertInt,
[FieldTypes.C_BIGINT]: convertBigint,
[FieldTypes.C_FLOAT]: convertFloat,
[FieldTypes.C_DOUBLE]: convertDouble,
[FieldTypes.C_BINARY]: convertBinary,
[FieldTypes.C_TIMESTAMP]: convertTimestamp,
[FieldTypes.C_NCHAR]: convertNchar
}
// Define TaosField structure
var char_arr = ArrayType(ref.types.char);
var TaosField = Struct({
'name': char_arr,
});
});
TaosField.fields.name.type.size = 65;
TaosField.defineProperty('type', ref.types.char);
TaosField.defineProperty('bytes', ref.types.short);
......@@ -183,7 +183,7 @@ TaosField.defineProperty('bytes', ref.types.short);
* @classdesc The CTaosInterface is the interface through which Node.JS communicates data back and forth with TDengine. It is not advised to
* access this class directly and use it unless you understand what these functions do.
*/
function CTaosInterface (config = null, pass = false) {
function CTaosInterface(config = null, pass = false) {
ref.types.char_ptr = ref.refType(ref.types.char);
ref.types.void_ptr = ref.refType(ref.types.void);
ref.types.void_ptr2 = ref.refType(ref.types.void_ptr);
......@@ -196,64 +196,65 @@ function CTaosInterface (config = null, pass = false) {
taoslibname = 'libtaos';
}
this.libtaos = ffi.Library(taoslibname, {
'taos_options': [ ref.types.int, [ ref.types.int , ref.types.void_ptr ] ],
'taos_init': [ ref.types.void, [ ] ],
'taos_options': [ref.types.int, [ref.types.int, ref.types.void_ptr]],
'taos_init': [ref.types.void, []],
//TAOS *taos_connect(char *ip, char *user, char *pass, char *db, int port)
'taos_connect': [ ref.types.void_ptr, [ ref.types.char_ptr, ref.types.char_ptr, ref.types.char_ptr, ref.types.char_ptr, ref.types.int ] ],
'taos_connect': [ref.types.void_ptr, [ref.types.char_ptr, ref.types.char_ptr, ref.types.char_ptr, ref.types.char_ptr, ref.types.int]],
//void taos_close(TAOS *taos)
'taos_close': [ ref.types.void, [ ref.types.void_ptr ] ],
//int *taos_fetch_lengths(TAOS_RES *taos);
'taos_fetch_lengths': [ ref.types.void_ptr, [ ref.types.void_ptr ] ],
'taos_close': [ref.types.void, [ref.types.void_ptr]],
//int *taos_fetch_lengths(TAOS_RES *res);
'taos_fetch_lengths': [ref.types.void_ptr, [ref.types.void_ptr]],
//int taos_query(TAOS *taos, char *sqlstr)
'taos_query': [ ref.types.void_ptr, [ ref.types.void_ptr, ref.types.char_ptr ] ],
//int taos_affected_rows(TAOS *taos)
'taos_affected_rows': [ ref.types.int, [ ref.types.void_ptr] ],
'taos_query': [ref.types.void_ptr, [ref.types.void_ptr, ref.types.char_ptr]],
//int taos_affected_rows(TAOS_RES *res)
'taos_affected_rows': [ref.types.int, [ref.types.void_ptr]],
//int taos_fetch_block(TAOS_RES *res, TAOS_ROW *rows)
'taos_fetch_block': [ ref.types.int, [ ref.types.void_ptr, ref.types.void_ptr] ],
'taos_fetch_block': [ref.types.int, [ref.types.void_ptr, ref.types.void_ptr]],
//int taos_num_fields(TAOS_RES *res);
'taos_num_fields': [ ref.types.int, [ ref.types.void_ptr] ],
'taos_num_fields': [ref.types.int, [ref.types.void_ptr]],
//TAOS_ROW taos_fetch_row(TAOS_RES *res)
//TAOS_ROW is void **, but we set the return type as a reference instead to get the row
'taos_fetch_row': [ ref.refType(ref.types.void_ptr2), [ ref.types.void_ptr ] ],
'taos_fetch_row': [ref.refType(ref.types.void_ptr2), [ref.types.void_ptr]],
'taos_print_row': [ref.types.int, [ref.types.char_ptr, ref.types.void_ptr, ref.types.void_ptr, ref.types.int]],
//int taos_result_precision(TAOS_RES *res)
'taos_result_precision': [ ref.types.int, [ ref.types.void_ptr ] ],
'taos_result_precision': [ref.types.int, [ref.types.void_ptr]],
//void taos_free_result(TAOS_RES *res)
'taos_free_result': [ ref.types.void, [ ref.types.void_ptr] ],
'taos_free_result': [ref.types.void, [ref.types.void_ptr]],
//int taos_field_count(TAOS *taos)
'taos_field_count': [ ref.types.int, [ ref.types.void_ptr ] ],
'taos_field_count': [ref.types.int, [ref.types.void_ptr]],
//TAOS_FIELD *taos_fetch_fields(TAOS_RES *res)
'taos_fetch_fields': [ ref.refType(TaosField), [ ref.types.void_ptr ] ],
'taos_fetch_fields': [ref.refType(TaosField), [ref.types.void_ptr]],
//int taos_errno(TAOS *taos)
'taos_errno': [ ref.types.int, [ ref.types.void_ptr] ],
'taos_errno': [ref.types.int, [ref.types.void_ptr]],
//char *taos_errstr(TAOS *taos)
'taos_errstr': [ ref.types.char_ptr, [ ref.types.void_ptr] ],
'taos_errstr': [ref.types.char_ptr, [ref.types.void_ptr]],
//void taos_stop_query(TAOS_RES *res);
'taos_stop_query': [ ref.types.void, [ ref.types.void_ptr] ],
'taos_stop_query': [ref.types.void, [ref.types.void_ptr]],
//char *taos_get_server_info(TAOS *taos);
'taos_get_server_info': [ ref.types.char_ptr, [ ref.types.void_ptr ] ],
'taos_get_server_info': [ref.types.char_ptr, [ref.types.void_ptr]],
//char *taos_get_client_info();
'taos_get_client_info': [ ref.types.char_ptr, [ ] ],
'taos_get_client_info': [ref.types.char_ptr, []],
// ASYNC
// void taos_query_a(TAOS *taos, char *sqlstr, void (*fp)(void *, TAOS_RES *, int), void *param)
'taos_query_a': [ ref.types.void, [ ref.types.void_ptr, ref.types.char_ptr, ref.types.void_ptr, ref.types.void_ptr ] ],
'taos_query_a': [ref.types.void, [ref.types.void_ptr, ref.types.char_ptr, ref.types.void_ptr, ref.types.void_ptr]],
// void taos_fetch_rows_a(TAOS_RES *res, void (*fp)(void *param, TAOS_RES *, int numOfRows), void *param);
'taos_fetch_rows_a': [ ref.types.void, [ ref.types.void_ptr, ref.types.void_ptr, ref.types.void_ptr ]],
'taos_fetch_rows_a': [ref.types.void, [ref.types.void_ptr, ref.types.void_ptr, ref.types.void_ptr]],
// Subscription
//TAOS_SUB *taos_subscribe(TAOS* taos, int restart, const char* topic, const char *sql, TAOS_SUBSCRIBE_CALLBACK fp, void *param, int interval)
'taos_subscribe': [ ref.types.void_ptr, [ ref.types.void_ptr, ref.types.int, ref.types.char_ptr, ref.types.char_ptr, ref.types.void_ptr, ref.types.void_ptr, ref.types.int] ],
'taos_subscribe': [ref.types.void_ptr, [ref.types.void_ptr, ref.types.int, ref.types.char_ptr, ref.types.char_ptr, ref.types.void_ptr, ref.types.void_ptr, ref.types.int]],
// TAOS_RES *taos_consume(TAOS_SUB *tsub)
'taos_consume': [ ref.types.void_ptr, [ref.types.void_ptr] ],
'taos_consume': [ref.types.void_ptr, [ref.types.void_ptr]],
//void taos_unsubscribe(TAOS_SUB *tsub);
'taos_unsubscribe': [ ref.types.void, [ ref.types.void_ptr ] ],
'taos_unsubscribe': [ref.types.void, [ref.types.void_ptr]],
// Continuous Query
//TAOS_STREAM *taos_open_stream(TAOS *taos, char *sqlstr, void (*fp)(void *param, TAOS_RES *, TAOS_ROW row),
// int64_t stime, void *param, void (*callback)(void *));
'taos_open_stream': [ ref.types.void_ptr, [ ref.types.void_ptr, ref.types.char_ptr, ref.types.void_ptr, ref.types.int64, ref.types.void_ptr, ref.types.void_ptr ] ],
'taos_open_stream': [ref.types.void_ptr, [ref.types.void_ptr, ref.types.char_ptr, ref.types.void_ptr, ref.types.int64, ref.types.void_ptr, ref.types.void_ptr]],
//void taos_close_stream(TAOS_STREAM *tstr);
'taos_close_stream': [ ref.types.void, [ ref.types.void_ptr ] ]
'taos_close_stream': [ref.types.void, [ref.types.void_ptr]]
});
if (pass == false) {
......@@ -264,7 +265,7 @@ function CTaosInterface (config = null, pass = false) {
try {
this._config = ref.allocCString(config);
}
catch(err){
catch (err) {
throw "Attribute Error: config is expected as a str";
}
}
......@@ -277,37 +278,37 @@ function CTaosInterface (config = null, pass = false) {
}
CTaosInterface.prototype.config = function config() {
return this._config;
}
CTaosInterface.prototype.connect = function connect(host=null, user="root", password="taosdata", db=null, port=0) {
let _host,_user,_password,_db,_port;
}
CTaosInterface.prototype.connect = function connect(host = null, user = "root", password = "taosdata", db = null, port = 0) {
let _host, _user, _password, _db, _port;
try {
_host = host != null ? ref.allocCString(host) : ref.alloc(ref.types.char_ptr, ref.NULL);
}
catch(err) {
catch (err) {
throw "Attribute Error: host is expected as a str";
}
try {
_user = ref.allocCString(user)
}
catch(err) {
catch (err) {
throw "Attribute Error: user is expected as a str";
}
try {
_password = ref.allocCString(password);
}
catch(err) {
catch (err) {
throw "Attribute Error: password is expected as a str";
}
try {
_db = db != null ? ref.allocCString(db) : ref.alloc(ref.types.char_ptr, ref.NULL);
}
catch(err) {
catch (err) {
throw "Attribute Error: db is expected as a str";
}
try {
_port = ref.alloc(ref.types.int, port);
}
catch(err) {
catch (err) {
throw TypeError("port is expected as an int")
}
let connection = this.libtaos.taos_connect(_host, _user, _password, _db, _port);
......@@ -326,8 +327,8 @@ CTaosInterface.prototype.close = function close(connection) {
CTaosInterface.prototype.query = function query(connection, sql) {
return this.libtaos.taos_query(connection, ref.allocCString(sql));
}
CTaosInterface.prototype.affectedRows = function affectedRows(connection) {
return this.libtaos.taos_affected_rows(connection);
CTaosInterface.prototype.affectedRows = function affectedRows(result) {
return this.libtaos.taos_affected_rows(result);
}
CTaosInterface.prototype.useResult = function useResult(result) {
......@@ -337,8 +338,8 @@ CTaosInterface.prototype.useResult = function useResult(result) {
pfields = ref.reinterpret(pfields, this.fieldsCount(result) * 68, 0);
for (let i = 0; i < pfields.length; i += 68) {
//0 - 63 = name //64 - 65 = bytes, 66 - 67 = type
fields.push( {
name: ref.readCString(ref.reinterpret(pfields,65,i)),
fields.push({
name: ref.readCString(ref.reinterpret(pfields, 65, i)),
type: pfields[i + 65],
bytes: pfields[i + 66]
})
......@@ -347,11 +348,10 @@ CTaosInterface.prototype.useResult = function useResult(result) {
return fields;
}
CTaosInterface.prototype.fetchBlock = function fetchBlock(result, fields) {
//let pblock = ref.ref(ref.ref(ref.NULL)); // equal to our raw data
let pblock = this.libtaos.taos_fetch_row(result);
let num_of_rows = 1;
if (ref.isNull(pblock) == true) {
return {block:null, num_of_rows:0};
let pblock = ref.NULL_POINTER;
let num_of_rows = this.libtaos.taos_fetch_block(result, pblock);
if (ref.isNull(pblock.deref()) == true) {
return { block: null, num_of_rows: 0 };
}
var fieldL = this.libtaos.taos_fetch_lengths(result);
......@@ -361,8 +361,8 @@ CTaosInterface.prototype.fetchBlock = function fetchBlock(result, fields) {
var fieldlens = [];
if (ref.isNull(fieldL) == false) {
for (let i = 0; i < fields.length; i ++) {
let plen = ref.reinterpret(fieldL, 4, i*4);
for (let i = 0; i < fields.length; i++) {
let plen = ref.reinterpret(fieldL, 4, i * 4);
let len = plen.readInt32LE(0);
fieldlens.push(len);
}
......@@ -370,21 +370,23 @@ CTaosInterface.prototype.fetchBlock = function fetchBlock(result, fields) {
let blocks = new Array(fields.length);
blocks.fill(null);
//num_of_rows = Math.abs(num_of_rows);
num_of_rows = Math.abs(num_of_rows);
let offset = 0;
let ptr = pblock.deref();
for (let i = 0; i < fields.length; i++) {
pdata = ref.reinterpret(pblock,8,i*8);
if(ref.isNull(pdata.readPointer())){
pdata = ref.reinterpret(ptr, 8, i * 8);
if (ref.isNull(pdata.readPointer())) {
blocks[i] = new Array();
}else{
} else {
pdata = ref.ref(pdata.readPointer());
if (!convertFunctions[fields[i]['type']] ) {
if (!convertFunctions[fields[i]['type']]) {
throw new errors.DatabaseError("Invalid data type returned from database");
}
blocks[i] = convertFunctions[fields[i]['type']](pdata, 1, fieldlens[i], offset, isMicro);
blocks[i] = convertFunctions[fields[i]['type']](pdata, num_of_rows, fieldlens[i], offset, isMicro);
}
}
return {blocks: blocks, num_of_rows:Math.abs(num_of_rows)}
return { blocks: blocks, num_of_rows }
}
CTaosInterface.prototype.fetchRow = function fetchRow(result, fields) {
let row = this.libtaos.taos_fetch_row(result);
......@@ -414,7 +416,7 @@ CTaosInterface.prototype.errStr = function errStr(result) {
// Async
CTaosInterface.prototype.query_a = function query_a(connection, sql, callback, param = ref.ref(ref.NULL)) {
// void taos_query_a(TAOS *taos, char *sqlstr, void (*fp)(void *param, TAOS_RES *, int), void *param)
callback = ffi.Callback(ref.types.void, [ ref.types.void_ptr, ref.types.void_ptr, ref.types.int ], callback);
callback = ffi.Callback(ref.types.void, [ref.types.void_ptr, ref.types.void_ptr, ref.types.int], callback);
this.libtaos.taos_query_a(connection, ref.allocCString(sql), callback, param);
return param;
}
......@@ -440,21 +442,21 @@ CTaosInterface.prototype.fetch_rows_a = function fetch_rows_a(result, callback,
var fieldlens = [];
if (ref.isNull(fieldL) == false) {
for (let i = 0; i < fields.length; i ++) {
let plen = ref.reinterpret(fieldL, 8, i*8);
let len = ref.get(plen,0,ref.types.int32);
for (let i = 0; i < fields.length; i++) {
let plen = ref.reinterpret(fieldL, 8, i * 8);
let len = ref.get(plen, 0, ref.types.int32);
fieldlens.push(len);
}
}
if (numOfRows2 > 0){
if (numOfRows2 > 0) {
for (let i = 0; i < fields.length; i++) {
if(ref.isNull(pdata.readPointer())){
if (ref.isNull(pdata.readPointer())) {
blocks[i] = new Array();
}else{
if (!convertFunctions[fields[i]['type']] ) {
} else {
if (!convertFunctions[fields[i]['type']]) {
throw new errors.DatabaseError("Invalid data type returned from database");
}
let prow = ref.reinterpret(row,8,i*8);
let prow = ref.reinterpret(row, 8, i * 8);
prow = prow.readPointer();
prow = ref.ref(prow);
blocks[i] = convertFunctions[fields[i]['type']](prow, 1, fieldlens[i], offset, isMicro);
......@@ -464,21 +466,21 @@ CTaosInterface.prototype.fetch_rows_a = function fetch_rows_a(result, callback,
}
callback(param2, result2, numOfRows2, blocks);
}
asyncCallbackWrapper = ffi.Callback(ref.types.void, [ ref.types.void_ptr, ref.types.void_ptr, ref.types.int ], asyncCallbackWrapper);
asyncCallbackWrapper = ffi.Callback(ref.types.void, [ref.types.void_ptr, ref.types.void_ptr, ref.types.int], asyncCallbackWrapper);
this.libtaos.taos_fetch_rows_a(result, asyncCallbackWrapper, param);
return param;
}
// Fetch field meta data by result handle
CTaosInterface.prototype.fetchFields_a = function fetchFields_a (result) {
CTaosInterface.prototype.fetchFields_a = function fetchFields_a(result) {
let pfields = this.fetchFields(result);
let pfieldscount = this.numFields(result);
let fields = [];
if (ref.isNull(pfields) == false) {
pfields = ref.reinterpret(pfields, 68 * pfieldscount , 0);
pfields = ref.reinterpret(pfields, 68 * pfieldscount, 0);
for (let i = 0; i < pfields.length; i += 68) {
//0 - 64 = name //65 = type, 66 - 67 = bytes
fields.push( {
name: ref.readCString(ref.reinterpret(pfields,65,i)),
fields.push({
name: ref.readCString(ref.reinterpret(pfields, 65, i)),
type: pfields[i + 65],
bytes: pfields[i + 66]
})
......@@ -488,7 +490,7 @@ CTaosInterface.prototype.fetchFields_a = function fetchFields_a (result) {
}
// Stop a query by result handle
CTaosInterface.prototype.stopQuery = function stopQuery(result) {
if (result != null){
if (result != null) {
this.libtaos.taos_stop_query(result);
}
else {
......@@ -509,13 +511,13 @@ CTaosInterface.prototype.subscribe = function subscribe(connection, restart, top
try {
sql = sql != null ? ref.allocCString(sql) : ref.alloc(ref.types.char_ptr, ref.NULL);
}
catch(err) {
catch (err) {
throw "Attribute Error: sql is expected as a str";
}
try {
topic = topic != null ? ref.allocCString(topic) : ref.alloc(ref.types.char_ptr, ref.NULL);
}
catch(err) {
catch (err) {
throw TypeError("topic is expected as a str");
}
......@@ -539,8 +541,8 @@ CTaosInterface.prototype.consume = function consume(subscription) {
pfields = ref.reinterpret(pfields, this.numFields(result) * 68, 0);
for (let i = 0; i < pfields.length; i += 68) {
//0 - 63 = name //64 - 65 = bytes, 66 - 67 = type
fields.push( {
name: ref.readCString(ref.reinterpret(pfields,64,i)),
fields.push({
name: ref.readCString(ref.reinterpret(pfields, 64, i)),
bytes: pfields[i + 64],
type: pfields[i + 66]
})
......@@ -548,7 +550,7 @@ CTaosInterface.prototype.consume = function consume(subscription) {
}
let data = [];
while(true) {
while (true) {
let { blocks, num_of_rows } = this.fetchBlock(result, fields);
if (num_of_rows == 0) {
break;
......@@ -559,7 +561,7 @@ CTaosInterface.prototype.consume = function consume(subscription) {
for (let j = 0; j < fields.length; j++) {
rowBlock[j] = blocks[j][i];
}
data[data.length-1] = (rowBlock);
data[data.length - 1] = (rowBlock);
}
}
return { data: data, fields: fields, result: result };
......@@ -570,11 +572,11 @@ CTaosInterface.prototype.unsubscribe = function unsubscribe(subscription) {
}
// Continuous Query
CTaosInterface.prototype.openStream = function openStream(connection, sql, callback, stime,stoppingCallback, param = ref.ref(ref.NULL)) {
CTaosInterface.prototype.openStream = function openStream(connection, sql, callback, stime, stoppingCallback, param = ref.ref(ref.NULL)) {
try {
sql = ref.allocCString(sql);
}
catch(err) {
catch (err) {
throw "Attribute Error: sql string is expected as a str";
}
var cti = this;
......@@ -587,7 +589,7 @@ CTaosInterface.prototype.openStream = function openStream(connection, sql, callb
let offset = 0;
if (numOfRows2 > 0) {
for (let i = 0; i < fields.length; i++) {
if (!convertFunctions[fields[i]['type']] ) {
if (!convertFunctions[fields[i]['type']]) {
throw new errors.DatabaseError("Invalid data type returned from database");
}
blocks[i] = convertFunctions[fields[i]['type']](row, numOfRows2, fields[i]['bytes'], offset, isMicro);
......@@ -596,8 +598,8 @@ CTaosInterface.prototype.openStream = function openStream(connection, sql, callb
}
callback(param2, result2, blocks, fields);
}
asyncCallbackWrapper = ffi.Callback(ref.types.void, [ ref.types.void_ptr, ref.types.void_ptr, ref.refType(ref.types.void_ptr2) ], asyncCallbackWrapper);
asyncStoppingCallbackWrapper = ffi.Callback( ref.types.void, [ ref.types.void_ptr ], stoppingCallback);
asyncCallbackWrapper = ffi.Callback(ref.types.void, [ref.types.void_ptr, ref.types.void_ptr, ref.refType(ref.types.void_ptr2)], asyncCallbackWrapper);
asyncStoppingCallbackWrapper = ffi.Callback(ref.types.void, [ref.types.void_ptr], stoppingCallback);
let streamHandle = this.libtaos.taos_open_stream(connection, sql, asyncCallbackWrapper, stime, param, asyncStoppingCallbackWrapper);
if (ref.isNull(streamHandle)) {
throw new errors.TDError('Failed to open a stream with TDengine');
......
const ref = require('ref-napi');
require('./globalfunc.js')
const CTaosInterface = require('./cinterface')
const errors = require ('./error')
const errors = require('./error')
const TaosQuery = require('./taosquery')
const { PerformanceObserver, performance } = require('perf_hooks');
module.exports = TDengineCursor;
......@@ -22,7 +22,7 @@ module.exports = TDengineCursor;
* @property {fields} - Array of the field objects in order from left to right of the latest data retrieved
* @since 1.0.0
*/
function TDengineCursor(connection=null) {
function TDengineCursor(connection = null) {
//All parameters are store for sync queries only.
this._rowcount = -1;
this._connection = null;
......@@ -176,27 +176,22 @@ TDengineCursor.prototype.fetchall = function fetchall(options, callback) {
throw new errors.OperationalError("Invalid use of fetchall, either result or fields from query are null. First execute a query first");
}
let data = [];
let num_of_rows = this._chandle.affectedRows(this._result);
let data = new Array(num_of_rows);
this._rowcount = 0;
//let nodetime = 0;
let time = 0;
const obs = new PerformanceObserver((items) => {
time += items.getEntries()[0].duration;
performance.clearMarks();
});
/*
const obs2 = new PerformanceObserver((items) => {
nodetime += items.getEntries()[0].duration;
performance.clearMarks();
});
obs2.observe({ entryTypes: ['measure'] });
performance.mark('nodea');
*/
obs.observe({ entryTypes: ['measure'] });
performance.mark('A');
while(true) {
while (true) {
let blockAndRows = this._chandle.fetchBlock(this._result, this._fields);
// console.log(blockAndRows);
// break;
let block = blockAndRows.blocks;
let num_of_rows = blockAndRows.num_of_rows;
if (num_of_rows == 0) {
......@@ -205,16 +200,18 @@ TDengineCursor.prototype.fetchall = function fetchall(options, callback) {
this._rowcount += num_of_rows;
let numoffields = this._fields.length;
for (let i = 0; i < num_of_rows; i++) {
data.push([]);
// data.push([]);
let rowBlock = new Array(numoffields);
for (let j = 0; j < numoffields; j++) {
rowBlock[j] = block[j][i];
}
data[data.length-1] = (rowBlock);
data[this._rowcount - num_of_rows + i] = (rowBlock);
// data.push(rowBlock);
}
}
performance.mark('B');
performance.measure('query', 'A', 'B');
let response = this._createSetResponse(this._rowcount, time)
......@@ -239,7 +236,7 @@ TDengineCursor.prototype.fetchall = function fetchall(options, callback) {
* @return {number | Buffer} Number of affected rows or a Buffer that points to the results of the query
* @since 1.0.0
*/
TDengineCursor.prototype.execute_a = function execute_a (operation, options, callback, param) {
TDengineCursor.prototype.execute_a = function execute_a(operation, options, callback, param) {
if (operation == undefined) {
throw new errors.ProgrammingError('No operation passed as argument');
return null;
......@@ -265,14 +262,14 @@ TDengineCursor.prototype.execute_a = function execute_a (operation, options, cal
}
if (resCode >= 0) {
// let fieldCount = cr._chandle.numFields(res2);
// if (fieldCount == 0) {
// //cr._chandle.freeResult(res2);
// return res2;
// }
// else {
// return res2;
// }
// let fieldCount = cr._chandle.numFields(res2);
// if (fieldCount == 0) {
// //cr._chandle.freeResult(res2);
// return res2;
// }
// else {
// return res2;
// }
return res2;
}
......@@ -360,17 +357,17 @@ TDengineCursor.prototype.fetchall_a = function fetchall_a(result, options, callb
for (let k = 0; k < fields.length; k++) {
rowBlock[k] = block[k][j];
}
data[data.length-1] = rowBlock;
data[data.length - 1] = rowBlock;
}
}
cr._chandle.freeResult(result2); // free result, avoid seg faults and mem leaks!
callback(param2, result2, numOfRows2, {data:data,fields:fields});
callback(param2, result2, numOfRows2, { data: data, fields: fields });
}
}
ref.writeObject(buf, 0, param);
param = this._chandle.fetch_rows_a(result, asyncCallbackWrapper, buf); //returned param
return {param:param,result:result};
return { param: param, result: result };
}
/**
* Stop a query given the result handle.
......@@ -428,7 +425,7 @@ TDengineCursor.prototype.subscribe = function subscribe(config) {
*/
TDengineCursor.prototype.consumeData = async function consumeData(subscription, callback) {
while (true) {
let { data, fields, result} = this._chandle.consume(subscription);
let { data, fields, result } = this._chandle.consume(subscription);
callback(data, fields, result);
}
}
......@@ -450,7 +447,7 @@ TDengineCursor.prototype.unsubscribe = function unsubscribe(subscription) {
* @return {Buffer} A buffer pointing to the stream handle
* @since 1.3.0
*/
TDengineCursor.prototype.openStream = function openStream(sql, callback, stime = 0, stoppingCallback, param = {}) {
TDengineCursor.prototype.openStream = function openStream(sql, callback, stime = 0, stoppingCallback, param = {}) {
let buf = ref.alloc('Object');
ref.writeObject(buf, 0, param);
......@@ -463,17 +460,17 @@ TDengineCursor.prototype.unsubscribe = function unsubscribe(subscription) {
for (let k = 0; k < fields.length; k++) {
rowBlock[k] = blocks[k][j];
}
data[data.length-1] = rowBlock;
data[data.length - 1] = rowBlock;
}
callback(param2, result2, blocks, fields);
}
return this._chandle.openStream(this._connection._conn, sql, asyncCallbackWrapper, stime, stoppingCallback, buf);
}
/**
}
/**
* Close a stream
* @param {Buffer} - A buffer pointing to the handle of the stream to be closed
* @since 1.3.0
*/
TDengineCursor.prototype.closeStream = function closeStream(stream) {
TDengineCursor.prototype.closeStream = function closeStream(stream) {
this._chandle.closeStream(stream);
}
}
{
"name": "td2.0-connector",
"version": "2.0.6",
"lockfileVersion": 1,
"requires": true,
"dependencies": {
"array-index": {
"version": "1.0.0",
"resolved": "https://registry.npmjs.org/array-index/-/array-index-1.0.0.tgz",
"integrity": "sha1-7FanSe4QPk4Ix5C5w1PfFgVbl/k=",
"requires": {
"debug": "^2.2.0",
"es6-symbol": "^3.0.2"
},
"dependencies": {
"debug": {
"version": "2.6.9",
"resolved": "https://registry.npmjs.org/debug/-/debug-2.6.9.tgz",
"integrity": "sha512-bC7ElrdJaJnPbAP+1EotYvqZsb3ecl5wi6Bfi6BJTUcNowp6cvspg0jXznRTKDjm/E7AdgFBVeAPVMNcKGsHMA==",
"requires": {
"ms": "2.0.0"
}
},
"ms": {
"version": "2.0.0",
"resolved": "https://registry.npmjs.org/ms/-/ms-2.0.0.tgz",
"integrity": "sha1-VgiurfwAvmwpAd9fmGF4jeDVl8g="
}
}
},
"d": {
"version": "1.0.1",
"resolved": "https://registry.npmjs.org/d/-/d-1.0.1.tgz",
"integrity": "sha512-m62ShEObQ39CfralilEQRjH6oAMtNCV1xJyEx5LpRYUVN+EviphDgUc/F3hnYbADmkiNs67Y+3ylmlG7Lnu+FA==",
"requires": {
"es5-ext": "^0.10.50",
"type": "^1.0.1"
}
},
"debug": {
"version": "4.3.1",
"resolved": "https://registry.npmjs.org/debug/-/debug-4.3.1.tgz",
"integrity": "sha512-doEwdvm4PCeK4K3RQN2ZC2BYUBaxwLARCqZmMjtF8a51J2Rb0xpVloFRnCODwqjpwnAoao4pelN8l3RJdv3gRQ==",
"requires": {
"ms": "2.1.2"
}
},
"es5-ext": {
"version": "0.10.53",
"resolved": "https://registry.npmjs.org/es5-ext/-/es5-ext-0.10.53.tgz",
"integrity": "sha512-Xs2Stw6NiNHWypzRTY1MtaG/uJlwCk8kH81920ma8mvN8Xq1gsfhZvpkImLQArw8AHnv8MT2I45J3c0R8slE+Q==",
"requires": {
"es6-iterator": "~2.0.3",
"es6-symbol": "~3.1.3",
"next-tick": "~1.0.0"
}
},
"es6-iterator": {
"version": "2.0.3",
"resolved": "https://registry.npmjs.org/es6-iterator/-/es6-iterator-2.0.3.tgz",
"integrity": "sha1-p96IkUGgWpSwhUQDstCg+/qY87c=",
"requires": {
"d": "1",
"es5-ext": "^0.10.35",
"es6-symbol": "^3.1.1"
}
},
"es6-symbol": {
"version": "3.1.3",
"resolved": "https://registry.npmjs.org/es6-symbol/-/es6-symbol-3.1.3.tgz",
"integrity": "sha512-NJ6Yn3FuDinBaBRWl/q5X/s4koRHBrgKAu+yGI6JCBeiu3qrcbJhwT2GeR/EXVfylRk8dpQVJoLEFhK+Mu31NA==",
"requires": {
"d": "^1.0.1",
"ext": "^1.1.2"
}
},
"ext": {
"version": "1.4.0",
"resolved": "https://registry.npmjs.org/ext/-/ext-1.4.0.tgz",
"integrity": "sha512-Key5NIsUxdqKg3vIsdw9dSuXpPCQ297y6wBjL30edxwPgt2E44WcWBZey/ZvUc6sERLTxKdyCu4gZFmUbk1Q7A==",
"requires": {
"type": "^2.0.0"
},
"dependencies": {
"type": {
"version": "2.1.0",
"resolved": "https://registry.npmjs.org/type/-/type-2.1.0.tgz",
"integrity": "sha512-G9absDWvhAWCV2gmF1zKud3OyC61nZDwWvBL2DApaVFogI07CprggiQAOOjvp2NRjYWFzPyu7vwtDrQFq8jeSA=="
}
}
},
"ffi-napi": {
"version": "3.1.0",
"resolved": "https://registry.npmjs.org/ffi-napi/-/ffi-napi-3.1.0.tgz",
"integrity": "sha512-EsHO+sP2p/nUC/3l/l8m9niee1BLm4asUFDzkkBGR4kYVgp2KqdAYUomZhkKtzim4Fq7mcYHjpUaIHsMqs+E1g==",
"requires": {
"debug": "^4.1.1",
"get-uv-event-loop-napi-h": "^1.0.5",
"node-addon-api": "^2.0.0",
"node-gyp-build": "^4.2.1",
"ref-napi": "^2.0.1",
"ref-struct-di": "^1.1.0"
},
"dependencies": {
"ref-napi": {
"version": "2.1.2",
"resolved": "https://registry.npmjs.org/ref-napi/-/ref-napi-2.1.2.tgz",
"integrity": "sha512-aFl+vrIuLWUXMUTQGAwGAuSNLX3Ub5W3iVP8b7KyFFZUdn4+i4U1TXXTop0kCTUfGNu8glBGVz4lowkwMcPVVA==",
"requires": {
"debug": "^4.1.1",
"get-symbol-from-current-process-h": "^1.0.2",
"node-addon-api": "^2.0.0",
"node-gyp-build": "^4.2.1"
}
}
}
},
"get-symbol-from-current-process-h": {
"version": "1.0.2",
"resolved": "https://registry.npmjs.org/get-symbol-from-current-process-h/-/get-symbol-from-current-process-h-1.0.2.tgz",
"integrity": "sha512-syloC6fsCt62ELLrr1VKBM1ggOpMdetX9hTrdW77UQdcApPHLmf7CI7OKcN1c9kYuNxKcDe4iJ4FY9sX3aw2xw=="
},
"get-uv-event-loop-napi-h": {
"version": "1.0.6",
"resolved": "https://registry.npmjs.org/get-uv-event-loop-napi-h/-/get-uv-event-loop-napi-h-1.0.6.tgz",
"integrity": "sha512-t5c9VNR84nRoF+eLiz6wFrEp1SE2Acg0wS+Ysa2zF0eROes+LzOfuTaVHxGy8AbS8rq7FHEJzjnCZo1BupwdJg==",
"requires": {
"get-symbol-from-current-process-h": "^1.0.1"
}
},
"ms": {
"version": "2.1.2",
"resolved": "https://registry.npmjs.org/ms/-/ms-2.1.2.tgz",
"integrity": "sha512-sGkPx+VjMtmA6MX27oA4FBFELFCZZ4S4XqeGOXCv68tT+jb3vk/RyaKWP0PTKyWtmLSM0b+adUTEvbs1PEaH2w=="
},
"next-tick": {
"version": "1.0.0",
"resolved": "https://registry.npmjs.org/next-tick/-/next-tick-1.0.0.tgz",
"integrity": "sha1-yobR/ogoFpsBICCOPchCS524NCw="
},
"node-addon-api": {
"version": "2.0.2",
"resolved": "https://registry.npmjs.org/node-addon-api/-/node-addon-api-2.0.2.tgz",
"integrity": "sha512-Ntyt4AIXyaLIuMHF6IOoTakB3K+RWxwtsHNRxllEoA6vPwP9o4866g6YWDLUdnucilZhmkxiHwHr11gAENw+QA=="
},
"node-gyp-build": {
"version": "4.2.3",
"resolved": "https://registry.npmjs.org/node-gyp-build/-/node-gyp-build-4.2.3.tgz",
"integrity": "sha512-MN6ZpzmfNCRM+3t57PTJHgHyw/h4OWnZ6mR8P5j/uZtqQr46RRuDE/P+g3n0YR/AiYXeWixZZzaip77gdICfRg=="
},
"ref-array-napi": {
"version": "1.2.1",
"resolved": "https://registry.npmjs.org/ref-array-napi/-/ref-array-napi-1.2.1.tgz",
"integrity": "sha512-jQp2WWSucmxkqVfoNfm7yDlDeGu3liAbzqfwjNybL80ooLOCnCZpAK2woDInY+lxNOK/VlIVSqeDEYb4gVPuNQ==",
"requires": {
"array-index": "1",
"debug": "2",
"ref-napi": "^1.4.2"
},
"dependencies": {
"debug": {
"version": "2.6.9",
"resolved": "https://registry.npmjs.org/debug/-/debug-2.6.9.tgz",
"integrity": "sha512-bC7ElrdJaJnPbAP+1EotYvqZsb3ecl5wi6Bfi6BJTUcNowp6cvspg0jXznRTKDjm/E7AdgFBVeAPVMNcKGsHMA==",
"requires": {
"ms": "2.0.0"
}
},
"ms": {
"version": "2.0.0",
"resolved": "https://registry.npmjs.org/ms/-/ms-2.0.0.tgz",
"integrity": "sha1-VgiurfwAvmwpAd9fmGF4jeDVl8g="
},
"ref-napi": {
"version": "1.5.2",
"resolved": "https://registry.npmjs.org/ref-napi/-/ref-napi-1.5.2.tgz",
"integrity": "sha512-hwyNmWpUkt1bDWDW4aiwCoC+SJfJO69UIdjqssNqdaS0sYJpgqzosGg/rLtk69UoQ8drZdI9yyQefM7eEMM3Gw==",
"requires": {
"debug": "^3.1.0",
"node-addon-api": "^2.0.0",
"node-gyp-build": "^4.2.1"
},
"dependencies": {
"debug": {
"version": "3.2.7",
"resolved": "https://registry.npmjs.org/debug/-/debug-3.2.7.tgz",
"integrity": "sha512-CFjzYYAi4ThfiQvizrFQevTTXHtnCqWfe7x1AhgEscTz6ZbLbfoLRLPugTQyBth6f8ZERVUSyWHFD/7Wu4t1XQ==",
"requires": {
"ms": "^2.1.1"
}
},
"ms": {
"version": "2.1.3",
"resolved": "https://registry.npmjs.org/ms/-/ms-2.1.3.tgz",
"integrity": "sha512-6FlzubTLZG3J2a/NVCAleEhjzq5oxgHyaCU9yYXvcLsvoVaHJq/s5xXI6/XXP6tz7R9xAOtHnSO/tXtF3WRTlA=="
}
}
}
}
},
"ref-napi": {
"version": "3.0.1",
"resolved": "https://registry.npmjs.org/ref-napi/-/ref-napi-3.0.1.tgz",
"integrity": "sha512-W3rcb0E+tlO9u9ySFnX5vifInwwPGToOfFgTZUHJBNiOBsW0NNvgHz2zJN7ctABo/2yIlgdPQUvuqqfORIF4LA==",
"requires": {
"debug": "^4.1.1",
"get-symbol-from-current-process-h": "^1.0.2",
"node-addon-api": "^2.0.0",
"node-gyp-build": "^4.2.1"
}
},
"ref-struct-di": {
"version": "1.1.1",
"resolved": "https://registry.npmjs.org/ref-struct-di/-/ref-struct-di-1.1.1.tgz",
"integrity": "sha512-2Xyn/0Qgz89VT+++WP0sTosdm9oeowLP23wRJYhG4BFdMUrLj3jhwHZNEytYNYgtPKLNTP3KJX4HEgBvM1/Y2g==",
"requires": {
"debug": "^3.1.0"
},
"dependencies": {
"debug": {
"version": "3.2.7",
"resolved": "https://registry.npmjs.org/debug/-/debug-3.2.7.tgz",
"integrity": "sha512-CFjzYYAi4ThfiQvizrFQevTTXHtnCqWfe7x1AhgEscTz6ZbLbfoLRLPugTQyBth6f8ZERVUSyWHFD/7Wu4t1XQ==",
"requires": {
"ms": "^2.1.1"
}
}
}
},
"ref-struct-napi": {
"version": "1.1.1",
"resolved": "https://registry.npmjs.org/ref-struct-napi/-/ref-struct-napi-1.1.1.tgz",
"integrity": "sha512-YgS5/d7+kT5zgtySYI5ieH0hREdv+DabgDvoczxsui0f9VLm0rrDcWEj4DHKehsH+tJnVMsLwuyctWgvdEcVRw==",
"requires": {
"debug": "2",
"ref-napi": "^1.4.2"
},
"dependencies": {
"debug": {
"version": "2.6.9",
"resolved": "https://registry.npmjs.org/debug/-/debug-2.6.9.tgz",
"integrity": "sha512-bC7ElrdJaJnPbAP+1EotYvqZsb3ecl5wi6Bfi6BJTUcNowp6cvspg0jXznRTKDjm/E7AdgFBVeAPVMNcKGsHMA==",
"requires": {
"ms": "2.0.0"
}
},
"ms": {
"version": "2.0.0",
"resolved": "https://registry.npmjs.org/ms/-/ms-2.0.0.tgz",
"integrity": "sha1-VgiurfwAvmwpAd9fmGF4jeDVl8g="
},
"ref-napi": {
"version": "1.5.2",
"resolved": "https://registry.npmjs.org/ref-napi/-/ref-napi-1.5.2.tgz",
"integrity": "sha512-hwyNmWpUkt1bDWDW4aiwCoC+SJfJO69UIdjqssNqdaS0sYJpgqzosGg/rLtk69UoQ8drZdI9yyQefM7eEMM3Gw==",
"requires": {
"debug": "^3.1.0",
"node-addon-api": "^2.0.0",
"node-gyp-build": "^4.2.1"
},
"dependencies": {
"debug": {
"version": "3.2.7",
"resolved": "https://registry.npmjs.org/debug/-/debug-3.2.7.tgz",
"integrity": "sha512-CFjzYYAi4ThfiQvizrFQevTTXHtnCqWfe7x1AhgEscTz6ZbLbfoLRLPugTQyBth6f8ZERVUSyWHFD/7Wu4t1XQ==",
"requires": {
"ms": "^2.1.1"
}
},
"ms": {
"version": "2.1.3",
"resolved": "https://registry.npmjs.org/ms/-/ms-2.1.3.tgz",
"integrity": "sha512-6FlzubTLZG3J2a/NVCAleEhjzq5oxgHyaCU9yYXvcLsvoVaHJq/s5xXI6/XXP6tz7R9xAOtHnSO/tXtF3WRTlA=="
}
}
}
}
},
"type": {
"version": "1.2.0",
"resolved": "https://registry.npmjs.org/type/-/type-1.2.0.tgz",
"integrity": "sha512-+5nt5AAniqsCnu2cEQQdpzCAh33kVx8n0VoFidKpB1dVVLAN/F+bgVOqOJqOnEnrhp222clB5p3vUlD+1QAnfg=="
}
}
}
{
"name": "td2.0-connector",
"version": "2.0.6",
"version": "2.0.7",
"description": "A Node.js connector for TDengine.",
"main": "tdengine.js",
"directories": {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册