提交 1741d9ab 编写于 作者: L liu0x54

[TD-1001] adapt nodejs connector to TD 2.0

上级 a0ca29dc
......@@ -10,6 +10,7 @@ const Struct = require('ref-struct');
const FieldTypes = require('./constants');
const errors = require ('./error');
const TaosObjects = require('./taosobjects');
const { NULL_POINTER } = require('ref');
module.exports = CTaosInterface;
......@@ -25,7 +26,7 @@ function convertTimestamp(data, num_of_rows, nbytes = 0, offset = 0, micro=false
if (micro == true) {
timestampConverter = convertMicrosecondsToDatetime;
}
data = ref.reinterpret(data.deref(), nbytes * num_of_rows, offset);
data = ref.reinterpret(data, nbytes * num_of_rows, offset);
let res = [];
let currOffset = 0;
while (currOffset < data.length) {
......@@ -43,7 +44,7 @@ function convertTimestamp(data, num_of_rows, nbytes = 0, offset = 0, micro=false
return res;
}
function convertBool(data, num_of_rows, nbytes = 0, offset = 0, micro=false) {
data = ref.reinterpret(data.deref(), nbytes * num_of_rows, offset);
data = ref.reinterpret(data, nbytes * num_of_rows, offset);
let res = new Array(data.length);
for (let i = 0; i < data.length; i++) {
if (data[i] == 0) {
......@@ -59,7 +60,7 @@ function convertBool(data, num_of_rows, nbytes = 0, offset = 0, micro=false) {
return res;
}
function convertTinyint(data, num_of_rows, nbytes = 0, offset = 0, micro=false) {
data = ref.reinterpret(data.deref(), nbytes * num_of_rows, offset);
data = ref.reinterpret(data, nbytes * num_of_rows, offset);
let res = [];
let currOffset = 0;
while (currOffset < data.length) {
......@@ -70,7 +71,7 @@ function convertTinyint(data, num_of_rows, nbytes = 0, offset = 0, micro=false)
return res;
}
function convertSmallint(data, num_of_rows, nbytes = 0, offset = 0, micro=false) {
data = ref.reinterpret(data.deref(), nbytes * num_of_rows, offset);
data = ref.reinterpret(data, nbytes * num_of_rows, offset);
let res = [];
let currOffset = 0;
while (currOffset < data.length) {
......@@ -81,7 +82,7 @@ function convertSmallint(data, num_of_rows, nbytes = 0, offset = 0, micro=false)
return res;
}
function convertInt(data, num_of_rows, nbytes = 0, offset = 0, micro=false) {
data = ref.reinterpret(data.deref(), nbytes * num_of_rows, offset);
data = ref.reinterpret(data, nbytes * num_of_rows, offset);
let res = [];
let currOffset = 0;
while (currOffset < data.length) {
......@@ -92,18 +93,18 @@ function convertInt(data, num_of_rows, nbytes = 0, offset = 0, micro=false) {
return res;
}
function convertBigint(data, num_of_rows, nbytes = 0, offset = 0, micro=false) {
data = ref.reinterpret(data.deref(), nbytes * num_of_rows, offset);
data = ref.reinterpret(data, nbytes * num_of_rows, offset);
let res = [];
let currOffset = 0;
while (currOffset < data.length) {
let d = data.readInt64LE(currOffset);
res.push(d == FieldTypes.C_BIGINT_NULL ? null : BigInt(d));
res.push(d == FieldTypes.C_BIGINT_NULL ? null : d);
currOffset += nbytes;
}
return res;
}
function convertFloat(data, num_of_rows, nbytes = 0, offset = 0, micro=false) {
data = ref.reinterpret(data.deref(), nbytes * num_of_rows, offset);
data = ref.reinterpret(data, nbytes * num_of_rows, offset);
let res = [];
let currOffset = 0;
while (currOffset < data.length) {
......@@ -114,7 +115,7 @@ function convertFloat(data, num_of_rows, nbytes = 0, offset = 0, micro=false) {
return res;
}
function convertDouble(data, num_of_rows, nbytes = 0, offset = 0, micro=false) {
data = ref.reinterpret(data.deref(), nbytes * num_of_rows, offset);
data = ref.reinterpret(data, nbytes * num_of_rows, offset);
let res = [];
let currOffset = 0;
while (currOffset < data.length) {
......@@ -125,7 +126,7 @@ function convertDouble(data, num_of_rows, nbytes = 0, offset = 0, micro=false) {
return res;
}
function convertBinary(data, num_of_rows, nbytes = 0, offset = 0, micro=false) {
data = ref.reinterpret(data.deref(), nbytes * num_of_rows, offset);
data = ref.reinterpret(data, nbytes * num_of_rows, offset);
let res = [];
let currOffset = 0;
while (currOffset < data.length) {
......@@ -141,7 +142,7 @@ function convertBinary(data, num_of_rows, nbytes = 0, offset = 0, micro=false) {
return res;
}
function convertNchar(data, num_of_rows, nbytes = 0, offset = 0, micro=false) {
data = ref.reinterpret(data.deref(), nbytes * num_of_rows, offset);
data = ref.reinterpret(data, nbytes * num_of_rows, offset);
let res = [];
let currOffset = 0;
// every 4 bytes, a character is encoded;
......@@ -177,9 +178,10 @@ var char_arr = ArrayType(ref.types.char);
var TaosField = Struct({
'name': char_arr,
});
TaosField.fields.name.type.size = 64;
TaosField.defineProperty('bytes', ref.types.short);
TaosField.fields.name.type.size = 65;
TaosField.defineProperty('type', ref.types.char);
TaosField.defineProperty('bytes', ref.types.short);
/**
*
......@@ -202,14 +204,14 @@ function CTaosInterface (config = null, pass = false) {
'taos_connect': [ ref.types.void_ptr, [ ref.types.char_ptr, ref.types.char_ptr, ref.types.char_ptr, ref.types.char_ptr, ref.types.int ] ],
//void taos_close(TAOS *taos)
'taos_close': [ ref.types.void, [ ref.types.void_ptr ] ],
//TAOS_RES *taos_use_result(TAOS *taos);
'taos_use_result': [ ref.types.void_ptr, [ ref.types.void_ptr ] ],
//int *taos_fetch_lengths(TAOS_RES *taos);
'taos_fetch_lengths': [ ref.types.void_ptr, [ ref.types.void_ptr ] ],
//int taos_query(TAOS *taos, char *sqlstr)
'taos_query': [ ref.types.int, [ ref.types.void_ptr, ref.types.char_ptr ] ],
'taos_query': [ ref.types.void_ptr, [ ref.types.void_ptr, ref.types.char_ptr ] ],
//int taos_affected_rows(TAOS *taos)
'taos_affected_rows': [ ref.types.int, [ ref.types.void_ptr] ],
//int taos_fetch_block(TAOS_RES *res, TAOS_ROW *rows)
'taos_fetch_block': [ ref.types.int, [ ref.types.void_ptr, ref.types.void_ptr] ],
'taos_fetch_block': [ ref.types.int, [ ref.types.void_ptr, ref.types.void_ptr2] ],
//int taos_num_fields(TAOS_RES *res);
'taos_num_fields': [ ref.types.int, [ ref.types.void_ptr] ],
//TAOS_ROW taos_fetch_row(TAOS_RES *res)
......@@ -329,44 +331,66 @@ CTaosInterface.prototype.query = function query(connection, sql) {
CTaosInterface.prototype.affectedRows = function affectedRows(connection) {
return this.libtaos.taos_affected_rows(connection);
}
CTaosInterface.prototype.useResult = function useResult(connection) {
let result = this.libtaos.taos_use_result(connection);
CTaosInterface.prototype.useResult = function useResult(result) {
let fields = [];
let pfields = this.fetchFields(result);
if (ref.isNull(pfields) == false) {
pfields = ref.reinterpret(pfields, this.fieldsCount(connection) * 68, 0);
pfields = ref.reinterpret(pfields, this.fieldsCount(result) * 68, 0);
for (let i = 0; i < pfields.length; i += 68) {
//0 - 63 = name //64 - 65 = bytes, 66 - 67 = type
fields.push( {
name: ref.readCString(ref.reinterpret(pfields,64,i)),
bytes: pfields[i + 64],
type: pfields[i + 66]
name: ref.readCString(ref.reinterpret(pfields,65,i)),
type: pfields[i + 65],
bytes: pfields[i + 66]
})
}
}
return {result:result, fields:fields}
return {fields:fields}
}
CTaosInterface.prototype.fetchBlock = function fetchBlock(result, fields) {
let pblock = ref.ref(ref.ref(ref.NULL)); // equal to our raw data
let num_of_rows = this.libtaos.taos_fetch_block(result, pblock)
if (num_of_rows == 0) {
let pblock = ref.ref(ref.NULL); // equal to our raw data
pblock = this.libtaos.taos_fetch_row(result);
if (pblock == 0) {
return {block:null, num_of_rows:0};
}
let isMicro = (this.libtaos.taos_result_precision(result) == FieldTypes.C_TIMESTAMP_MICRO)
let blocks = new Array(fields.length);
blocks.fill(null);
num_of_rows = Math.abs(num_of_rows);
//num_of_rows = Math.abs(num_of_rows);
let offset = 0;
pblock = pblock.deref();
for (let i = 0; i < fields.length; i++) {
//pblock = pblock.deref();
if (!convertFunctions[fields[i]['type']] ) {
var fieldL = this.libtaos.taos_fetch_lengths(result);
var numoffields = this.libtaos.taos_field_count(result);
let blocks = new Array(numoffields);
blocks.fill(null);
var fieldlens = [];
if (ref.isNull(fieldL) == false) {
for (let i = 0; i < numoffields; i ++) {
let plen = ref.reinterpret(fieldL, 4, i*4);
//plen = ref.readPointer(plen,0,ref.types.int);
let len = plen.readInt32LE(0);
fieldlens.push(len);
//console.log(len);
}
}
for (let i = 0; i < numoffields; i++) {
if (!convertFunctions[fields['fields'][i]['type']] ) {
throw new errors.DatabaseError("Invalid data type returned from database");
}
blocks[i] = convertFunctions[fields[i]['type']](pblock, num_of_rows, fields[i]['bytes'], offset, isMicro);
offset += fields[i]['bytes'] * num_of_rows;
prow = ref.reinterpret(pblock,8,i*8);
console.log(fieldlens[i]);
blocks[i] = convertFunctions[fields['fields'][i]['type']](prow, 1, fieldlens[i], 0, isMicro);
console.log('******************************');
console.log(blocks[i]);
//offset += fields[i]['bytes'] * num_of_rows;
}
return {blocks: blocks, num_of_rows:Math.abs(num_of_rows)}
return {blocks: blocks, num_of_rows:1}
}
CTaosInterface.prototype.fetchRow = function fetchRow(result, fields) {
let row = this.libtaos.taos_fetch_row(result);
......@@ -381,17 +405,17 @@ CTaosInterface.prototype.numFields = function numFields(result) {
return this.libtaos.taos_num_fields(result);
}
// Fetch fields count by connection, the latest query
CTaosInterface.prototype.fieldsCount = function fieldsCount(connection) {
return this.libtaos.taos_field_count(connection);
CTaosInterface.prototype.fieldsCount = function fieldsCount(result) {
return this.libtaos.taos_field_count(result);
}
CTaosInterface.prototype.fetchFields = function fetchFields(result) {
return this.libtaos.taos_fetch_fields(result);
}
CTaosInterface.prototype.errno = function errno(connection) {
return this.libtaos.taos_errno(connection);
CTaosInterface.prototype.errno = function errno(result) {
return this.libtaos.taos_errno(result);
}
CTaosInterface.prototype.errStr = function errStr(connection) {
return ref.readCString(this.libtaos.taos_errstr(connection));
CTaosInterface.prototype.errStr = function errStr(result) {
return ref.readCString(this.libtaos.taos_errstr(result));
}
// Async
CTaosInterface.prototype.query_a = function query_a(connection, sql, callback, param = ref.ref(ref.NULL)) {
......@@ -411,19 +435,39 @@ CTaosInterface.prototype.fetch_rows_a = function fetch_rows_a(result, callback,
let asyncCallbackWrapper = function (param2, result2, numOfRows2) {
// Data preparation to pass to cursor. Could be bottleneck in query execution callback times.
let row = cti.libtaos.taos_fetch_row(result2);
console.log(row);
let fields = cti.fetchFields_a(result2);
let isMicro = (cti.libtaos.taos_result_precision(result2) == FieldTypes.C_TIMESTAMP_MICRO);
let blocks = new Array(fields.length);
blocks.fill(null);
numOfRows2 = Math.abs(numOfRows2);
let offset = 0;
var fieldL = cti.libtaos.taos_fetch_lengths(result);
var fieldlens = [];
if (ref.isNull(fieldL) == false) {
for (let i = 0; i < fields.length; i ++) {
let plen = ref.reinterpret(fieldL, 8, i*8);
let len = ref.get(plen,0,ref.types.int32);
fieldlens.push(len);
console.log('11111111111111111111');
console.log(fields.length);
console.log(len);
}
}
if (numOfRows2 > 0){
for (let i = 0; i < fields.length; i++) {
if (!convertFunctions[fields[i]['type']] ) {
throw new errors.DatabaseError("Invalid data type returned from database");
}
blocks[i] = convertFunctions[fields[i]['type']](row, numOfRows2, fields[i]['bytes'], offset, isMicro);
offset += fields[i]['bytes'] * numOfRows2;
let prow = ref.reinterpret(row,8,i*8);
//blocks[i] = convertFunctions[fields[i]['type']](ref.get(prow,0,ref.types.void_ptr), numOfRows2, fieldlens[i], 0, isMicro);
console.log(prow);
blocks[i] = convertFunctions[fields[i]['type']](ref.readPointer(prow), numOfRows2, fieldlens[i], 0, isMicro);
//offset += fields[i]['bytes'] * numOfRows2;
}
}
callback(param2, result2, numOfRows2, blocks);
......@@ -440,11 +484,11 @@ CTaosInterface.prototype.fetchFields_a = function fetchFields_a (result) {
if (ref.isNull(pfields) == false) {
pfields = ref.reinterpret(pfields, 68 * pfieldscount , 0);
for (let i = 0; i < pfields.length; i += 68) {
//0 - 63 = name //64 - 65 = bytes, 66 - 67 = type
//0 - 64 = name //65 = type, 66 - 67 = bytes
fields.push( {
name: ref.readCString(ref.reinterpret(pfields,64,i)),
bytes: pfields[i + 64],
type: pfields[i + 66]
name: ref.readCString(ref.reinterpret(pfields,65,i)),
type: pfields[i + 65],
bytes: pfields[i + 66]
})
}
}
......
......@@ -74,9 +74,11 @@ TDengineConnection.prototype.rollback = function rollback() {
* Clear the results from connector
* @private
*/
TDengineConnection.prototype._clearResultSet = function _clearResultSet() {
/*
TDengineConnection.prototype._clearResultSet = function _clearResultSet() {
var result = this._chandle.useResult(this._conn).result;
if (result) {
this._chandle.freeResult(result)
}
}
*/
......@@ -98,7 +98,7 @@ TDengineCursor.prototype.execute = function execute(operation, options, callback
if (this._connection == null) {
throw new errors.ProgrammingError('Cursor is not connected');
}
this._connection._clearResultSet();
this._reset_result();
let stmt = operation;
......@@ -111,18 +111,18 @@ TDengineCursor.prototype.execute = function execute(operation, options, callback
});
obs.observe({ entryTypes: ['measure'] });
performance.mark('A');
res = this._chandle.query(this._connection._conn, stmt);
this._result = this._chandle.query(this._connection._conn, stmt);
performance.mark('B');
performance.measure('query', 'A', 'B');
}
else {
res = this._chandle.query(this._connection._conn, stmt);
this._result = this._chandle.query(this._connection._conn, stmt);
}
res = this._chandle.errno(this._result);
if (res == 0) {
let fieldCount = this._chandle.fieldsCount(this._connection._conn);
let fieldCount = this._chandle.fieldsCount(this._result);
if (fieldCount == 0) {
let affectedRowCount = this._chandle.affectedRows(this._connection._conn);
let affectedRowCount = this._chandle.affectedRows(this._result);
let response = this._createAffectedResponse(affectedRowCount, time)
if (options['quiet'] != true) {
console.log(response);
......@@ -131,16 +131,17 @@ TDengineCursor.prototype.execute = function execute(operation, options, callback
return affectedRowCount; //return num of affected rows, common with insert, use statements
}
else {
let resAndField = this._chandle.useResult(this._connection._conn, fieldCount)
this._result = resAndField.result;
this._fields = resAndField.fields;
this.fields = resAndField.fields;
this._fields = this._chandle.useResult(this._result);
this.fields = this._fields;
console.log('++++++++++++++++++++++++++');
console.log(this._result);
wrapCB(callback);
return this._result; //return a pointer to the result
}
}
else {
throw new errors.ProgrammingError(this._chandle.errStr(this._connection._conn))
throw new errors.ProgrammingError(this._chandle.errStr(this._result))
}
}
......@@ -195,6 +196,8 @@ TDengineCursor.prototype.fetchall = function fetchall(options, callback) {
*/
obs.observe({ entryTypes: ['measure'] });
performance.mark('A');
console.log('fetchall ----------------');
while(true) {
let blockAndRows = this._chandle.fetchBlock(this._result, this._fields);
......@@ -221,7 +224,7 @@ TDengineCursor.prototype.fetchall = function fetchall(options, callback) {
let response = this._createSetResponse(this._rowcount, time)
console.log(response);
this._connection._clearResultSet();
// this._connection._clearResultSet();
let fields = this.fields;
this._reset_result();
this.data = data;
......@@ -381,6 +384,9 @@ TDengineCursor.prototype.stopQuery = function stopQuery(result) {
}
TDengineCursor.prototype._reset_result = function _reset_result() {
this._rowcount = -1;
if (this._result != null) {
this._chandle.freeResult(this._result);
}
this._result = null;
this._fields = null;
this.data = [];
......
{
"name": "td-connector",
"version": "1.6.1",
"name": "td2.0-connector",
"version": "0.0.1",
"lockfileVersion": 1,
"requires": true,
"dependencies": {
......
{
"name": "td-connector",
"version": "2.0.0",
"name": "td2.0-connector",
"version": "0.0.1",
"description": "A Node.js connector for TDengine.",
"main": "tdengine.js",
"scripts": {
......
......@@ -19,7 +19,7 @@ function randomBool() {
}
// Initialize
//c1.execute('drop database td_connector_test;');
c1.execute('create database if not exists td_connector_test;');
c1.execute('use td_connector_test;')
c1.execute('create table if not exists all_types (ts timestamp, _int int, _bigint bigint, _float float, _double double, _binary binary(40), _smallint smallint, _tinyint tinyint, _bool bool, _nchar nchar(40));');
......@@ -28,7 +28,7 @@ c1.execute('create table if not exists stabletest (ts timestamp, v1 int, v2 int,
// Shell Test : The following uses the cursor to imitate the taos shell
// Insert
for (let i = 0; i < 10000; i++) {
for (let i = 0; i < 100; i++) {
let insertData = ["now+" + i + "s", // Timestamp
parseInt( R(-Math.pow(2,31) + 1 , Math.pow(2,31) - 1) ), // Int
parseInt( R(-Math.pow(2,31) + 1 , Math.pow(2,31) - 1) ), // BigInt
......@@ -40,18 +40,20 @@ for (let i = 0; i < 10000; i++) {
randomBool(),
"\"Nchars\""]; // Bool
c1.execute('insert into td_connector_test.all_types values(' + insertData.join(',') + ' );', {quiet:true});
if (i % 1000 == 0) {
if (i % 10 == 0) {
console.log("Insert # " , i);
}
}
// Select
c1.execute('select * from td_connector_test.all_types limit 10 offset 1000;');
console.log('select * from td_connector_test.all_types limit 3 offset 100;');
c1.execute('select * from td_connector_test.all_types limit 1 offset 100;');
var d = c1.fetchall();
console.log(c1.fields);
console.log(d);
/*
// Functions
console.log('select count(*), avg(_int), sum(_float), max(_bigint), min(_double) from td_connector_test.all_types;')
c1.execute('select count(*), avg(_int), sum(_float), max(_bigint), min(_double) from td_connector_test.all_types;');
var d = c1.fetchall();
console.log(c1.fields);
......@@ -134,3 +136,4 @@ setTimeout(function(){
c1.query('drop database td_connector_test;');
},2000);
conn.close();
*/
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册