cinterface.js 26.9 KB
Newer Older
1 2 3 4 5
/**
 * C Interface with TDengine Module
 * @module CTaosInterface
 */

sangshuduo's avatar
sangshuduo 已提交
6
const ref = require('ref-napi');
7
const os = require('os');
sangshuduo's avatar
sangshuduo 已提交
8 9 10
const ffi = require('ffi-napi');
const ArrayType = require('ref-array-napi');
const Struct = require('ref-struct-napi');
11
const FieldTypes = require('./constants');
12
const errors = require('./error');
13
const _ = require('lodash')
S
StoneT2000 已提交
14
const TaosObjects = require('./taosobjects');
15 16

module.exports = CTaosInterface;
17 18 19 20 21 22
const TAOSFIELD = {
  NAME_LENGTH: 65,
  TYPE_OFFSET: 65,
  BYTES_OFFSET: 66,
  STRUCT_SIZE: 68,
}
23
function convertTimestamp(data, num_of_rows, nbytes = 0, offset = 0, precision = 0) {
L
liu0x54 已提交
24
  data = ref.reinterpret(data.deref(), nbytes * num_of_rows, offset);
25 26 27
  let res = [];
  let currOffset = 0;
  while (currOffset < data.length) {
28
    let time = data.readInt64LE(currOffset);
29
    currOffset += nbytes;
30
    res.push(new TaosObjects.TaosTimestamp(time, precision));
31 32 33
  }
  return res;
}
34
function convertBool(data, num_of_rows, nbytes = 0, offset = 0, precision = 0) {
L
liu0x54 已提交
35
  data = ref.reinterpret(data.deref(), nbytes * num_of_rows, offset);
36 37 38 39 40
  let res = new Array(data.length);
  for (let i = 0; i < data.length; i++) {
    if (data[i] == 0) {
      res[i] = false;
    }
41
    else if (data[i] == 1) {
42 43
      res[i] = true;
    }
44 45 46
    else if (data[i] == FieldTypes.C_BOOL_NULL) {
      res[i] = null;
    }
47 48 49
  }
  return res;
}
50
function convertTinyint(data, num_of_rows, nbytes = 0, offset = 0, precision = 0) {
L
liu0x54 已提交
51
  data = ref.reinterpret(data.deref(), nbytes * num_of_rows, offset);
52 53 54
  let res = [];
  let currOffset = 0;
  while (currOffset < data.length) {
55
    let d = data.readIntLE(currOffset, 1);
56
    res.push(d == FieldTypes.C_TINYINT_NULL ? null : d);
57 58 59 60
    currOffset += nbytes;
  }
  return res;
}
61 62 63 64 65 66 67 68 69 70 71 72
function convertTinyintUnsigned(data, num_of_rows, nbytes = 0, offset = 0, precision = 0) {
  data = ref.reinterpret(data.deref(), nbytes * num_of_rows, offset);
  let res = [];
  let currOffset = 0;
  while (currOffset < data.length) {
    let d = data.readUIntLE(currOffset, 1);
    res.push(d == FieldTypes.C_TINYINT_UNSIGNED_NULL ? null : d);
    currOffset += nbytes;
  }
  return res;
}

73
function convertSmallint(data, num_of_rows, nbytes = 0, offset = 0, precision = 0) {
L
liu0x54 已提交
74
  data = ref.reinterpret(data.deref(), nbytes * num_of_rows, offset);
75 76 77
  let res = [];
  let currOffset = 0;
  while (currOffset < data.length) {
78
    let d = data.readIntLE(currOffset, 2);
79
    res.push(d == FieldTypes.C_SMALLINT_NULL ? null : d);
80 81 82 83
    currOffset += nbytes;
  }
  return res;
}
84 85 86 87 88 89 90 91 92 93 94 95
function convertSmallintUnsigned(data, num_of_rows, nbytes = 0, offset = 0, precision = 0) {
  data = ref.reinterpret(data.deref(), nbytes * num_of_rows, offset);
  let res = [];
  let currOffset = 0;
  while (currOffset < data.length) {
    let d = data.readUIntLE(currOffset, 2);
    res.push(d == FieldTypes.C_SMALLINT_UNSIGNED_NULL ? null : d);
    currOffset += nbytes;
  }
  return res;
}

96
function convertInt(data, num_of_rows, nbytes = 0, offset = 0, precision = 0) {
L
liu0x54 已提交
97
  data = ref.reinterpret(data.deref(), nbytes * num_of_rows, offset);
98 99 100
  let res = [];
  let currOffset = 0;
  while (currOffset < data.length) {
101 102
    let d = data.readInt32LE(currOffset);
    res.push(d == FieldTypes.C_INT_NULL ? null : d);
103 104 105 106
    currOffset += nbytes;
  }
  return res;
}
107 108 109 110 111 112 113 114 115 116 117 118 119
function convertIntUnsigned(data, num_of_rows, nbytes = 0, offset = 0, precision = 0) {
  data = ref.reinterpret(data.deref(), nbytes * num_of_rows, offset);
  let res = [];
  let currOffset = 0;
  while (currOffset < data.length) {
    let d = data.readUInt32LE(currOffset);
    res.push(d == FieldTypes.C_INT_UNSIGNED_NULL ? null : d);
    currOffset += nbytes;
  }
  return res;
}


120
function convertBigint(data, num_of_rows, nbytes = 0, offset = 0, precision = 0) {
L
liu0x54 已提交
121
  data = ref.reinterpret(data.deref(), nbytes * num_of_rows, offset);
122 123 124
  let res = [];
  let currOffset = 0;
  while (currOffset < data.length) {
125
    let d = data.readInt64LE(currOffset);
L
liu0x54 已提交
126
    res.push(d == FieldTypes.C_BIGINT_NULL ? null : BigInt(d));
127 128 129 130
    currOffset += nbytes;
  }
  return res;
}
131 132 133 134 135 136 137 138 139 140 141 142 143
function convertBigintUnsigned(data, num_of_rows, nbytes = 0, offset = 0, precision = 0) {
  data = ref.reinterpret(data.deref(), nbytes * num_of_rows, offset);
  let res = [];
  let currOffset = 0;
  while (currOffset < data.length) {
    let d = data.readUInt64LE(currOffset);
    res.push(d == FieldTypes.C_BIGINT_UNSIGNED_NULL ? null : BigInt(d));
    currOffset += nbytes;
  }
  return res;
}


144
function convertFloat(data, num_of_rows, nbytes = 0, offset = 0, precision = 0) {
L
liu0x54 已提交
145
  data = ref.reinterpret(data.deref(), nbytes * num_of_rows, offset);
146 147 148
  let res = [];
  let currOffset = 0;
  while (currOffset < data.length) {
149 150
    let d = parseFloat(data.readFloatLE(currOffset).toFixed(5));
    res.push(isNaN(d) ? null : d);
151 152 153 154
    currOffset += nbytes;
  }
  return res;
}
155
function convertDouble(data, num_of_rows, nbytes = 0, offset = 0, precision = 0) {
L
liu0x54 已提交
156
  data = ref.reinterpret(data.deref(), nbytes * num_of_rows, offset);
157 158 159
  let res = [];
  let currOffset = 0;
  while (currOffset < data.length) {
160 161
    let d = parseFloat(data.readDoubleLE(currOffset).toFixed(16));
    res.push(isNaN(d) ? null : d);
162 163 164 165
    currOffset += nbytes;
  }
  return res;
}
166

167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184
function convertBinary(data, num_of_rows, nbytes = 0, offset = 0, precision = 0) {
  data = ref.reinterpret(data.deref(), nbytes * num_of_rows, offset);
  let res = [];

  let currOffset = 0;
  while (currOffset < data.length) {
    let len = data.readIntLE(currOffset, 2);
    let dataEntry = data.slice(currOffset + 2, currOffset + len + 2); //one entry in a row under a column;
    if (dataEntry[0] == 255) {
      res.push(null)
    } else {
      res.push(dataEntry.toString("utf-8"));
    }
    currOffset += nbytes;
  }
  return res;
}

185
function convertNchar(data, num_of_rows, nbytes = 0, offset = 0, precision = 0) {
L
liu0x54 已提交
186
  data = ref.reinterpret(data.deref(), nbytes * num_of_rows, offset);
187
  let res = [];
188

189 190
  let currOffset = 0;
  while (currOffset < data.length) {
191 192
    let len = data.readIntLE(currOffset, 2);
    let dataEntry = data.slice(currOffset + 2, currOffset + len + 2); //one entry in a row under a column;
193 194 195 196 197
    if (dataEntry[0] == 255 && dataEntry[1] == 255) {
      res.push(null)
    } else {
      res.push(dataEntry.toString("utf-8"));
    }
198 199 200 201 202
    currOffset += nbytes;
  }
  return res;
}

203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220
function convertJsonTag(data, num_of_rows, nbytes = 0, offset = 0, precision = 0) {
  data = ref.reinterpret(data.deref(), nbytes * num_of_rows, offset);
  let res = [];

  let currOffset = 0;
  while (currOffset < data.length) {
    let len = data.readIntLE(currOffset, 2);
    let dataEntry = data.slice(currOffset + 2, currOffset + len + 2); //one entry in a row under a column;
    if (dataEntry[0] == 255 && dataEntry[1] == 255) {
      res.push(null)
    } else {
      res.push(dataEntry.toString("utf-8"));
    }
    currOffset += nbytes;
  }
  return res;
}

221
// Object with all the relevant converters from pblock data to javascript readable data
222
let convertFunctions = {
223 224 225 226 227 228 229
  [FieldTypes.C_BOOL]: convertBool,
  [FieldTypes.C_TINYINT]: convertTinyint,
  [FieldTypes.C_SMALLINT]: convertSmallint,
  [FieldTypes.C_INT]: convertInt,
  [FieldTypes.C_BIGINT]: convertBigint,
  [FieldTypes.C_FLOAT]: convertFloat,
  [FieldTypes.C_DOUBLE]: convertDouble,
230
  [FieldTypes.C_BINARY]: convertBinary,
231
  [FieldTypes.C_TIMESTAMP]: convertTimestamp,
232 233 234 235
  [FieldTypes.C_NCHAR]: convertNchar,
  [FieldTypes.C_TINYINT_UNSIGNED]: convertTinyintUnsigned,
  [FieldTypes.C_SMALLINT_UNSIGNED]: convertSmallintUnsigned,
  [FieldTypes.C_INT_UNSIGNED]: convertIntUnsigned,
236 237
  [FieldTypes.C_BIGINT_UNSIGNED]: convertBigintUnsigned,
  [FieldTypes.C_JSON_TAG]: convertJsonTag,
238 239 240 241 242
}

// Define TaosField structure
var char_arr = ArrayType(ref.types.char);
var TaosField = Struct({
243 244
  'name': char_arr,
});
245
TaosField.fields.name.type.size = 65;
246 247
TaosField.defineProperty('type', ref.types.uint8);
TaosField.defineProperty('bytes', ref.types.int16);
248

249 250
//define schemaless line array
var smlLine = ArrayType(ref.coerceType('char *'))
251

252
/**
S
StoneT2000 已提交
253
 *
254 255
 * @param {Object} config - Configuration options for the interface
 * @return {CTaosInterface}
S
StoneT2000 已提交
256 257 258
 * @class CTaosInterface
 * @classdesc The CTaosInterface is the interface through which Node.JS communicates data back and forth with TDengine. It is not advised to
 * access this class directly and use it unless you understand what these functions do.
259
 */
260
function CTaosInterface(config = null, pass = false) {
261 262
  ref.types.char_ptr = ref.refType(ref.types.char);
  ref.types.void_ptr = ref.refType(ref.types.void);
263
  ref.types.void_ptr2 = ref.refType(ref.types.void_ptr);
264
  /*Declare a bunch of functions first*/
265
  /* Note, pointers to TAOS_RES, TAOS, are ref.types.void_ptr. The connection._conn buffer is supplied for pointers to TAOS *  */
266 267 268 269 270 271
  if ('win32' == os.platform()) {
    taoslibname = 'taos';
  } else {
    taoslibname = 'libtaos';
  }
  this.libtaos = ffi.Library(taoslibname, {
272 273
    'taos_options': [ref.types.int, [ref.types.int, ref.types.void_ptr]],
    'taos_init': [ref.types.void, []],
274
    //TAOS *taos_connect(char *ip, char *user, char *pass, char *db, int port)
275
    'taos_connect': [ref.types.void_ptr, [ref.types.char_ptr, ref.types.char_ptr, ref.types.char_ptr, ref.types.char_ptr, ref.types.int]],
276
    //void taos_close(TAOS *taos)
277 278 279
    'taos_close': [ref.types.void, [ref.types.void_ptr]],
    //int *taos_fetch_lengths(TAOS_RES *res);
    'taos_fetch_lengths': [ref.types.void_ptr, [ref.types.void_ptr]],
280
    //int taos_query(TAOS *taos, char *sqlstr)
281 282 283
    'taos_query': [ref.types.void_ptr, [ref.types.void_ptr, ref.types.char_ptr]],
    //int taos_affected_rows(TAOS_RES *res)
    'taos_affected_rows': [ref.types.int, [ref.types.void_ptr]],
284
    //int taos_fetch_block(TAOS_RES *res, TAOS_ROW *rows)
285
    'taos_fetch_block': [ref.types.int, [ref.types.void_ptr, ref.types.void_ptr]],
286
    //int taos_num_fields(TAOS_RES *res);
287
    'taos_num_fields': [ref.types.int, [ref.types.void_ptr]],
288 289
    //TAOS_ROW taos_fetch_row(TAOS_RES *res)
    //TAOS_ROW is void **, but we set the return type as a reference instead to get the row
290 291
    'taos_fetch_row': [ref.refType(ref.types.void_ptr2), [ref.types.void_ptr]],
    'taos_print_row': [ref.types.int, [ref.types.char_ptr, ref.types.void_ptr, ref.types.void_ptr, ref.types.int]],
292
    //int taos_result_precision(TAOS_RES *res)
293
    'taos_result_precision': [ref.types.int, [ref.types.void_ptr]],
294
    //void taos_free_result(TAOS_RES *res)
295
    'taos_free_result': [ref.types.void, [ref.types.void_ptr]],
296
    //int taos_field_count(TAOS *taos)
297
    'taos_field_count': [ref.types.int, [ref.types.void_ptr]],
298
    //TAOS_FIELD *taos_fetch_fields(TAOS_RES *res)
299
    'taos_fetch_fields': [ref.refType(TaosField), [ref.types.void_ptr]],
300
    //int taos_errno(TAOS *taos)
301
    'taos_errno': [ref.types.int, [ref.types.void_ptr]],
302
    //char *taos_errstr(TAOS *taos)
303
    'taos_errstr': [ref.types.char_ptr, [ref.types.void_ptr]],
304
    //void taos_stop_query(TAOS_RES *res);
305
    'taos_stop_query': [ref.types.void, [ref.types.void_ptr]],
306
    //char *taos_get_server_info(TAOS *taos);
307
    'taos_get_server_info': [ref.types.char_ptr, [ref.types.void_ptr]],
308
    //char *taos_get_client_info();
309
    'taos_get_client_info': [ref.types.char_ptr, []],
310 311 312

    // ASYNC
    // void taos_query_a(TAOS *taos, char *sqlstr, void (*fp)(void *, TAOS_RES *, int), void *param)
313
    'taos_query_a': [ref.types.void, [ref.types.void_ptr, ref.types.char_ptr, ref.types.void_ptr, ref.types.void_ptr]],
314
    // void taos_fetch_rows_a(TAOS_RES *res, void (*fp)(void *param, TAOS_RES *, int numOfRows), void *param);
315
    'taos_fetch_rows_a': [ref.types.void, [ref.types.void_ptr, ref.types.void_ptr, ref.types.void_ptr]],
316 317

    // Subscription
S
StoneT2000 已提交
318
    //TAOS_SUB *taos_subscribe(TAOS* taos, int restart, const char* topic, const char *sql, TAOS_SUBSCRIBE_CALLBACK fp, void *param, int interval)
319
    'taos_subscribe': [ref.types.void_ptr, [ref.types.void_ptr, ref.types.int, ref.types.char_ptr, ref.types.char_ptr, ref.types.void_ptr, ref.types.void_ptr, ref.types.int]],
S
StoneT2000 已提交
320
    // TAOS_RES *taos_consume(TAOS_SUB *tsub)
321
    'taos_consume': [ref.types.void_ptr, [ref.types.void_ptr]],
322
    //void taos_unsubscribe(TAOS_SUB *tsub);
323
    'taos_unsubscribe': [ref.types.void, [ref.types.void_ptr]],
324 325 326 327

    // Continuous Query
    //TAOS_STREAM *taos_open_stream(TAOS *taos, char *sqlstr, void (*fp)(void *param, TAOS_RES *, TAOS_ROW row),
    //                              int64_t stime, void *param, void (*callback)(void *));
328
    'taos_open_stream': [ref.types.void_ptr, [ref.types.void_ptr, ref.types.char_ptr, ref.types.void_ptr, ref.types.int64, ref.types.void_ptr, ref.types.void_ptr]],
329
    //void taos_close_stream(TAOS_STREAM *tstr);
330 331 332 333 334 335
    'taos_close_stream': [ref.types.void, [ref.types.void_ptr]],

    //Schemaless insert 
    //TAOS_RES* taos_schemaless_insert(TAOS* taos, char* lines[], int numLines, int protocol,int precision)
    // 'taos_schemaless_insert': [ref.types.void_ptr, [ref.types.void_ptr, ref.types.char_ptr, ref.types.int, ref.types.int, ref.types.int]]
    'taos_schemaless_insert': [ref.types.void_ptr, [ref.types.void_ptr, smlLine, 'int', 'int', 'int']]
336

337
  });
338

339 340 341 342 343 344
  if (pass == false) {
    if (config == null) {
      this._config = ref.alloc(ref.types.char_ptr, ref.NULL);
    }
    else {
      try {
345
        this._config = ref.allocCString(config);
346
      }
347
      catch (err) {
348 349 350 351 352 353 354 355
        throw "Attribute Error: config is expected as a str";
      }
    }
    if (config != null) {
      this.libtaos.taos_options(3, this._config);
    }
    this.libtaos.taos_init();
  }
356
  return this;
357 358
}
CTaosInterface.prototype.config = function config() {
359 360 361 362 363
  return this._config;
}
CTaosInterface.prototype.connect = function connect(host = null, user = "root", password = "taosdata", db = null, port = 0) {
  let _host, _user, _password, _db, _port;
  try {
364
    _host = host != null ? ref.allocCString(host) : ref.NULL;
365
  }
366
  catch (err) {
367 368 369 370 371
    throw "Attribute Error: host is expected as a str";
  }
  try {
    _user = ref.allocCString(user)
  }
372
  catch (err) {
373 374 375 376 377
    throw "Attribute Error: user is expected as a str";
  }
  try {
    _password = ref.allocCString(password);
  }
378
  catch (err) {
379 380 381
    throw "Attribute Error: password is expected as a str";
  }
  try {
382
    _db = db != null ? ref.allocCString(db) : ref.NULL;
383
  }
384
  catch (err) {
385 386 387 388 389
    throw "Attribute Error: db is expected as a str";
  }
  try {
    _port = ref.alloc(ref.types.int, port);
  }
390
  catch (err) {
391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406
    throw TypeError("port is expected as an int")
  }
  let connection = this.libtaos.taos_connect(_host, _user, _password, _db, _port);
  if (ref.isNull(connection)) {
    throw new errors.TDError('Failed to connect to TDengine');
  }
  else {
    console.log('Successfully connected to TDengine');
  }
  return connection;
}
CTaosInterface.prototype.close = function close(connection) {
  this.libtaos.taos_close(connection);
  console.log("Connection is closed");
}
CTaosInterface.prototype.query = function query(connection, sql) {
407
  return this.libtaos.taos_query(connection, ref.allocCString(sql));
408
}
409

410 411
CTaosInterface.prototype.affectedRows = function affectedRows(result) {
  return this.libtaos.taos_affected_rows(result);
412
}
413 414
CTaosInterface.prototype.useResult = function useResult(result) {

415 416
  let fields = [];
  let pfields = this.fetchFields(result);
417

418
  if (ref.isNull(pfields) == false) {
419 420
    pfields = ref.reinterpret(pfields, this.fieldsCount(result) * TAOSFIELD.STRUCT_SIZE, 0);
    for (let i = 0; i < pfields.length; i += TAOSFIELD.STRUCT_SIZE) {
421
      fields.push({
422 423 424
        name: ref.readCString(ref.reinterpret(pfields, TAOSFIELD.NAME_LENGTH, i)),
        type: pfields[i + TAOSFIELD.TYPE_OFFSET],
        bytes: pfields[i + TAOSFIELD.BYTES_OFFSET] + pfields[i + TAOSFIELD.BYTES_OFFSET + 1] * 256
425 426 427
      })
    }
  }
L
liu0x54 已提交
428
  return fields;
429 430
}
CTaosInterface.prototype.fetchBlock = function fetchBlock(result, fields) {
431 432 433 434
  let pblock = ref.NULL_POINTER;
  let num_of_rows = this.libtaos.taos_fetch_block(result, pblock);
  if (ref.isNull(pblock.deref()) == true) {
    return { block: null, num_of_rows: 0 };
435
  }
436

437
  var fieldL = this.libtaos.taos_fetch_lengths(result);
438
  let precision = this.libtaos.taos_result_precision(result);
L
liu0x54 已提交
439

440
  var fieldlens = [];
441

442
  if (ref.isNull(fieldL) == false) {
443 444
    for (let i = 0; i < fields.length; i++) {
      let plen = ref.reinterpret(fieldL, 4, i * 4);
445
      let len = plen.readInt32LE(0);
446
      fieldlens.push(len);
447 448
    }
  }
L
liu0x54 已提交
449

L
liu0x54 已提交
450
  let blocks = new Array(fields.length);
L
liu0x54 已提交
451
  blocks.fill(null);
452
  num_of_rows = Math.abs(num_of_rows);
L
liu0x54 已提交
453
  let offset = 0;
454 455
  let ptr = pblock.deref();

L
liu0x54 已提交
456
  for (let i = 0; i < fields.length; i++) {
457 458 459 460 461 462 463 464
    pdata = ref.reinterpret(ptr, 8, i * 8);
    if (ref.isNull(pdata.readPointer())) {
      blocks[i] = new Array();
    } else {
      pdata = ref.ref(pdata.readPointer());
      if (!convertFunctions[fields[i]['type']]) {
        throw new errors.DatabaseError("Invalid data type returned from database");
      }
465
      blocks[i] = convertFunctions[fields[i]['type']](pdata, num_of_rows, fieldlens[i], offset, precision);
466 467 468
    }
  }
  return { blocks: blocks, num_of_rows }
469
}
470 471 472 473
CTaosInterface.prototype.fetchRow = function fetchRow(result, fields) {
  let row = this.libtaos.taos_fetch_row(result);
  return row;
}
474 475 476 477
CTaosInterface.prototype.freeResult = function freeResult(result) {
  this.libtaos.taos_free_result(result);
  result = null;
}
478 479 480 481
/** Number of fields returned in this result handle, must use with async */
CTaosInterface.prototype.numFields = function numFields(result) {
  return this.libtaos.taos_num_fields(result);
}
482
// Fetch fields count by connection, the latest query
483 484
CTaosInterface.prototype.fieldsCount = function fieldsCount(result) {
  return this.libtaos.taos_field_count(result);
485 486 487 488
}
CTaosInterface.prototype.fetchFields = function fetchFields(result) {
  return this.libtaos.taos_fetch_fields(result);
}
489 490
CTaosInterface.prototype.errno = function errno(result) {
  return this.libtaos.taos_errno(result);
491
}
492 493
CTaosInterface.prototype.errStr = function errStr(result) {
  return ref.readCString(this.libtaos.taos_errstr(result));
494 495 496 497
}
// Async
CTaosInterface.prototype.query_a = function query_a(connection, sql, callback, param = ref.ref(ref.NULL)) {
  // void taos_query_a(TAOS *taos, char *sqlstr, void (*fp)(void *param, TAOS_RES *, int), void *param)
498
  callback = ffi.Callback(ref.types.void, [ref.types.void_ptr, ref.types.void_ptr, ref.types.int], callback);
499 500 501
  this.libtaos.taos_query_a(connection, ref.allocCString(sql), callback, param);
  return param;
}
502

503 504 505 506 507 508 509 510 511 512 513 514
/** Asynchrnously fetches the next block of rows. Wraps callback and transfers a 4th argument to the cursor, the row data as blocks in javascript form
 * Note: This isn't a recursive function, in order to fetch all data either use the TDengine cursor object, TaosQuery object, or implement a recrusive
 * function yourself using the libtaos.taos_fetch_rows_a function
 */
CTaosInterface.prototype.fetch_rows_a = function fetch_rows_a(result, callback, param = ref.ref(ref.NULL)) {
  // void taos_fetch_rows_a(TAOS_RES *res, void (*fp)(void *param, TAOS_RES *, int numOfRows), void *param);
  var cti = this;
  // wrap callback with a function so interface can access the numOfRows value, needed in order to properly process the binary data
  let asyncCallbackWrapper = function (param2, result2, numOfRows2) {
    // Data preparation to pass to cursor. Could be bottleneck in query execution callback times.
    let row = cti.libtaos.taos_fetch_row(result2);
    let fields = cti.fetchFields_a(result2);
515

516
    let precision = cti.libtaos.taos_result_precision(result2);
517 518 519 520
    let blocks = new Array(fields.length);
    blocks.fill(null);
    numOfRows2 = Math.abs(numOfRows2);
    let offset = 0;
521 522 523
    var fieldL = cti.libtaos.taos_fetch_lengths(result);
    var fieldlens = [];
    if (ref.isNull(fieldL) == false) {
524 525 526 527

      for (let i = 0; i < fields.length; i++) {
        let plen = ref.reinterpret(fieldL, 8, i * 8);
        let len = ref.get(plen, 0, ref.types.int32);
528 529 530
        fieldlens.push(len);
      }
    }
531
    if (numOfRows2 > 0) {
532
      for (let i = 0; i < fields.length; i++) {
533 534 535 536 537 538 539 540 541
        if (ref.isNull(pdata.readPointer())) {
          blocks[i] = new Array();
        } else {
          if (!convertFunctions[fields[i]['type']]) {
            throw new errors.DatabaseError("Invalid data type returned from database");
          }
          let prow = ref.reinterpret(row, 8, i * 8);
          prow = prow.readPointer();
          prow = ref.ref(prow);
542
          blocks[i] = convertFunctions[fields[i]['type']](prow, 1, fieldlens[i], offset, precision);
543 544
          //offset += fields[i]['bytes'] * numOfRows2;
        }
545 546 547 548
      }
    }
    callback(param2, result2, numOfRows2, blocks);
  }
549
  asyncCallbackWrapper = ffi.Callback(ref.types.void, [ref.types.void_ptr, ref.types.void_ptr, ref.types.int], asyncCallbackWrapper);
550 551 552 553
  this.libtaos.taos_fetch_rows_a(result, asyncCallbackWrapper, param);
  return param;
}
// Fetch field meta data by result handle
554
CTaosInterface.prototype.fetchFields_a = function fetchFields_a(result) {
555 556 557 558
  let pfields = this.fetchFields(result);
  let pfieldscount = this.numFields(result);
  let fields = [];
  if (ref.isNull(pfields) == false) {
559 560
    pfields = ref.reinterpret(pfields, pfieldscount * TAOSFIELD.STRUCT_SIZE, 0);
    for (let i = 0; i < pfields.length; i += TAOSFIELD.STRUCT_SIZE) {
561
      fields.push({
562 563 564
        name: ref.readCString(ref.reinterpret(pfields, TAOSFIELD.NAME_LENGTH, i)),
        type: pfields[i + TAOSFIELD.TYPE_OFFSET],
        bytes: pfields[i + TAOSFIELD.BYTES_OFFSET] + pfields[i + TAOSFIELD.BYTES_OFFSET + 1] * 256
565 566 567 568
      })
    }
  }
  return fields;
569
}
570 571
// Stop a query by result handle
CTaosInterface.prototype.stopQuery = function stopQuery(result) {
572
  if (result != null) {
573 574 575 576 577 578 579 580 581 582 583 584 585 586
    this.libtaos.taos_stop_query(result);
  }
  else {
    throw new errors.ProgrammingError("No result handle passed to stop query");
  }
}
CTaosInterface.prototype.getServerInfo = function getServerInfo(connection) {
  return ref.readCString(this.libtaos.taos_get_server_info(connection));
}
CTaosInterface.prototype.getClientInfo = function getClientInfo() {
  return ref.readCString(this.libtaos.taos_get_client_info());
}

// Subscription
S
StoneT2000 已提交
587 588 589
CTaosInterface.prototype.subscribe = function subscribe(connection, restart, topic, sql, interval) {
  let topicOrig = topic;
  let sqlOrig = sql;
590
  try {
S
StoneT2000 已提交
591
    sql = sql != null ? ref.allocCString(sql) : ref.alloc(ref.types.char_ptr, ref.NULL);
592
  }
593
  catch (err) {
S
StoneT2000 已提交
594
    throw "Attribute Error: sql is expected as a str";
595 596
  }
  try {
S
StoneT2000 已提交
597
    topic = topic != null ? ref.allocCString(topic) : ref.alloc(ref.types.char_ptr, ref.NULL);
598
  }
599
  catch (err) {
S
StoneT2000 已提交
600 601
    throw TypeError("topic is expected as a str");
  }
S
StoneT2000 已提交
602

S
StoneT2000 已提交
603
  restart = ref.alloc(ref.types.int, restart);
S
StoneT2000 已提交
604

S
StoneT2000 已提交
605
  let subscription = this.libtaos.taos_subscribe(connection, restart, topic, sql, null, null, interval);
606 607 608 609
  if (ref.isNull(subscription)) {
    throw new errors.TDError('Failed to subscribe to TDengine | Database: ' + dbOrig + ', Table: ' + tableOrig);
  }
  else {
S
StoneT2000 已提交
610
    console.log('Successfully subscribed to TDengine - Topic: ' + topicOrig);
611 612 613
  }
  return subscription;
}
S
StoneT2000 已提交
614 615 616

CTaosInterface.prototype.consume = function consume(subscription) {
  let result = this.libtaos.taos_consume(subscription);
617
  let fields = [];
S
StoneT2000 已提交
618
  let pfields = this.fetchFields(result);
619
  if (ref.isNull(pfields) == false) {
620 621
    pfields = ref.reinterpret(pfields, this.numFields(result) * TAOSFIELD.STRUCT_SIZE, 0);
    for (let i = 0; i < pfields.length; i += TAOSFIELD.STRUCT_SIZE) {
622
      fields.push({
623 624 625
        name: ref.readCString(ref.reinterpret(pfields, TAOSFIELD.NAME_LENGTH, i)),
        bytes: pfields[TAOSFIELD.TYPE_OFFSET],
        type: pfields[i + TAOSFIELD.BYTES_OFFSET] + pfields[i + TAOSFIELD.BYTES_OFFSET + 1] * 256
626 627 628
      })
    }
  }
S
StoneT2000 已提交
629 630

  let data = [];
631
  while (true) {
S
StoneT2000 已提交
632 633 634 635 636 637 638 639 640
    let { blocks, num_of_rows } = this.fetchBlock(result, fields);
    if (num_of_rows == 0) {
      break;
    }
    for (let i = 0; i < num_of_rows; i++) {
      data.push([]);
      let rowBlock = new Array(fields.length);
      for (let j = 0; j < fields.length; j++) {
        rowBlock[j] = blocks[j][i];
641
      }
642
      data[data.length - 1] = (rowBlock);
643 644
    }
  }
S
StoneT2000 已提交
645
  return { data: data, fields: fields, result: result };
646 647 648 649 650 651 652
}
CTaosInterface.prototype.unsubscribe = function unsubscribe(subscription) {
  //void taos_unsubscribe(TAOS_SUB *tsub);
  this.libtaos.taos_unsubscribe(subscription);
}

// Continuous Query
653
CTaosInterface.prototype.openStream = function openStream(connection, sql, callback, stime, stoppingCallback, param = ref.ref(ref.NULL)) {
654 655 656
  try {
    sql = ref.allocCString(sql);
  }
657
  catch (err) {
658 659 660 661 662
    throw "Attribute Error: sql string is expected as a str";
  }
  var cti = this;
  let asyncCallbackWrapper = function (param2, result2, row) {
    let fields = cti.fetchFields_a(result2);
663
    let precision = cti.libtaos.taos_result_precision(result2);
664 665 666 667 668 669
    let blocks = new Array(fields.length);
    blocks.fill(null);
    let numOfRows2 = 1;
    let offset = 0;
    if (numOfRows2 > 0) {
      for (let i = 0; i < fields.length; i++) {
670
        if (!convertFunctions[fields[i]['type']]) {
671 672
          throw new errors.DatabaseError("Invalid data type returned from database");
        }
673
        blocks[i] = convertFunctions[fields[i]['type']](row, numOfRows2, fields[i]['bytes'], offset, precision);
674 675 676 677 678
        offset += fields[i]['bytes'] * numOfRows2;
      }
    }
    callback(param2, result2, blocks, fields);
  }
679 680
  asyncCallbackWrapper = ffi.Callback(ref.types.void, [ref.types.void_ptr, ref.types.void_ptr, ref.refType(ref.types.void_ptr2)], asyncCallbackWrapper);
  asyncStoppingCallbackWrapper = ffi.Callback(ref.types.void, [ref.types.void_ptr], stoppingCallback);
681 682 683 684 685 686 687 688 689 690 691 692 693 694
  let streamHandle = this.libtaos.taos_open_stream(connection, sql, asyncCallbackWrapper, stime, param, asyncStoppingCallbackWrapper);
  if (ref.isNull(streamHandle)) {
    throw new errors.TDError('Failed to open a stream with TDengine');
    return false;
  }
  else {
    console.log("Succesfully opened stream");
    return streamHandle;
  }
}
CTaosInterface.prototype.closeStream = function closeStream(stream) {
  this.libtaos.taos_close_stream(stream);
  console.log("Closed stream");
}
695 696 697 698 699 700 701 702 703 704 705 706 707 708
//Schemaless insert API 
/**
 * TAOS* taos, char* lines[], int numLines, int protocol,int precision)
 * using  taos_errstr get error info, taos_errno get error code. Remmember 
 * to release taos_res, otherwile will lead memory leak.
 * TAOS schemaless insert api
 * @param {*} connection a valid database connection
 * @param {*} lines string data, which statisfied with line proctocol
 * @param {*} numLines number of rows in param lines.
 * @param {*} protocal Line protocol, enum type (0,1,2,3),indicate different line protocol
 * @param {*} precision timestamp precision in lines, enum type (0,1,2,3,4,5,6)
 * @returns TAOS_RES 
 * 
 */
709
CTaosInterface.prototype.schemalessInsert = function schemalessInsert(connection, lines, protocal, precision) {
710 711
  let _numLines = null;
  let _lines = null;
712 713

  if (_.isString(lines)) {
714 715
    _numLines = 1;
    _lines = Buffer.alloc(_numLines * ref.sizeof.pointer);
716
    ref.set(_lines, 0, ref.allocCString(lines), ref.types.char_ptr);
717
  }
718
  else if (_.isArray(lines)) {
719 720
    _numLines = lines.length;
    _lines = Buffer.alloc(_numLines * ref.sizeof.pointer);
721 722
    for (let i = 0; i < _numLines; i++) {
      ref.set(_lines, i * ref.sizeof.pointer, ref.allocCString(lines[i]), ref.types.char_ptr)
723 724
    }
  }
725
  else {
726 727 728 729
    throw new errors.InterfaceError("Unsupport lines input")
  }
  return this.libtaos.taos_schemaless_insert(connection, _lines, _numLines, protocal, precision);
}