提交 bd526d22 编写于 作者: S StoneT2000

Node.js Connector updated to 1.3.0 - Subscription, Continuous Query, Bug Fixes

Subscription implemented
Continuous Query implemented
Fixed bugs when dealing with null values retrieved from tables
Removed unused code
Added basic performance test code
上级 5e17f67e
......@@ -52,9 +52,12 @@ function convertBool(data, num_of_rows, nbytes = 0, offset = 0, micro=false) {
if (data[i] == 0) {
res[i] = false;
}
else {
else if (data[i] == 1){
res[i] = true;
}
else if (data[i] == FieldTypes.C_BOOL_NULL) {
res[i] = null;
}
}
return res;
}
......@@ -63,7 +66,8 @@ function convertTinyint(data, num_of_rows, nbytes = 0, offset = 0, micro=false)
let res = [];
let currOffset = 0;
while (currOffset < data.length) {
res.push(data.readIntLE(currOffset,1));
let d = data.readIntLE(currOffset,1);
res.push(d == FieldTypes.C_TINYINT_NULL ? null : d);
currOffset += nbytes;
}
return res;
......@@ -73,7 +77,8 @@ function convertSmallint(data, num_of_rows, nbytes = 0, offset = 0, micro=false)
let res = [];
let currOffset = 0;
while (currOffset < data.length) {
res.push(data.readIntLE(currOffset,2));
let d = data.readIntLE(currOffset,2);
res.push(d == FieldTypes.C_SMALLINT_NULL ? null : d);
currOffset += nbytes;
}
return res;
......@@ -83,7 +88,8 @@ function convertInt(data, num_of_rows, nbytes = 0, offset = 0, micro=false) {
let res = [];
let currOffset = 0;
while (currOffset < data.length) {
res.push(data.readInt32LE(currOffset));
let d = data.readInt32LE(currOffset);
res.push(d == FieldTypes.C_INT_NULL ? null : d);
currOffset += nbytes;
}
return res;
......@@ -93,7 +99,8 @@ function convertBigint(data, num_of_rows, nbytes = 0, offset = 0, micro=false) {
let res = [];
let currOffset = 0;
while (currOffset < data.length) {
res.push(BigInt(data.readInt64LE(currOffset)));
let d = data.readInt64LE(currOffset);
res.push(d == FieldTypes.C_BIGINT_NULL ? null : BigInt(d));
currOffset += nbytes;
}
return res;
......@@ -103,7 +110,8 @@ function convertFloat(data, num_of_rows, nbytes = 0, offset = 0, micro=false) {
let res = [];
let currOffset = 0;
while (currOffset < data.length) {
res.push(parseFloat(data.readFloatLE(currOffset).toFixed(7)));
let d = parseFloat(data.readFloatLE(currOffset).toFixed(5));
res.push(isNaN(d) ? null : d);
currOffset += nbytes;
}
return res;
......@@ -113,7 +121,8 @@ function convertDouble(data, num_of_rows, nbytes = 0, offset = 0, micro=false) {
let res = [];
let currOffset = 0;
while (currOffset < data.length) {
res.push(parseFloat(data.readDoubleLE(currOffset).toFixed(16)));
let d = parseFloat(data.readDoubleLE(currOffset).toFixed(16));
res.push(isNaN(d) ? null : d);
currOffset += nbytes;
}
return res;
......@@ -123,8 +132,13 @@ function convertBinary(data, num_of_rows, nbytes = 0, offset = 0, micro=false) {
let res = [];
let currOffset = 0;
while (currOffset < data.length) {
let dataEntry = data.slice(currOffset, currOffset + nbytes); //one entry in a row under a column;
res.push(ref.readCString(dataEntry));
let dataEntry = data.slice(currOffset, currOffset + nbytes);
if (dataEntry[0] == FieldTypes.C_BINARY_NULL) {
res.push(null);
}
else {
res.push(ref.readCString(dataEntry));
}
currOffset += nbytes;
}
return res;
......@@ -133,10 +147,15 @@ function convertNchar(data, num_of_rows, nbytes = 0, offset = 0, micro=false) {
data = ref.reinterpret(data.deref(), nbytes * num_of_rows, offset);
let res = [];
let currOffset = 0;
//every 4;
// every 4 bytes, a character is encoded;
while (currOffset < data.length) {
let dataEntry = data.slice(currOffset, currOffset + nbytes); //one entry in a row under a column;
res.push(dataEntry.toString("utf16le").replace(/\u0000/g, ""));
if (dataEntry.readInt64LE(0) == FieldTypes.C_NCHAR_NULL) {
res.push(null);
}
else {
res.push(dataEntry.toString("utf16le").replace(/\u0000/g, ""));
}
currOffset += nbytes;
}
return res;
......@@ -178,7 +197,7 @@ function CTaosInterface (config = null, pass = false) {
ref.types.void_ptr = ref.refType(ref.types.void);
ref.types.void_ptr2 = ref.refType(ref.types.void_ptr);
/*Declare a bunch of functions first*/
/* Note, pointers to TAOS_RES, TAOS, are ref.types.void_ptr. The connection._conn buffer is supplied for pointers to TAOS */
/* Note, pointers to TAOS_RES, TAOS, are ref.types.void_ptr. The connection._conn buffer is supplied for pointers to TAOS * */
this.libtaos = ffi.Library('libtaos', {
'taos_options': [ ref.types.int, [ ref.types.int , ref.types.void_ptr ] ],
'taos_init': [ ref.types.void, [ ] ],
......@@ -211,16 +230,42 @@ function CTaosInterface (config = null, pass = false) {
'taos_errno': [ ref.types.int, [ ref.types.void_ptr] ],
//char *taos_errstr(TAOS *taos)
'taos_errstr': [ ref.types.char, [ ref.types.void_ptr] ],
//void taos_stop_query(TAOS_RES *res);
'taos_stop_query': [ ref.types.void, [ ref.types.void_ptr] ],
//char *taos_get_server_info(TAOS *taos);
'taos_get_server_info': [ ref.types.char_ptr, [ ref.types.void_ptr ] ],
//char *taos_get_client_info();
'taos_get_client_info': [ ref.types.char_ptr, [ ] ],
// ASYNC
// void taos_query_a(TAOS *taos, char *sqlstr, void (*fp)(void *, TAOS_RES *, int), void *param)
'taos_query_a': [ ref.types.void, [ ref.types.void_ptr, ref.types.char_ptr, ref.types.void_ptr, ref.types.void_ptr ] ],
// void taos_fetch_rows_a(TAOS_RES *res, void (*fp)(void *param, TAOS_RES *, int numOfRows), void *param);
'taos_fetch_rows_a': [ ref.types.void, [ ref.types.void_ptr, ref.types.void_ptr, ref.types.void_ptr ]]
'taos_fetch_rows_a': [ ref.types.void, [ ref.types.void_ptr, ref.types.void_ptr, ref.types.void_ptr ]],
// Subscription
//TAOS_SUB *taos_subscribe(char *host, char *user, char *pass, char *db, char *table, long time, int mseconds)
////TAOS_SUB *taos_subscribe(char *host, char *user, char *pass, char *db, char *table, int64_t time, int mseconds);
'taos_subscribe': [ ref.types.void_ptr, [ ref.types.char_ptr, ref.types.char_ptr, ref.types.char_ptr, ref.types.char_ptr, ref.types.char_ptr, ref.types.int64, ref.types.int] ],
//TAOS_ROW taos_consume(TAOS_SUB *tsub);
'taos_consume': [ ref.refType(ref.types.void_ptr2), [ref.types.void_ptr] ],
//void taos_unsubscribe(TAOS_SUB *tsub);
'taos_unsubscribe': [ ref.types.void, [ ref.types.void_ptr ] ],
//int taos_subfields_count(TAOS_SUB *tsub);
'taos_subfields_count': [ ref.types.int, [ref.types.void_ptr ] ],
//TAOS_FIELD *taos_fetch_subfields(TAOS_SUB *tsub);
'taos_fetch_subfields': [ ref.refType(TaosField), [ ref.types.void_ptr ] ],
// Continuous Query
//TAOS_STREAM *taos_open_stream(TAOS *taos, char *sqlstr, void (*fp)(void *param, TAOS_RES *, TAOS_ROW row),
// int64_t stime, void *param, void (*callback)(void *));
'taos_open_stream': [ ref.types.void_ptr, [ ref.types.void_ptr, ref.types.char_ptr, ref.types.void_ptr, ref.types.int64, ref.types.void_ptr, ref.types.void_ptr ] ],
//void taos_close_stream(TAOS_STREAM *tstr);
'taos_close_stream': [ ref.types.void, [ ref.types.void_ptr ] ]
});
if (pass == false) {
if (config == null) {
//check this buffer
this._config = ref.alloc(ref.types.char_ptr, ref.NULL);
}
else {
......@@ -343,7 +388,7 @@ CTaosInterface.prototype.freeResult = function freeResult(result) {
CTaosInterface.prototype.numFields = function numFields(result) {
return this.libtaos.taos_num_fields(result);
}
/** @deprecated */
// Fetch fields count by connection, the latest query
CTaosInterface.prototype.fieldsCount = function fieldsCount(connection) {
return this.libtaos.taos_field_count(connection);
}
......@@ -375,7 +420,7 @@ CTaosInterface.prototype.fetch_rows_a = function fetch_rows_a(result, callback,
// Data preparation to pass to cursor. Could be bottleneck in query execution callback times.
let row = cti.libtaos.taos_fetch_row(result2);
let fields = cti.fetchFields_a(result2);
let isMicro = (cti.libtaos.taos_result_precision(result) == FieldTypes.C_TIMESTAMP_MICRO);
let isMicro = (cti.libtaos.taos_result_precision(result2) == FieldTypes.C_TIMESTAMP_MICRO);
let blocks = new Array(fields.length);
blocks.fill(null);
numOfRows2 = Math.abs(numOfRows2);
......@@ -391,13 +436,12 @@ CTaosInterface.prototype.fetch_rows_a = function fetch_rows_a(result, callback,
}
callback(param2, result2, numOfRows2, blocks);
}
asyncCallbackWrapper = ffi.Callback(ref.types.void, [ ref.types.void_ptr, ref.types.void_ptr, ref.types.int], asyncCallbackWrapper);
asyncCallbackWrapper = ffi.Callback(ref.types.void, [ ref.types.void_ptr, ref.types.void_ptr, ref.types.int ], asyncCallbackWrapper);
this.libtaos.taos_fetch_rows_a(result, asyncCallbackWrapper, param);
return param;
}
// Fetch field meta data by result handle
CTaosInterface.prototype.fetchFields_a = function fetchFields_a (result) {
//
let pfields = this.fetchFields(result);
let pfieldscount = this.numFields(result);
let fields = [];
......@@ -414,3 +458,157 @@ CTaosInterface.prototype.fetchFields_a = function fetchFields_a (result) {
}
return fields;
}
// Stop a query by result handle
CTaosInterface.prototype.stopQuery = function stopQuery(result) {
if (result != null){
this.libtaos.taos_stop_query(result);
}
else {
throw new errors.ProgrammingError("No result handle passed to stop query");
}
}
CTaosInterface.prototype.getServerInfo = function getServerInfo(connection) {
return ref.readCString(this.libtaos.taos_get_server_info(connection));
}
CTaosInterface.prototype.getClientInfo = function getClientInfo() {
return ref.readCString(this.libtaos.taos_get_client_info());
}
// Subscription
CTaosInterface.prototype.subscribe = function subscribe(host=null, user="root", password="taosdata", db=null, table=null, time=null, mseconds=null) {
let dbOrig = db;
let tableOrig = table;
try {
host = host != null ? ref.allocCString(host) : ref.alloc(ref.types.char_ptr, ref.NULL);
}
catch(err) {
throw "Attribute Error: host is expected as a str";
}
try {
user = ref.allocCString(user)
}
catch(err) {
throw "Attribute Error: user is expected as a str";
}
try {
password = ref.allocCString(password);
}
catch(err) {
throw "Attribute Error: password is expected as a str";
}
try {
db = db != null ? ref.allocCString(db) : ref.alloc(ref.types.char_ptr, ref.NULL);
}
catch(err) {
throw "Attribute Error: db is expected as a str";
}
try {
table = table != null ? ref.allocCString(table) : ref.alloc(ref.types.char_ptr, ref.NULL);
}
catch(err) {
throw TypeError("table is expected as a str");
}
try {
mseconds = ref.alloc(ref.types.int, mseconds);
}
catch(err) {
throw TypeError("mseconds is expected as an int");
}
//TAOS_SUB *taos_subscribe(char *host, char *user, char *pass, char *db, char *table, int64_t time, int mseconds);
let subscription = this.libtaos.taos_subscribe(host, user, password, db, table, time, mseconds);
if (ref.isNull(subscription)) {
throw new errors.TDError('Failed to subscribe to TDengine | Database: ' + dbOrig + ', Table: ' + tableOrig);
}
else {
console.log('Successfully subscribed to TDengine | Database: ' + dbOrig + ', Table: ' + tableOrig);
}
return subscription;
}
CTaosInterface.prototype.subFieldsCount = function subFieldsCount(subscription) {
return this.libtaos.taos_subfields_count(subscription);
}
CTaosInterface.prototype.fetchSubFields = function fetchSubFields(subscription) {
let pfields = this.libtaos.taos_fetch_subfields(subscription);
let pfieldscount = this.subFieldsCount(subscription);
let fields = [];
if (ref.isNull(pfields) == false) {
pfields = ref.reinterpret(pfields, 68 * pfieldscount , 0);
for (let i = 0; i < pfields.length; i += 68) {
//0 - 63 = name //64 - 65 = bytes, 66 - 67 = type
fields.push( {
name: ref.readCString(ref.reinterpret(pfields,64,i)),
bytes: pfields[i + 64],
type: pfields[i + 66]
})
}
}
return fields;
}
CTaosInterface.prototype.consume = function consume(subscription) {
let row = this.libtaos.taos_consume(subscription);
let fields = this.fetchSubFields(subscription);
//let isMicro = (cti.libtaos.taos_result_precision(result) == FieldTypes.C_TIMESTAMP_MICRO);
let isMicro = false; //no supported function for determining precision?
let blocks = new Array(fields.length);
blocks.fill(null);
let numOfRows2 = 1; //Math.abs(numOfRows2);
let offset = 0;
if (numOfRows2 > 0){
for (let i = 0; i < fields.length; i++) {
if (!convertFunctions[fields[i]['type']] ) {
throw new errors.DatabaseError("Invalid data type returned from database");
}
blocks[i] = convertFunctions[fields[i]['type']](row, numOfRows2, fields[i]['bytes'], offset, isMicro);
offset += fields[i]['bytes'] * numOfRows2;
}
}
return {blocks:blocks, fields:fields};
}
CTaosInterface.prototype.unsubscribe = function unsubscribe(subscription) {
//void taos_unsubscribe(TAOS_SUB *tsub);
this.libtaos.taos_unsubscribe(subscription);
}
// Continuous Query
CTaosInterface.prototype.openStream = function openStream(connection, sql, callback, stime,stoppingCallback, param = ref.ref(ref.NULL)) {
try {
sql = ref.allocCString(sql);
}
catch(err) {
throw "Attribute Error: sql string is expected as a str";
}
var cti = this;
let asyncCallbackWrapper = function (param2, result2, row) {
let fields = cti.fetchFields_a(result2);
let isMicro = (cti.libtaos.taos_result_precision(result2) == FieldTypes.C_TIMESTAMP_MICRO);
let blocks = new Array(fields.length);
blocks.fill(null);
let numOfRows2 = 1;
let offset = 0;
if (numOfRows2 > 0) {
for (let i = 0; i < fields.length; i++) {
if (!convertFunctions[fields[i]['type']] ) {
throw new errors.DatabaseError("Invalid data type returned from database");
}
blocks[i] = convertFunctions[fields[i]['type']](row, numOfRows2, fields[i]['bytes'], offset, isMicro);
offset += fields[i]['bytes'] * numOfRows2;
}
}
callback(param2, result2, blocks, fields);
}
asyncCallbackWrapper = ffi.Callback(ref.types.void, [ ref.types.void_ptr, ref.types.void_ptr, ref.refType(ref.types.void_ptr2) ], asyncCallbackWrapper);
asyncStoppingCallbackWrapper = ffi.Callback( ref.types.void, [ ref.types.void_ptr ], stoppingCallback);
let streamHandle = this.libtaos.taos_open_stream(connection, sql, asyncCallbackWrapper, stime, param, asyncStoppingCallbackWrapper);
if (ref.isNull(streamHandle)) {
throw new errors.TDError('Failed to open a stream with TDengine');
return false;
}
else {
console.log("Succesfully opened stream");
return streamHandle;
}
}
CTaosInterface.prototype.closeStream = function closeStream(stream) {
this.libtaos.taos_close_stream(stream);
console.log("Closed stream");
}
......@@ -38,7 +38,7 @@ module.exports = {
C_NCHAR : 10,
// NULL value definition
// NOTE: These values should change according to C definition in tsdb.h
C_BOOL_NULL : 0x02,
C_BOOL_NULL : 2,
C_TINYINT_NULL : -128,
C_SMALLINT_NULL : -32768,
C_INT_NULL : -2147483648,
......
......@@ -7,7 +7,7 @@ const { PerformanceObserver, performance } = require('perf_hooks');
module.exports = TDengineCursor;
/**
* @typedef {Object} Buffer - A Node.JS buffer. Please refer to {@link https://nodejs.org/api/buffer.html} for more details
* @typedef {Object} Buffer - A Node.js buffer. Please refer to {@link https://nodejs.org/api/buffer.html} for more details
* @global
*/
......@@ -24,27 +24,21 @@ module.exports = TDengineCursor;
*/
function TDengineCursor(connection=null) {
//All parameters are store for sync queries only.
this._description = null;
this._rowcount = -1;
this._connection = null;
this._result = null;
this._fields = null;
this.data = [];
this.fields = null;
this._chandle = new CTaosInterface(null, true); //pass through, just need library loaded.
if (connection != null) {
this._connection = connection
this._chandle = connection._chandle //pass through, just need library loaded.
}
else {
throw new errors.ProgrammingError("A TDengineConnection object is required to be passed to the TDengineCursor");
}
}
/**
* Get the description of the latest query
* @since 1.0.0
* @return {string} Description
*/
TDengineCursor.prototype.description = function description() {
return this._description;
}
/**
* Get the row counts of the latest query
* @since 1.0.0
......@@ -53,9 +47,6 @@ TDengineCursor.prototype.description = function description() {
TDengineCursor.prototype.rowcount = function rowcount() {
return this._rowcount;
}
TDengineCursor.prototype.callproc = function callproc() {
return;
}
/**
* Close the cursor by setting its connection to null and freeing results from the connection and resetting the results it has stored
* @return {boolean} Whether or not the cursor was succesfully closed
......@@ -112,15 +103,21 @@ TDengineCursor.prototype.execute = function execute(operation, options, callback
let stmt = operation;
let time = 0;
const obs = new PerformanceObserver((items) => {
time = items.getEntries()[0].duration;
performance.clearMarks();
});
obs.observe({ entryTypes: ['measure'] });
performance.mark('A');
res = this._chandle.query(this._connection._conn, stmt);
performance.mark('B');
performance.measure('query', 'A', 'B');
let res;
if (options['quiet'] != true) {
const obs = new PerformanceObserver((items) => {
time = items.getEntries()[0].duration;
performance.clearMarks();
});
obs.observe({ entryTypes: ['measure'] });
performance.mark('A');
res = this._chandle.query(this._connection._conn, stmt);
performance.mark('B');
performance.measure('query', 'A', 'B');
}
else {
res = this._chandle.query(this._connection._conn, stmt);
}
if (res == 0) {
let fieldCount = this._chandle.fieldsCount(this._connection._conn);
......@@ -139,7 +136,7 @@ TDengineCursor.prototype.execute = function execute(operation, options, callback
this._fields = resAndField.fields;
this.fields = resAndField.fields;
wrapCB(callback);
return this._handle_result(); //return a pointer to the result
return this._result; //return a pointer to the result
}
}
else {
......@@ -271,7 +268,6 @@ TDengineCursor.prototype.execute_a = function execute_a (operation, options, cal
if (resCode >= 0) {
let fieldCount = cr._chandle.numFields(res2);
if (fieldCount == 0) {
//get affect fields count
cr._chandle.freeResult(res2); //result will no longer be needed
}
else {
......@@ -280,8 +276,6 @@ TDengineCursor.prototype.execute_a = function execute_a (operation, options, cal
}
else {
//new errors.ProgrammingError(this._chandle.errStr(this._connection._conn))
//how to get error by result handle?
throw new errors.ProgrammingError("Error occuring with use of execute_a async function. Status code was returned with failure");
}
}
......@@ -313,7 +307,7 @@ TDengineCursor.prototype.execute_a = function execute_a (operation, options, cal
* @param {function} callback - callback function that is callbacked on the COMPLETE fetched data (it is calledback only once!).
* Must be of form function (param, result, rowCount, rowData)
* @param {Object} param - A parameter that is also passed to the main callback function. Important! Param must be an object, and the key "data" cannot be used
* @return {{param:Object, result:buffer}} An object with the passed parameters object and the buffer instance that is a pointer to the result handle.
* @return {{param:Object, result:Buffer}} An object with the passed parameters object and the buffer instance that is a pointer to the result handle.
* @since 1.2.0
* @example
* cursor.execute('select * from db.table');
......@@ -377,27 +371,117 @@ TDengineCursor.prototype.fetchall_a = function fetchall_a(result, options, callb
param = this._chandle.fetch_rows_a(result, asyncCallbackWrapper, buf); //returned param
return {param:param,result:result};
}
TDengineCursor.prototype.nextset = function nextset() {
return;
}
TDengineCursor.prototype.setinputsize = function setinputsize() {
return;
}
TDengineCursor.prototype.setoutputsize = function setoutputsize(size, column=null) {
return;
/**
* Stop a query given the result handle.
* @param {Buffer} result - The buffer that acts as the result handle
* @since 1.3.0
*/
TDengineCursor.prototype.stopQuery = function stopQuery(result) {
this._chandle.stopQuery(result);
}
TDengineCursor.prototype._reset_result = function _reset_result() {
this._description = null;
this._rowcount = -1;
this._result = null;
this._fields = null;
this.data = [];
this.fields = null;
}
TDengineCursor.prototype._handle_result = function _handle_result() {
this._description = [];
for (let field of this._fields) {
this._description.push([field.name, field.type]);
/**
* Get server info such as version number
* @return {string}
* @since 1.3.0
*/
TDengineCursor.prototype.getServerInfo = function getServerInfo() {
return this._chandle.getServerInfo(this._connection._conn);
}
/**
* Get client info such as version number
* @return {string}
* @since 1.3.0
*/
TDengineCursor.prototype.getClientInfo = function getClientInfo() {
return this._chandle.getClientInfo();
}
/**
* Subscribe to a table from a database in TDengine.
* @param {Object} config - A configuration object containing the configuration options for the subscription
* @param {string} config.host - The host to subscribe to
* @param {string} config.user - The user to subscribe as
* @param {string} config.password - The password for the said user
* @param {string} config.db - The db containing the table to subscribe to
* @param {string} config.table - The name of the table to subscribe to
* @param {number} config.time - The start time to start a subscription session
* @param {number} config.mseconds - The pulling period of the subscription session
* @return {Buffer} A buffer pointing to the subscription session handle
* @since 1.3.0
*/
TDengineCursor.prototype.subscribe = function subscribe(config) {
return this._chandle.subscribe(config.host, config.user, config.password, config.db, config.table, config.time, config.mseconds);
};
/**
* An infinite loop that consumes the latest data and calls a callback function that is provided.
* @param {Buffer} subscription - A buffer object pointing to the subscription session handle
* @param {function} callback - The callback function that takes the row data, field/column meta data, and the subscription session handle as input
* @since 1.3.0
*/
TDengineCursor.prototype.consumeData = async function consumeData(subscription, callback) {
while (true) {
let res = this._chandle.consume(subscription);
let data = [];
let num_of_rows = res.blocks[0].length;
for (let j = 0; j < num_of_rows; j++) {
data.push([]);
let rowBlock = new Array(res.fields.length);
for (let k = 0; k < res.fields.length; k++) {
rowBlock[k] = res.blocks[k][j];
}
data[data.length-1] = rowBlock;
}
callback(data, res.fields, subscription);
}
return this._result;
}
/**
* Unsubscribe the provided buffer object pointing to the subscription session handle
* @param {Buffer} subscription - A buffer object pointing to the subscription session handle that is to be unsubscribed
* @since 1.3.0
*/
TDengineCursor.prototype.unsubscribe = function unsubscribe(subscription) {
this._chandle.unsubscribe(subscription);
}
/**
* Open a stream with TDengine to run the sql query periodically in the background
* @param {string} sql - The query to run
* @param {function} callback - The callback function to run after each query, accepting inputs as param, result handle, data, fields meta data
* @param {number} stime - The time of the stream starts in the form of epoch milliseconds. If 0 is given, the start time is set as the current time.
* @param {function} stoppingCallback - The callback function to run when the continuous query stops. It takes no inputs
* @param {object} param - A parameter that is passed to the main callback function
* @return {Buffer} A buffer pointing to the stream handle
* @since 1.3.0
*/
TDengineCursor.prototype.openStream = function openStream(sql, callback, stime = 0, stoppingCallback, param = {}) {
let buf = ref.alloc('Object');
ref.writeObject(buf, 0, param);
let asyncCallbackWrapper = function (param2, result2, blocks, fields) {
let data = [];
let num_of_rows = blocks[0].length;
for (let j = 0; j < num_of_rows; j++) {
data.push([]);
let rowBlock = new Array(fields.length);
for (let k = 0; k < fields.length; k++) {
rowBlock[k] = blocks[k][j];
}
data[data.length-1] = rowBlock;
}
callback(param2, result2, blocks, fields);
}
return this._chandle.openStream(this._connection._conn, sql, asyncCallbackWrapper, stime, stoppingCallback, buf);
}
/**
* Close a stream
* @param {Buffer} - A buffer pointing to the handle of the stream to be closed
* @since 1.3.0
*/
TDengineCursor.prototype.closeStream = function closeStream(stream) {
this._chandle.closeStream(stream);
}
......@@ -15,24 +15,17 @@ module.exports = TaosResult;
* @since 1.0.6
*/
function TaosResult(data, fields) {
this.data = data.map(row => new TaosRow(row));
this.rowcount = this.data.length;
this.fields = fields.map(field => new TaosField(field));
}
TaosResult.prototype.parseFields = function parseFields(fields) {
return fields.map(function(field) {
return field;
});
}
/**
* Pretty print data and the fields meta data as if you were using the taos shell
* @memberof TaosResult
* @function pretty
* @since 1.0.6
*/
TaosResult.prototype.pretty = function pretty() {
// Pretty print of the fields and the data;
let fieldsStr = "";
let sizing = [];
this.fields.forEach((field,i) => {
......@@ -55,10 +48,9 @@ TaosResult.prototype.pretty = function pretty() {
entry = entry.toTaosString();
}
else {
entry = entry.toString();
entry = entry == null ? 'null' : entry.toString();
}
rowStr += entry
//console.log(this.fields[i]._field.bytes, suggestedWidths[this.fields[i]._field.type]);
rowStr += fillEmpty(sizing[i] - entry.length) + " | ";
});
console.log(rowStr);
......
{
"name": "td-connector",
"version": "1.2.1",
"version": "1.3.0",
"lockfileVersion": 1,
"requires": true,
"dependencies": {
......
{
"name": "td-connector",
"version": "1.2.1",
"version": "1.3.0",
"description": "A Node.js connector for TDengine.",
"main": "tdengine.js",
"scripts": {
......
......@@ -130,9 +130,9 @@ console.log(cursor.data); // Latest query's result data is stored in cursor.data
### Async functionality
Async queries can be performed using the same functions such as `cursor.execute`, `cursor.query`, but now with `_a` appended to them.
Async queries can be performed using the same functions such as `cursor.execute`, `TaosQuery.query`, but now with `_a` appended to them.
Say you want to execute an two async query on two seperate tables, using `cursor.query_a`, you can do that and get a TaosQuery object, which upon executing with the `execute_a` function, returns a promise that resolves with a TaosResult object.
Say you want to execute an two async query on two separate tables, using `cursor.query`, you can do that and get a TaosQuery object, which upon executing with the `execute_a` function, returns a promise that resolves with a TaosResult object.
```javascript
var promise1 = cursor.query('select count(*), avg(v1), avg(v2) from meter1;').execute_a()
......@@ -145,7 +145,6 @@ promise2.then(function(result) {
})
```
## Example
An example of using the NodeJS connector to create a table with weather data and create and execute queries can be found [here](https://github.com/taosdata/TDengine/tree/master/tests/examples/nodejs/node-example.js) (The preferred method for using the connector)
......
function memoryUsageData() {
let s = process.memoryUsage()
for (key in s) {
s[key] = (s[key]/1000000).toFixed(3) + "MB";
}
return s;
}
console.log("initial mem usage:", memoryUsageData());
const { PerformanceObserver, performance } = require('perf_hooks');
const taos = require('../tdengine');
var conn = taos.connect({host:"127.0.0.1", user:"root", password:"taosdata", config:"/etc/taos",port:0});
var c1 = conn.cursor();
// Initialize env
c1.execute('create database if not exists td_connector_test;');
c1.execute('use td_connector_test;')
c1.execute('create table if not exists all_types (ts timestamp, _int int, _bigint bigint, _float float, _double double, _binary binary(40), _smallint smallint, _tinyint tinyint, _bool bool, _nchar nchar(40));');
c1.execute('create table if not exists stabletest (ts timestamp, v1 int, v2 int, v3 int, v4 double) tags (id int, location binary(20));')
// Insertion into single table Performance Test
var dataPrepTime = 0;
var insertTime = 0;
var insertTime5000 = 0;
var avgInsert5ktime = 0;
const obs = new PerformanceObserver((items) => {
let entry = items.getEntries()[0];
if (entry.name == 'Data Prep') {
dataPrepTime += entry.duration;
}
else if (entry.name == 'Insert'){
insertTime += entry.duration
}
else {
console.log(entry.name + ': ' + (entry.duration/1000).toFixed(8) + 's');
}
performance.clearMarks();
});
obs.observe({ entryTypes: ['measure'] });
function R(l,r) {
return Math.random() * (r - l) - r;
}
function randomBool() {
if (Math.random() < 0.5) {
return true;
}
return false;
}
function insertN(n) {
for (let i = 0; i < n; i++) {
performance.mark('A3');
let insertData = ["now + " + i + "m", // Timestamp
parseInt( R(-Math.pow(2,31) + 1 , Math.pow(2,31) - 1) ), // Int
parseInt( R(-Math.pow(2,31) + 1 , Math.pow(2,31) - 1) ), // BigInt
parseFloat( R(-3.4E38, 3.4E38) ), // Float
parseFloat( R(-1.7E308, 1.7E308) ), // Double
"\"Long Binary\"", // Binary
parseInt( R(-32767, 32767) ), // Small Int
parseInt( R(-127, 127) ), // Tiny Int
randomBool(),
"\"Nchars 一些中文字幕\""]; // Bool
let query = 'insert into td_connector_test.all_types values(' + insertData.join(',') + ' );';
performance.mark('B3');
performance.measure('Data Prep', 'A3', 'B3');
performance.mark('A2');
c1.execute(query, {quiet:true});
performance.mark('B2');
performance.measure('Insert', 'A2', 'B2');
if ( i % 5000 == 4999) {
console.log("Insert # " + (i+1));
console.log('Insert 5k records: ' + ((insertTime - insertTime5000)/1000).toFixed(8) + 's');
insertTime5000 = insertTime;
avgInsert5ktime = (avgInsert5ktime/1000 * Math.floor(i / 5000) + insertTime5000/1000) / Math.ceil( i / 5000);
console.log('DataPrepTime So Far: ' + (dataPrepTime/1000).toFixed(8) + 's | Inserting time So Far: ' + (insertTime/1000).toFixed(8) + 's | Avg. Insert 5k time: ' + avgInsert5ktime.toFixed(8));
}
}
}
performance.mark('insert 1E5')
insertN(1E5);
performance.mark('insert 1E5 2')
performance.measure('Insert With Logs', 'insert 1E5', 'insert 1E5 2');
console.log('DataPrepTime: ' + (dataPrepTime/1000).toFixed(8) + 's | Inserting time: ' + (insertTime/1000).toFixed(8) + 's');
dataPrepTime = 0; insertTime = 0;
//'insert into td_connector_test.all_types values (now, null,null,null,null,null,null,null,null,null);'
const taos = require('../tdengine');
var conn = taos.connect({host:"127.0.0.1", user:"root", password:"taosdata", config:"/etc/taos",port:0});
var conn = taos.connect({host:"127.0.0.1", user:"root", password:"taosdata", config:"/etc/taos",port:10});
var c1 = conn.cursor();
let stime = new Date();
let interval = 1000;
......@@ -23,14 +23,13 @@ function randomBool() {
c1.execute('create database if not exists td_connector_test;');
c1.execute('use td_connector_test;')
c1.execute('create table if not exists all_types (ts timestamp, _int int, _bigint bigint, _float float, _double double, _binary binary(40), _smallint smallint, _tinyint tinyint, _bool bool, _nchar nchar(40));');
c1.execute('create table if not exists stabletest (ts timestamp, v1 int, v2 int, v3 int, v4 double) tags (id int, location binary(20));')
// Shell Test : The following uses the cursor to imitate the taos shell
// Insert
for (let i = 0; i < 5000; i++) {
stime.setMilliseconds(stime.getMilliseconds() + interval);
let insertData = [convertDateToTS(stime), // Timestamp
for (let i = 0; i < 10000; i++) {
let insertData = ["now+" + i + "s", // Timestamp
parseInt( R(-Math.pow(2,31) + 1 , Math.pow(2,31) - 1) ), // Int
parseInt( R(-Math.pow(2,31) + 1 , Math.pow(2,31) - 1) ), // BigInt
parseFloat( R(-3.4E38, 3.4E38) ), // Float
......@@ -58,23 +57,80 @@ var d = c1.fetchall();
console.log(c1.fields);
console.log(d);
//Immediate Execution like the Shell
// Immediate Execution like the Shell
c1.query('select count(*), stddev(_double), min(_tinyint) from all_types where _tinyint > 50 and _int < 0', true).then(function(result){
c1.query('select count(*), stddev(_double), min(_tinyint) from all_types where _tinyint > 50 and _int < 0;', true).then(function(result){
result.pretty();
})
c1.query('select _tinyint, _bool from all_types where _tinyint > 50 and _int < 0 limit 50;', true).then(function(result){
result.pretty();
})
c1.query('select stddev(_double), stddev(_bigint), stddev(_float) from all_types', true).then(function(result){
c1.query('select stddev(_double), stddev(_bigint), stddev(_float) from all_types;', true).then(function(result){
result.pretty();
})
c1.query('select stddev(_double), stddev(_bigint), stddev(_float) from all_types interval(1m) limit 100;', true).then(function(result){
result.pretty();
})
var q = c1.query('select * from td_connector_test.all_types where ts >= ? and _int > ? limit 100 offset 40;').bind(new Date(1231), 100).execute().then(function(r) {
// Binding arguments, and then using promise
var q = c1.query('select * from td_connector_test.all_types where ts >= ? and _int > ? limit 100 offset 40;').bind(new Date(1231), 100)
console.log(q.query);
q.execute().then(function(r) {
r.pretty();
});
console.log(q._query);
c1.execute('drop database td_connector_test;')
// Raw Async Testing (Callbacks, not promises)
function cb2(param, result, rowCount, rd) {
console.log("RES *", result);
console.log("Async fetched", rowCount, "rows");
console.log("Passed Param: ", param);
console.log("Fields", rd.fields);
console.log("Data", rd.data);
}
function cb1(param,result,code) {
console.log('Callbacked!');
console.log("RES *", result);
console.log("Status: ", code);
console.log("Passed Param", param);
c1.fetchall_a(result, cb2, param)
}
c1.execute_a("describe td_connector_test.all_types;", cb1, {myparam:3.141});
function cb4(param, result, rowCount, rd) {
console.log("RES *", result);
console.log("Async fetched", rowCount, "rows");
console.log("Passed Param: ", param);
console.log("Fields", rd.fields);
console.log("Data", rd.data);
}
// Without directly calling fetchall_a
var thisRes;
function cb3(param,result,code) {
console.log('Callbacked!');
console.log("RES *", result);
console.log("Status: ", code);
console.log("Passed Param", param);
thisRes = result;
}
//Test calling execute and fetchall seperately and not through callbacks
var param = c1.execute_a("describe td_connector_test.all_types;", cb3, {e:2.718});
console.log("Passed Param outside of callback: ", param);
setTimeout(function(){
c1.fetchall_a(thisRes, cb4, param);
},100);
// Async through promises
var aq = c1.query('select count(*) from td_connector_test.all_types;')
aq.execute_a().then(function(data) {
data.pretty();
})
c1.query('describe td_connector_test.stabletest;').execute_a().then(r=> r.pretty());
setTimeout(function(){
c1.query('drop database td_connector_test;');
},2000);
conn.close();
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册