未验证 提交 7b3f5ce0 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #2819 from taosdata/feature/pyconn

Feature/pyconn: update nodes connector to support 2.0
......@@ -917,15 +917,16 @@ TDengine 同时也提供了node.js 的连接器。用户可以通过[npm](https:
首先,通过[npm](https://www.npmjs.com/)安装node.js 连接器.
```cmd
npm install td-connector
npm install td2.0-connector
```
我们建议用户使用npm 安装node.js连接器。如果您没有安装npm, 可以将*src/connector/nodejs/*拷贝到您的nodejs 项目目录下
我们使用[node-gyp](https://github.com/nodejs/node-gyp)和TDengine服务端进行交互。安装node.js 连接器之前,还需安装以下软件:
### Unix
### Linux
- `python` (建议`v2.7` , `v3.x.x` 目前还不支持)
- `node` 必须采用v8.x版本,之后的版本存在兼容性的问题。
- `make`
- c语言编译器比如[GCC](https://gcc.gnu.org)
......@@ -980,10 +981,10 @@ npm install td-connector
#### 连接
使用node.js连接器时,必须先<em>require</em> ```td-connector```,然后使用 ```taos.connect``` 函数。```taos.connect``` 函数必须提供的参数是```host```,其它参数在没有提供的情况下会使用如下的默认值。最后需要初始化```cursor``` 来和TDengine服务端通信
使用node.js连接器时,必须先<em>require</em> ```td2.0-connector```,然后使用 ```taos.connect``` 函数。```taos.connect``` 函数必须提供的参数是```host```,其它参数在没有提供的情况下会使用如下的默认值。最后需要初始化```cursor``` 来和TDengine服务端通信
```javascript
const taos = require('td-connector');
const taos = require('td2.0-connector');
var conn = taos.connect({host:"127.0.0.1", user:"root", password:"taosdata", config:"/etc/taos",port:0})
var cursor = conn.cursor(); // Initializing a new cursor
```
......
......@@ -10,6 +10,7 @@ const Struct = require('ref-struct');
const FieldTypes = require('./constants');
const errors = require ('./error');
const TaosObjects = require('./taosobjects');
const { NULL_POINTER } = require('ref');
module.exports = CTaosInterface;
......@@ -177,9 +178,10 @@ var char_arr = ArrayType(ref.types.char);
var TaosField = Struct({
'name': char_arr,
});
TaosField.fields.name.type.size = 64;
TaosField.defineProperty('bytes', ref.types.short);
TaosField.fields.name.type.size = 65;
TaosField.defineProperty('type', ref.types.char);
TaosField.defineProperty('bytes', ref.types.short);
/**
*
......@@ -202,10 +204,10 @@ function CTaosInterface (config = null, pass = false) {
'taos_connect': [ ref.types.void_ptr, [ ref.types.char_ptr, ref.types.char_ptr, ref.types.char_ptr, ref.types.char_ptr, ref.types.int ] ],
//void taos_close(TAOS *taos)
'taos_close': [ ref.types.void, [ ref.types.void_ptr ] ],
//TAOS_RES *taos_use_result(TAOS *taos);
'taos_use_result': [ ref.types.void_ptr, [ ref.types.void_ptr ] ],
//int *taos_fetch_lengths(TAOS_RES *taos);
'taos_fetch_lengths': [ ref.types.void_ptr, [ ref.types.void_ptr ] ],
//int taos_query(TAOS *taos, char *sqlstr)
'taos_query': [ ref.types.int, [ ref.types.void_ptr, ref.types.char_ptr ] ],
'taos_query': [ ref.types.void_ptr, [ ref.types.void_ptr, ref.types.char_ptr ] ],
//int taos_affected_rows(TAOS *taos)
'taos_affected_rows': [ ref.types.int, [ ref.types.void_ptr] ],
//int taos_fetch_block(TAOS_RES *res, TAOS_ROW *rows)
......@@ -329,22 +331,22 @@ CTaosInterface.prototype.query = function query(connection, sql) {
CTaosInterface.prototype.affectedRows = function affectedRows(connection) {
return this.libtaos.taos_affected_rows(connection);
}
CTaosInterface.prototype.useResult = function useResult(connection) {
let result = this.libtaos.taos_use_result(connection);
CTaosInterface.prototype.useResult = function useResult(result) {
let fields = [];
let pfields = this.fetchFields(result);
if (ref.isNull(pfields) == false) {
pfields = ref.reinterpret(pfields, this.fieldsCount(connection) * 68, 0);
pfields = ref.reinterpret(pfields, this.fieldsCount(result) * 68, 0);
for (let i = 0; i < pfields.length; i += 68) {
//0 - 63 = name //64 - 65 = bytes, 66 - 67 = type
fields.push( {
name: ref.readCString(ref.reinterpret(pfields,64,i)),
bytes: pfields[i + 64],
type: pfields[i + 66]
name: ref.readCString(ref.reinterpret(pfields,65,i)),
type: pfields[i + 65],
bytes: pfields[i + 66]
})
}
}
return {result:result, fields:fields}
return fields;
}
CTaosInterface.prototype.fetchBlock = function fetchBlock(result, fields) {
let pblock = ref.ref(ref.ref(ref.NULL)); // equal to our raw data
......@@ -352,19 +354,33 @@ CTaosInterface.prototype.fetchBlock = function fetchBlock(result, fields) {
if (num_of_rows == 0) {
return {block:null, num_of_rows:0};
}
let isMicro = (this.libtaos.taos_result_precision(result) == FieldTypes.C_TIMESTAMP_MICRO)
var fieldL = this.libtaos.taos_fetch_lengths(result);
let isMicro = (this.libtaos.taos_result_precision(result) == FieldTypes.C_TIMESTAMP_MICRO);
var fieldlens = [];
if (ref.isNull(fieldL) == false) {
for (let i = 0; i < fields.length; i ++) {
let plen = ref.reinterpret(fieldL, 4, i*4);
let len = plen.readInt32LE(0);
fieldlens.push(len);
}
}
let blocks = new Array(fields.length);
blocks.fill(null);
num_of_rows = Math.abs(num_of_rows);
let offset = 0;
pblock = pblock.deref();
for (let i = 0; i < fields.length; i++) {
pdata = ref.reinterpret(pblock,8,i*8);
pdata = ref.ref(pdata.readPointer());
if (!convertFunctions[fields[i]['type']] ) {
throw new errors.DatabaseError("Invalid data type returned from database");
}
blocks[i] = convertFunctions[fields[i]['type']](pblock, num_of_rows, fields[i]['bytes'], offset, isMicro);
offset += fields[i]['bytes'] * num_of_rows;
blocks[i] = convertFunctions[fields[i]['type']](pdata, 1, fieldlens[i], offset, isMicro);
}
return {blocks: blocks, num_of_rows:Math.abs(num_of_rows)}
}
......@@ -381,17 +397,17 @@ CTaosInterface.prototype.numFields = function numFields(result) {
return this.libtaos.taos_num_fields(result);
}
// Fetch fields count by connection, the latest query
CTaosInterface.prototype.fieldsCount = function fieldsCount(connection) {
return this.libtaos.taos_field_count(connection);
CTaosInterface.prototype.fieldsCount = function fieldsCount(result) {
return this.libtaos.taos_field_count(result);
}
CTaosInterface.prototype.fetchFields = function fetchFields(result) {
return this.libtaos.taos_fetch_fields(result);
}
CTaosInterface.prototype.errno = function errno(connection) {
return this.libtaos.taos_errno(connection);
CTaosInterface.prototype.errno = function errno(result) {
return this.libtaos.taos_errno(result);
}
CTaosInterface.prototype.errStr = function errStr(connection) {
return ref.readCString(this.libtaos.taos_errstr(connection));
CTaosInterface.prototype.errStr = function errStr(result) {
return ref.readCString(this.libtaos.taos_errstr(result));
}
// Async
CTaosInterface.prototype.query_a = function query_a(connection, sql, callback, param = ref.ref(ref.NULL)) {
......@@ -412,18 +428,32 @@ CTaosInterface.prototype.fetch_rows_a = function fetch_rows_a(result, callback,
// Data preparation to pass to cursor. Could be bottleneck in query execution callback times.
let row = cti.libtaos.taos_fetch_row(result2);
let fields = cti.fetchFields_a(result2);
let isMicro = (cti.libtaos.taos_result_precision(result2) == FieldTypes.C_TIMESTAMP_MICRO);
let blocks = new Array(fields.length);
blocks.fill(null);
numOfRows2 = Math.abs(numOfRows2);
let offset = 0;
var fieldL = cti.libtaos.taos_fetch_lengths(result);
var fieldlens = [];
if (ref.isNull(fieldL) == false) {
for (let i = 0; i < fields.length; i ++) {
let plen = ref.reinterpret(fieldL, 8, i*8);
let len = ref.get(plen,0,ref.types.int32);
fieldlens.push(len);
}
}
if (numOfRows2 > 0){
for (let i = 0; i < fields.length; i++) {
if (!convertFunctions[fields[i]['type']] ) {
throw new errors.DatabaseError("Invalid data type returned from database");
}
blocks[i] = convertFunctions[fields[i]['type']](row, numOfRows2, fields[i]['bytes'], offset, isMicro);
offset += fields[i]['bytes'] * numOfRows2;
let prow = ref.reinterpret(row,8,i*8);
prow = prow.readPointer();
prow = ref.ref(prow);
blocks[i] = convertFunctions[fields[i]['type']](prow, 1, fieldlens[i], offset, isMicro);
//offset += fields[i]['bytes'] * numOfRows2;
}
}
callback(param2, result2, numOfRows2, blocks);
......@@ -440,11 +470,11 @@ CTaosInterface.prototype.fetchFields_a = function fetchFields_a (result) {
if (ref.isNull(pfields) == false) {
pfields = ref.reinterpret(pfields, 68 * pfieldscount , 0);
for (let i = 0; i < pfields.length; i += 68) {
//0 - 63 = name //64 - 65 = bytes, 66 - 67 = type
//0 - 64 = name //65 = type, 66 - 67 = bytes
fields.push( {
name: ref.readCString(ref.reinterpret(pfields,64,i)),
bytes: pfields[i + 64],
type: pfields[i + 66]
name: ref.readCString(ref.reinterpret(pfields,65,i)),
type: pfields[i + 65],
bytes: pfields[i + 66]
})
}
}
......
......@@ -74,9 +74,11 @@ TDengineConnection.prototype.rollback = function rollback() {
* Clear the results from connector
* @private
*/
TDengineConnection.prototype._clearResultSet = function _clearResultSet() {
/*
TDengineConnection.prototype._clearResultSet = function _clearResultSet() {
var result = this._chandle.useResult(this._conn).result;
if (result) {
this._chandle.freeResult(result)
}
}
*/
......@@ -98,7 +98,7 @@ TDengineCursor.prototype.execute = function execute(operation, options, callback
if (this._connection == null) {
throw new errors.ProgrammingError('Cursor is not connected');
}
this._connection._clearResultSet();
this._reset_result();
let stmt = operation;
......@@ -111,18 +111,18 @@ TDengineCursor.prototype.execute = function execute(operation, options, callback
});
obs.observe({ entryTypes: ['measure'] });
performance.mark('A');
res = this._chandle.query(this._connection._conn, stmt);
this._result = this._chandle.query(this._connection._conn, stmt);
performance.mark('B');
performance.measure('query', 'A', 'B');
}
else {
res = this._chandle.query(this._connection._conn, stmt);
this._result = this._chandle.query(this._connection._conn, stmt);
}
res = this._chandle.errno(this._result);
if (res == 0) {
let fieldCount = this._chandle.fieldsCount(this._connection._conn);
let fieldCount = this._chandle.fieldsCount(this._result);
if (fieldCount == 0) {
let affectedRowCount = this._chandle.affectedRows(this._connection._conn);
let affectedRowCount = this._chandle.affectedRows(this._result);
let response = this._createAffectedResponse(affectedRowCount, time)
if (options['quiet'] != true) {
console.log(response);
......@@ -131,16 +131,15 @@ TDengineCursor.prototype.execute = function execute(operation, options, callback
return affectedRowCount; //return num of affected rows, common with insert, use statements
}
else {
let resAndField = this._chandle.useResult(this._connection._conn, fieldCount)
this._result = resAndField.result;
this._fields = resAndField.fields;
this.fields = resAndField.fields;
this._fields = this._chandle.useResult(this._result);
this.fields = this._fields;
wrapCB(callback);
return this._result; //return a pointer to the result
}
}
else {
throw new errors.ProgrammingError(this._chandle.errStr(this._connection._conn))
throw new errors.ProgrammingError(this._chandle.errStr(this._result))
}
}
......@@ -198,18 +197,18 @@ TDengineCursor.prototype.fetchall = function fetchall(options, callback) {
while(true) {
let blockAndRows = this._chandle.fetchBlock(this._result, this._fields);
let block = blockAndRows.blocks;
let num_of_rows = blockAndRows.num_of_rows;
if (num_of_rows == 0) {
break;
}
this._rowcount += num_of_rows;
let numoffields = this._fields.length;
for (let i = 0; i < num_of_rows; i++) {
data.push([]);
let rowBlock = new Array(this._fields.length);
for (let j = 0; j < this._fields.length; j++) {
let rowBlock = new Array(numoffields);
for (let j = 0; j < numoffields; j++) {
rowBlock[j] = block[j][i];
}
data[data.length-1] = (rowBlock);
......@@ -221,7 +220,7 @@ TDengineCursor.prototype.fetchall = function fetchall(options, callback) {
let response = this._createSetResponse(this._rowcount, time)
console.log(response);
this._connection._clearResultSet();
// this._connection._clearResultSet();
let fields = this.fields;
this._reset_result();
this.data = data;
......@@ -266,13 +265,15 @@ TDengineCursor.prototype.execute_a = function execute_a (operation, options, cal
}
if (resCode >= 0) {
let fieldCount = cr._chandle.numFields(res2);
if (fieldCount == 0) {
cr._chandle.freeResult(res2);
}
else {
return res2;
}
// let fieldCount = cr._chandle.numFields(res2);
// if (fieldCount == 0) {
// //cr._chandle.freeResult(res2);
// return res2;
// }
// else {
// return res2;
// }
return res2;
}
else {
......@@ -381,6 +382,9 @@ TDengineCursor.prototype.stopQuery = function stopQuery(result) {
}
TDengineCursor.prototype._reset_result = function _reset_result() {
this._rowcount = -1;
if (this._result != null) {
this._chandle.freeResult(this._result);
}
this._result = null;
this._fields = null;
this.data = [];
......
{
"name": "td-connector",
"version": "1.6.1",
"name": "td2.0-connector",
"version": "2.0.1",
"lockfileVersion": 1,
"requires": true,
"dependencies": {
......
{
"name": "td-connector",
"version": "2.0.0",
"name": "td2.0-connector",
"version": "2.0.1",
"description": "A Node.js connector for TDengine.",
"main": "tdengine.js",
"scripts": {
......
# TDengine Node.js connector
[![minzip](https://img.shields.io/bundlephobia/minzip/td-connector.svg)](https://github.com/taosdata/TDengine/tree/master/src/connector/nodejs) [![NPM](https://img.shields.io/npm/l/td-connector.svg)](https://github.com/taosdata/TDengine/#what-is-tdengine)
[![minzip](https://img.shields.io/bundlephobia/minzip/td2.0-connector.svg)](https://github.com/taosdata/TDengine/tree/master/src/connector/nodejs) [![NPM](https://img.shields.io/npm/l/td2.0-connector.svg)](https://github.com/taosdata/TDengine/#what-is-tdengine)
This is the Node.js library that lets you connect to [TDengine](https://www.github.com/taosdata/tdengine). It is built so that you can use as much of it as you want or as little of it as you want through providing an extensive API. If you want the raw data in the form of an array of arrays for the row data retrieved from a table, you can do that. If you want to wrap that data with objects that allow you easily manipulate and display data such as using a prettifier function, you can do that!
This is the Node.js library that lets you connect to [TDengine](https://www.github.com/taosdata/tdengine) 2.0 version. It is built so that you can use as much of it as you want or as little of it as you want through providing an extensive API. If you want the raw data in the form of an array of arrays for the row data retrieved from a table, you can do that. If you want to wrap that data with objects that allow you easily manipulate and display data such as using a prettifier function, you can do that!
## Installation
To get started, just type in the following to install the connector through [npm](https://www.npmjs.com/)
```cmd
npm install td-connector
npm install td2.0-connector
```
To interact with TDengine, we make use of the [node-gyp](https://github.com/nodejs/node-gyp) library. To install, you will need to install the following depending on platform (the following instructions are quoted from node-gyp)
### On Unix
### On Linux
- `python` (`v2.7` recommended, `v3.x.x` is **not** supported)
- `make`
- A proper C/C++ compiler toolchain, like [GCC](https://gcc.gnu.org)
- `node` (between `v10.x` and `v11.x`, other version has some dependency compatibility problems)
### On macOS
......@@ -71,12 +72,12 @@ The following is a short summary of the basic usage of the connector, the full
### Connection
To use the connector, first require the library ```td-connector```. Running the function ```taos.connect``` with the connection options passed in as an object will return a TDengine connection object. The required connection option is ```host```, other options if not set, will be the default values as shown below.
To use the connector, first require the library ```td2.0-connector```. Running the function ```taos.connect``` with the connection options passed in as an object will return a TDengine connection object. The required connection option is ```host```, other options if not set, will be the default values as shown below.
A cursor also needs to be initialized in order to interact with TDengine from Node.js.
```javascript
const taos = require('td-connector');
const taos = require('td2.0-connector');
var conn = taos.connect({host:"127.0.0.1", user:"root", password:"taosdata", config:"/etc/taos",port:0})
var cursor = conn.cursor(); // Initializing a new cursor
```
......
......@@ -19,7 +19,7 @@ function randomBool() {
}
// Initialize
//c1.execute('drop database td_connector_test;');
c1.execute('create database if not exists td_connector_test;');
c1.execute('use td_connector_test;')
c1.execute('create table if not exists all_types (ts timestamp, _int int, _bigint bigint, _float float, _double double, _binary binary(40), _smallint smallint, _tinyint tinyint, _bool bool, _nchar nchar(40));');
......@@ -46,12 +46,14 @@ for (let i = 0; i < 10000; i++) {
}
// Select
c1.execute('select * from td_connector_test.all_types limit 10 offset 1000;');
console.log('select * from td_connector_test.all_types limit 3 offset 100;');
c1.execute('select * from td_connector_test.all_types limit 2 offset 100;');
var d = c1.fetchall();
console.log(c1.fields);
console.log(d);
// Functions
console.log('select count(*), avg(_int), sum(_float), max(_bigint), min(_double) from td_connector_test.all_types;')
c1.execute('select count(*), avg(_int), sum(_float), max(_bigint), min(_double) from td_connector_test.all_types;');
var d = c1.fetchall();
console.log(c1.fields);
......@@ -62,9 +64,11 @@ console.log(d);
c1.query('select count(*), stddev(_double), min(_tinyint) from all_types where _tinyint > 50 and _int < 0;', true).then(function(result){
result.pretty();
})
c1.query('select _tinyint, _bool from all_types where _tinyint > 50 and _int < 0 limit 50;', true).then(function(result){
result.pretty();
})
c1.query('select stddev(_double), stddev(_bigint), stddev(_float) from all_types;', true).then(function(result){
result.pretty();
})
......@@ -83,54 +87,58 @@ q.execute().then(function(r) {
// Raw Async Testing (Callbacks, not promises)
function cb2(param, result, rowCount, rd) {
console.log('CB2 Callbacked!');
console.log("RES *", result);
console.log("Async fetched", rowCount, "rows");
console.log("Async fetched", rowCount, " rows");
console.log("Passed Param: ", param);
console.log("Fields", rd.fields);
console.log("Data", rd.data);
console.log("Fields ", rd.fields);
console.log("Data ", rd.data);
}
function cb1(param,result,code) {
console.log('Callbacked!');
console.log("RES *", result);
console.log('CB1 Callbacked!');
console.log("RES * ", result);
console.log("Status: ", code);
console.log("Passed Param", param);
c1.fetchall_a(result, cb2, param)
console.log("Passed Param ", param);
c1.fetchall_a(result, cb2, param);
}
c1.execute_a("describe td_connector_test.all_types;", cb1, {myparam:3.141});
function cb4(param, result, rowCount, rd) {
console.log('CB4 Callbacked!');
console.log("RES *", result);
console.log("Async fetched", rowCount, "rows");
console.log("Passed Param: ", param);
console.log("Fields", rd.fields);
console.log("Data", rd.data);
}
// Without directly calling fetchall_a
var thisRes;
function cb3(param,result,code) {
console.log('Callbacked!');
console.log('CB3 Callbacked!');
console.log("RES *", result);
console.log("Status: ", code);
console.log("Status:", code);
console.log("Passed Param", param);
thisRes = result;
}
//Test calling execute and fetchall seperately and not through callbacks
var param = c1.execute_a("describe td_connector_test.all_types;", cb3, {e:2.718});
console.log("Passed Param outside of callback: ", param);
console.log(param);
setTimeout(function(){
c1.fetchall_a(thisRes, cb4, param);
},100);
// Async through promises
var aq = c1.query('select count(*) from td_connector_test.all_types;')
var aq = c1.query('select count(*) from td_connector_test.all_types;',false);
aq.execute_a().then(function(data) {
data.pretty();
})
});
c1.query('describe td_connector_test.stabletest;').execute_a().then(r=> r.pretty());
setTimeout(function(){
c1.query('drop database td_connector_test;');
},200);
setTimeout(function(){
conn.close();
},2000);
conn.close();
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册