未验证 提交 445168c5 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #1919 from StoneT2000/nodejs-bugfix

Nodejs connector update
...@@ -241,17 +241,12 @@ function CTaosInterface (config = null, pass = false) { ...@@ -241,17 +241,12 @@ function CTaosInterface (config = null, pass = false) {
'taos_fetch_rows_a': [ ref.types.void, [ ref.types.void_ptr, ref.types.void_ptr, ref.types.void_ptr ]], 'taos_fetch_rows_a': [ ref.types.void, [ ref.types.void_ptr, ref.types.void_ptr, ref.types.void_ptr ]],
// Subscription // Subscription
//TAOS_SUB *taos_subscribe(char *host, char *user, char *pass, char *db, char *table, long time, int mseconds) //TAOS_SUB *taos_subscribe(TAOS* taos, int restart, const char* topic, const char *sql, TAOS_SUBSCRIBE_CALLBACK fp, void *param, int interval)
////TAOS_SUB *taos_subscribe(char *host, char *user, char *pass, char *db, char *table, int64_t time, int mseconds); 'taos_subscribe': [ ref.types.void_ptr, [ ref.types.void_ptr, ref.types.int, ref.types.char_ptr, ref.types.char_ptr, ref.types.void_ptr, ref.types.void_ptr, ref.types.int] ],
'taos_subscribe': [ ref.types.void_ptr, [ ref.types.char_ptr, ref.types.char_ptr, ref.types.char_ptr, ref.types.char_ptr, ref.types.char_ptr, ref.types.int64, ref.types.int] ], // TAOS_RES *taos_consume(TAOS_SUB *tsub)
//TAOS_ROW taos_consume(TAOS_SUB *tsub); 'taos_consume': [ ref.types.void_ptr, [ref.types.void_ptr] ],
'taos_consume': [ ref.refType(ref.types.void_ptr2), [ref.types.void_ptr] ],
//void taos_unsubscribe(TAOS_SUB *tsub); //void taos_unsubscribe(TAOS_SUB *tsub);
'taos_unsubscribe': [ ref.types.void, [ ref.types.void_ptr ] ], 'taos_unsubscribe': [ ref.types.void, [ ref.types.void_ptr ] ],
//int taos_subfields_count(TAOS_SUB *tsub);
'taos_subfields_count': [ ref.types.int, [ref.types.void_ptr ] ],
//TAOS_FIELD *taos_fetch_subfields(TAOS_SUB *tsub);
'taos_fetch_subfields': [ ref.refType(TaosField), [ ref.types.void_ptr ] ],
// Continuous Query // Continuous Query
//TAOS_STREAM *taos_open_stream(TAOS *taos, char *sqlstr, void (*fp)(void *param, TAOS_RES *, TAOS_ROW row), //TAOS_STREAM *taos_open_stream(TAOS *taos, char *sqlstr, void (*fp)(void *param, TAOS_RES *, TAOS_ROW row),
...@@ -362,7 +357,7 @@ CTaosInterface.prototype.fetchBlock = function fetchBlock(result, fields) { ...@@ -362,7 +357,7 @@ CTaosInterface.prototype.fetchBlock = function fetchBlock(result, fields) {
blocks.fill(null); blocks.fill(null);
num_of_rows = Math.abs(num_of_rows); num_of_rows = Math.abs(num_of_rows);
let offset = 0; let offset = 0;
pblock = pblock.deref() pblock = pblock.deref();
for (let i = 0; i < fields.length; i++) { for (let i = 0; i < fields.length; i++) {
if (!convertFunctions[fields[i]['type']] ) { if (!convertFunctions[fields[i]['type']] ) {
...@@ -472,64 +467,40 @@ CTaosInterface.prototype.getClientInfo = function getClientInfo() { ...@@ -472,64 +467,40 @@ CTaosInterface.prototype.getClientInfo = function getClientInfo() {
} }
// Subscription // Subscription
CTaosInterface.prototype.subscribe = function subscribe(host=null, user="root", password="taosdata", db=null, table=null, time=null, mseconds=null) { CTaosInterface.prototype.subscribe = function subscribe(connection, restart, topic, sql, interval) {
let dbOrig = db; let topicOrig = topic;
let tableOrig = table; let sqlOrig = sql;
try {
host = host != null ? ref.allocCString(host) : ref.alloc(ref.types.char_ptr, ref.NULL);
}
catch(err) {
throw "Attribute Error: host is expected as a str";
}
try {
user = ref.allocCString(user)
}
catch(err) {
throw "Attribute Error: user is expected as a str";
}
try { try {
password = ref.allocCString(password); sql = sql != null ? ref.allocCString(sql) : ref.alloc(ref.types.char_ptr, ref.NULL);
} }
catch(err) { catch(err) {
throw "Attribute Error: password is expected as a str"; throw "Attribute Error: sql is expected as a str";
} }
try { try {
db = db != null ? ref.allocCString(db) : ref.alloc(ref.types.char_ptr, ref.NULL); topic = topic != null ? ref.allocCString(topic) : ref.alloc(ref.types.char_ptr, ref.NULL);
} }
catch(err) { catch(err) {
throw "Attribute Error: db is expected as a str"; throw TypeError("topic is expected as a str");
}
try {
table = table != null ? ref.allocCString(table) : ref.alloc(ref.types.char_ptr, ref.NULL);
}
catch(err) {
throw TypeError("table is expected as a str");
}
try {
mseconds = ref.alloc(ref.types.int, mseconds);
} }
catch(err) {
throw TypeError("mseconds is expected as an int"); restart = ref.alloc(ref.types.int, restart);
}
//TAOS_SUB *taos_subscribe(char *host, char *user, char *pass, char *db, char *table, int64_t time, int mseconds); let subscription = this.libtaos.taos_subscribe(connection, restart, topic, sql, null, null, interval);
let subscription = this.libtaos.taos_subscribe(host, user, password, db, table, time, mseconds);
if (ref.isNull(subscription)) { if (ref.isNull(subscription)) {
throw new errors.TDError('Failed to subscribe to TDengine | Database: ' + dbOrig + ', Table: ' + tableOrig); throw new errors.TDError('Failed to subscribe to TDengine | Database: ' + dbOrig + ', Table: ' + tableOrig);
} }
else { else {
console.log('Successfully subscribed to TDengine | Database: ' + dbOrig + ', Table: ' + tableOrig); console.log('Successfully subscribed to TDengine - Topic: ' + topicOrig);
} }
return subscription; return subscription;
} }
CTaosInterface.prototype.subFieldsCount = function subFieldsCount(subscription) {
return this.libtaos.taos_subfields_count(subscription); CTaosInterface.prototype.consume = function consume(subscription) {
} let result = this.libtaos.taos_consume(subscription);
CTaosInterface.prototype.fetchSubFields = function fetchSubFields(subscription) {
let pfields = this.libtaos.taos_fetch_subfields(subscription);
let pfieldscount = this.subFieldsCount(subscription);
let fields = []; let fields = [];
let pfields = this.fetchFields(result);
if (ref.isNull(pfields) == false) { if (ref.isNull(pfields) == false) {
pfields = ref.reinterpret(pfields, 68 * pfieldscount , 0); pfields = ref.reinterpret(pfields, this.numFields(result) * 68, 0);
for (let i = 0; i < pfields.length; i += 68) { for (let i = 0; i < pfields.length; i += 68) {
//0 - 63 = name //64 - 65 = bytes, 66 - 67 = type //0 - 63 = name //64 - 65 = bytes, 66 - 67 = type
fields.push( { fields.push( {
...@@ -539,27 +510,23 @@ CTaosInterface.prototype.fetchSubFields = function fetchSubFields(subscription) ...@@ -539,27 +510,23 @@ CTaosInterface.prototype.fetchSubFields = function fetchSubFields(subscription)
}) })
} }
} }
return fields;
} let data = [];
CTaosInterface.prototype.consume = function consume(subscription) { while(true) {
let row = this.libtaos.taos_consume(subscription); let { blocks, num_of_rows } = this.fetchBlock(result, fields);
let fields = this.fetchSubFields(subscription); if (num_of_rows == 0) {
//let isMicro = (cti.libtaos.taos_result_precision(result) == FieldTypes.C_TIMESTAMP_MICRO); break;
let isMicro = false; //no supported function for determining precision? }
let blocks = new Array(fields.length); for (let i = 0; i < num_of_rows; i++) {
blocks.fill(null); data.push([]);
let numOfRows2 = 1; //Math.abs(numOfRows2); let rowBlock = new Array(fields.length);
let offset = 0; for (let j = 0; j < fields.length; j++) {
if (numOfRows2 > 0){ rowBlock[j] = blocks[j][i];
for (let i = 0; i < fields.length; i++) {
if (!convertFunctions[fields[i]['type']] ) {
throw new errors.DatabaseError("Invalid data type returned from database");
} }
blocks[i] = convertFunctions[fields[i]['type']](row, numOfRows2, fields[i]['bytes'], offset, isMicro); data[data.length-1] = (rowBlock);
offset += fields[i]['bytes'] * numOfRows2;
} }
} }
return {blocks:blocks, fields:fields}; return { data: data, fields: fields, result: result };
} }
CTaosInterface.prototype.unsubscribe = function unsubscribe(subscription) { CTaosInterface.prototype.unsubscribe = function unsubscribe(subscription) {
//void taos_unsubscribe(TAOS_SUB *tsub); //void taos_unsubscribe(TAOS_SUB *tsub);
......
...@@ -405,18 +405,16 @@ TDengineCursor.prototype.getClientInfo = function getClientInfo() { ...@@ -405,18 +405,16 @@ TDengineCursor.prototype.getClientInfo = function getClientInfo() {
/** /**
* Subscribe to a table from a database in TDengine. * Subscribe to a table from a database in TDengine.
* @param {Object} config - A configuration object containing the configuration options for the subscription * @param {Object} config - A configuration object containing the configuration options for the subscription
* @param {string} config.host - The host to subscribe to * @param {string} config.restart - whether or not to continue a subscription if it already exits, otherwise start from beginning
* @param {string} config.user - The user to subscribe as * @param {string} config.topic - The unique identifier of a subscription
* @param {string} config.password - The password for the said user * @param {string} config.sql - A sql statement for data query
* @param {string} config.db - The db containing the table to subscribe to * @param {string} config.interval - The pulling interval
* @param {string} config.table - The name of the table to subscribe to
* @param {number} config.time - The start time to start a subscription session
* @param {number} config.mseconds - The pulling period of the subscription session
* @return {Buffer} A buffer pointing to the subscription session handle * @return {Buffer} A buffer pointing to the subscription session handle
* @since 1.3.0 * @since 1.3.0
*/ */
TDengineCursor.prototype.subscribe = function subscribe(config) { TDengineCursor.prototype.subscribe = function subscribe(config) {
return this._chandle.subscribe(config.host, config.user, config.password, config.db, config.table, config.time, config.mseconds); let restart = config.restart ? 1 : 0;
return this._chandle.subscribe(this._connection._conn, restart, config.topic, config.sql, config.interval);
}; };
/** /**
* An infinite loop that consumes the latest data and calls a callback function that is provided. * An infinite loop that consumes the latest data and calls a callback function that is provided.
...@@ -426,18 +424,8 @@ TDengineCursor.prototype.subscribe = function subscribe(config) { ...@@ -426,18 +424,8 @@ TDengineCursor.prototype.subscribe = function subscribe(config) {
*/ */
TDengineCursor.prototype.consumeData = async function consumeData(subscription, callback) { TDengineCursor.prototype.consumeData = async function consumeData(subscription, callback) {
while (true) { while (true) {
let res = this._chandle.consume(subscription); let { data, fields, result} = this._chandle.consume(subscription);
let data = []; callback(data, fields, result);
let num_of_rows = res.blocks[0].length;
for (let j = 0; j < num_of_rows; j++) {
data.push([]);
let rowBlock = new Array(res.fields.length);
for (let k = 0; k < res.fields.length; k++) {
rowBlock[k] = res.blocks[k][j];
}
data[data.length-1] = rowBlock;
}
callback(data, res.fields, subscription);
} }
} }
/** /**
......
{ {
"name": "td-connector", "name": "td-connector",
"version": "1.5.0", "version": "1.6.1",
"lockfileVersion": 1, "lockfileVersion": 1,
"requires": true, "requires": true,
"dependencies": { "dependencies": {
......
{ {
"name": "td-connector", "name": "td-connector",
"version": "1.5.0", "version": "1.6.1",
"description": "A Node.js connector for TDengine.", "description": "A Node.js connector for TDengine.",
"main": "tdengine.js", "main": "tdengine.js",
"scripts": { "scripts": {
......
...@@ -33,12 +33,12 @@ for (let i = 0; i < 10000; i++) { ...@@ -33,12 +33,12 @@ for (let i = 0; i < 10000; i++) {
parseInt( R(-Math.pow(2,31) + 1 , Math.pow(2,31) - 1) ), // Int parseInt( R(-Math.pow(2,31) + 1 , Math.pow(2,31) - 1) ), // Int
parseInt( R(-Math.pow(2,31) + 1 , Math.pow(2,31) - 1) ), // BigInt parseInt( R(-Math.pow(2,31) + 1 , Math.pow(2,31) - 1) ), // BigInt
parseFloat( R(-3.4E38, 3.4E38) ), // Float parseFloat( R(-3.4E38, 3.4E38) ), // Float
parseFloat( R(-1.7E308, 1.7E308) ), // Double parseFloat( R(-1.7E30, 1.7E30) ), // Double
"\"Long Binary\"", // Binary "\"Long Binary\"", // Binary
parseInt( R(-32767, 32767) ), // Small Int parseInt( R(-32767, 32767) ), // Small Int
parseInt( R(-127, 127) ), // Tiny Int parseInt( R(-127, 127) ), // Tiny Int
randomBool(), randomBool(),
"\"Nchars 一些中文字幕\""]; // Bool "\"Nchars\""]; // Bool
c1.execute('insert into td_connector_test.all_types values(' + insertData.join(',') + ' );', {quiet:true}); c1.execute('insert into td_connector_test.all_types values(' + insertData.join(',') + ' );', {quiet:true});
if (i % 1000 == 0) { if (i % 1000 == 0) {
console.log("Insert # " , i); console.log("Insert # " , i);
......
const taos = require('../tdengine');
var conn = taos.connect({host:"127.0.0.1", user:"root", password:"taosdata", config:"/etc/taos",port:10});
var c1 = conn.cursor();
let stime = new Date();
let interval = 1000;
c1.execute('use td_connector_test');
let sub = c1.subscribe({
restart: true,
sql: "select AVG(_int) from td_connector_test.all_Types;",
topic: 'all_Types',
interval: 1000
});
c1.consumeData(sub, (data, fields) => {
console.log(data);
});
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册