cinterface.js 22.7 KB
Newer Older
1 2 3 4 5
/**
 * C Interface with TDengine Module
 * @module CTaosInterface
 */

sangshuduo's avatar
sangshuduo 已提交
6
const ref = require('ref-napi');
7
const os = require('os');
sangshuduo's avatar
sangshuduo 已提交
8 9 10
const ffi = require('ffi-napi');
const ArrayType = require('ref-array-napi');
const Struct = require('ref-struct-napi');
11
const FieldTypes = require('./constants');
12
const errors = require('./error');
S
StoneT2000 已提交
13
const TaosObjects = require('./taosobjects');
sangshuduo's avatar
sangshuduo 已提交
14
const { NULL_POINTER } = require('ref-napi');
15 16 17 18

module.exports = CTaosInterface;

function convertMillisecondsToDatetime(time) {
S
StoneT2000 已提交
19
  return new TaosObjects.TaosTimestamp(time);
20 21
}
function convertMicrosecondsToDatetime(time) {
22
  return new TaosObjects.TaosTimestamp(time * 0.001, true);
23 24
}

25
function convertTimestamp(data, num_of_rows, nbytes = 0, offset = 0, micro = false) {
26 27 28 29
  timestampConverter = convertMillisecondsToDatetime;
  if (micro == true) {
    timestampConverter = convertMicrosecondsToDatetime;
  }
L
liu0x54 已提交
30
  data = ref.reinterpret(data.deref(), nbytes * num_of_rows, offset);
31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46
  let res = [];
  let currOffset = 0;
  while (currOffset < data.length) {
    let queue = [];
    let time = 0;
    for (let i = currOffset; i < currOffset + nbytes; i++) {
      queue.push(data[i]);
    }
    for (let i = queue.length - 1; i >= 0; i--) {
      time += queue[i] * Math.pow(16, i * 2);
    }
    currOffset += nbytes;
    res.push(timestampConverter(time));
  }
  return res;
}
47
function convertBool(data, num_of_rows, nbytes = 0, offset = 0, micro = false) {
L
liu0x54 已提交
48
  data = ref.reinterpret(data.deref(), nbytes * num_of_rows, offset);
49 50 51 52 53
  let res = new Array(data.length);
  for (let i = 0; i < data.length; i++) {
    if (data[i] == 0) {
      res[i] = false;
    }
54
    else if (data[i] == 1) {
55 56
      res[i] = true;
    }
57 58 59
    else if (data[i] == FieldTypes.C_BOOL_NULL) {
      res[i] = null;
    }
60 61 62
  }
  return res;
}
63
function convertTinyint(data, num_of_rows, nbytes = 0, offset = 0, micro = false) {
L
liu0x54 已提交
64
  data = ref.reinterpret(data.deref(), nbytes * num_of_rows, offset);
65 66 67
  let res = [];
  let currOffset = 0;
  while (currOffset < data.length) {
68
    let d = data.readIntLE(currOffset, 1);
69
    res.push(d == FieldTypes.C_TINYINT_NULL ? null : d);
70 71 72 73
    currOffset += nbytes;
  }
  return res;
}
74
function convertSmallint(data, num_of_rows, nbytes = 0, offset = 0, micro = false) {
L
liu0x54 已提交
75
  data = ref.reinterpret(data.deref(), nbytes * num_of_rows, offset);
76 77 78
  let res = [];
  let currOffset = 0;
  while (currOffset < data.length) {
79
    let d = data.readIntLE(currOffset, 2);
80
    res.push(d == FieldTypes.C_SMALLINT_NULL ? null : d);
81 82 83 84
    currOffset += nbytes;
  }
  return res;
}
85
function convertInt(data, num_of_rows, nbytes = 0, offset = 0, micro = false) {
L
liu0x54 已提交
86
  data = ref.reinterpret(data.deref(), nbytes * num_of_rows, offset);
87 88 89
  let res = [];
  let currOffset = 0;
  while (currOffset < data.length) {
90 91
    let d = data.readInt32LE(currOffset);
    res.push(d == FieldTypes.C_INT_NULL ? null : d);
92 93 94 95
    currOffset += nbytes;
  }
  return res;
}
96
function convertBigint(data, num_of_rows, nbytes = 0, offset = 0, micro = false) {
L
liu0x54 已提交
97
  data = ref.reinterpret(data.deref(), nbytes * num_of_rows, offset);
98 99 100
  let res = [];
  let currOffset = 0;
  while (currOffset < data.length) {
101
    let d = data.readInt64LE(currOffset);
L
liu0x54 已提交
102
    res.push(d == FieldTypes.C_BIGINT_NULL ? null : BigInt(d));
103 104 105 106
    currOffset += nbytes;
  }
  return res;
}
107
function convertFloat(data, num_of_rows, nbytes = 0, offset = 0, micro = false) {
L
liu0x54 已提交
108
  data = ref.reinterpret(data.deref(), nbytes * num_of_rows, offset);
109 110 111
  let res = [];
  let currOffset = 0;
  while (currOffset < data.length) {
112 113
    let d = parseFloat(data.readFloatLE(currOffset).toFixed(5));
    res.push(isNaN(d) ? null : d);
114 115 116 117
    currOffset += nbytes;
  }
  return res;
}
118
function convertDouble(data, num_of_rows, nbytes = 0, offset = 0, micro = false) {
L
liu0x54 已提交
119
  data = ref.reinterpret(data.deref(), nbytes * num_of_rows, offset);
120 121 122
  let res = [];
  let currOffset = 0;
  while (currOffset < data.length) {
123 124
    let d = parseFloat(data.readDoubleLE(currOffset).toFixed(16));
    res.push(isNaN(d) ? null : d);
125 126 127 128
    currOffset += nbytes;
  }
  return res;
}
129
function convertBinary(data, num_of_rows, nbytes = 0, offset = 0, micro = false) {
L
liu0x54 已提交
130
  data = ref.reinterpret(data.deref(), nbytes * num_of_rows, offset);
131 132 133
  let res = [];
  let currOffset = 0;
  while (currOffset < data.length) {
134 135 136 137 138 139 140
    let dataEntry = data.slice(currOffset, currOffset + nbytes);
    if (dataEntry[0] == FieldTypes.C_BINARY_NULL) {
      res.push(null);
    }
    else {
      res.push(ref.readCString(dataEntry));
    }
141 142 143 144
    currOffset += nbytes;
  }
  return res;
}
145
function convertNchar(data, num_of_rows, nbytes = 0, offset = 0, micro = false) {
L
liu0x54 已提交
146
  data = ref.reinterpret(data.deref(), nbytes * num_of_rows, offset);
147
  let res = [];
148 149 150
  let dataEntry = data.slice(0, nbytes); //one entry in a row under a column;
  //TODO: should use the correct character encoding 
  res.push(dataEntry.toString("utf-8"));
151 152 153
  return res;
}

154
// Object with all the relevant converters from pblock data to javascript readable data
155
let convertFunctions = {
156 157 158 159 160 161 162 163 164 165
  [FieldTypes.C_BOOL]: convertBool,
  [FieldTypes.C_TINYINT]: convertTinyint,
  [FieldTypes.C_SMALLINT]: convertSmallint,
  [FieldTypes.C_INT]: convertInt,
  [FieldTypes.C_BIGINT]: convertBigint,
  [FieldTypes.C_FLOAT]: convertFloat,
  [FieldTypes.C_DOUBLE]: convertDouble,
  [FieldTypes.C_BINARY]: convertBinary,
  [FieldTypes.C_TIMESTAMP]: convertTimestamp,
  [FieldTypes.C_NCHAR]: convertNchar
166 167 168 169 170
}

// Define TaosField structure
var char_arr = ArrayType(ref.types.char);
var TaosField = Struct({
171 172
  'name': char_arr,
});
173
TaosField.fields.name.type.size = 65;
174
TaosField.defineProperty('type', ref.types.char);
175 176
TaosField.defineProperty('bytes', ref.types.short);

177

178
/**
S
StoneT2000 已提交
179
 *
180 181
 * @param {Object} config - Configuration options for the interface
 * @return {CTaosInterface}
S
StoneT2000 已提交
182 183 184
 * @class CTaosInterface
 * @classdesc The CTaosInterface is the interface through which Node.JS communicates data back and forth with TDengine. It is not advised to
 * access this class directly and use it unless you understand what these functions do.
185
 */
186
function CTaosInterface(config = null, pass = false) {
187 188
  ref.types.char_ptr = ref.refType(ref.types.char);
  ref.types.void_ptr = ref.refType(ref.types.void);
189
  ref.types.void_ptr2 = ref.refType(ref.types.void_ptr);
190
  /*Declare a bunch of functions first*/
191
  /* Note, pointers to TAOS_RES, TAOS, are ref.types.void_ptr. The connection._conn buffer is supplied for pointers to TAOS *  */
192 193 194 195 196 197 198

  if ('win32' == os.platform()) {
    taoslibname = 'taos';
  } else {
    taoslibname = 'libtaos';
  }
  this.libtaos = ffi.Library(taoslibname, {
199 200
    'taos_options': [ref.types.int, [ref.types.int, ref.types.void_ptr]],
    'taos_init': [ref.types.void, []],
201
    //TAOS *taos_connect(char *ip, char *user, char *pass, char *db, int port)
202
    'taos_connect': [ref.types.void_ptr, [ref.types.char_ptr, ref.types.char_ptr, ref.types.char_ptr, ref.types.char_ptr, ref.types.int]],
203
    //void taos_close(TAOS *taos)
204 205 206
    'taos_close': [ref.types.void, [ref.types.void_ptr]],
    //int *taos_fetch_lengths(TAOS_RES *res);
    'taos_fetch_lengths': [ref.types.void_ptr, [ref.types.void_ptr]],
207
    //int taos_query(TAOS *taos, char *sqlstr)
208 209 210
    'taos_query': [ref.types.void_ptr, [ref.types.void_ptr, ref.types.char_ptr]],
    //int taos_affected_rows(TAOS_RES *res)
    'taos_affected_rows': [ref.types.int, [ref.types.void_ptr]],
211
    //int taos_fetch_block(TAOS_RES *res, TAOS_ROW *rows)
212
    'taos_fetch_block': [ref.types.int, [ref.types.void_ptr, ref.types.void_ptr]],
213
    //int taos_num_fields(TAOS_RES *res);
214
    'taos_num_fields': [ref.types.int, [ref.types.void_ptr]],
215 216
    //TAOS_ROW taos_fetch_row(TAOS_RES *res)
    //TAOS_ROW is void **, but we set the return type as a reference instead to get the row
217 218
    'taos_fetch_row': [ref.refType(ref.types.void_ptr2), [ref.types.void_ptr]],
    'taos_print_row': [ref.types.int, [ref.types.char_ptr, ref.types.void_ptr, ref.types.void_ptr, ref.types.int]],
219
    //int taos_result_precision(TAOS_RES *res)
220
    'taos_result_precision': [ref.types.int, [ref.types.void_ptr]],
221
    //void taos_free_result(TAOS_RES *res)
222
    'taos_free_result': [ref.types.void, [ref.types.void_ptr]],
223
    //int taos_field_count(TAOS *taos)
224
    'taos_field_count': [ref.types.int, [ref.types.void_ptr]],
225
    //TAOS_FIELD *taos_fetch_fields(TAOS_RES *res)
226
    'taos_fetch_fields': [ref.refType(TaosField), [ref.types.void_ptr]],
227
    //int taos_errno(TAOS *taos)
228
    'taos_errno': [ref.types.int, [ref.types.void_ptr]],
229
    //char *taos_errstr(TAOS *taos)
230
    'taos_errstr': [ref.types.char_ptr, [ref.types.void_ptr]],
231
    //void taos_stop_query(TAOS_RES *res);
232
    'taos_stop_query': [ref.types.void, [ref.types.void_ptr]],
233
    //char *taos_get_server_info(TAOS *taos);
234
    'taos_get_server_info': [ref.types.char_ptr, [ref.types.void_ptr]],
235
    //char *taos_get_client_info();
236
    'taos_get_client_info': [ref.types.char_ptr, []],
237 238 239

    // ASYNC
    // void taos_query_a(TAOS *taos, char *sqlstr, void (*fp)(void *, TAOS_RES *, int), void *param)
240
    'taos_query_a': [ref.types.void, [ref.types.void_ptr, ref.types.char_ptr, ref.types.void_ptr, ref.types.void_ptr]],
241
    // void taos_fetch_rows_a(TAOS_RES *res, void (*fp)(void *param, TAOS_RES *, int numOfRows), void *param);
242
    'taos_fetch_rows_a': [ref.types.void, [ref.types.void_ptr, ref.types.void_ptr, ref.types.void_ptr]],
243 244

    // Subscription
S
StoneT2000 已提交
245
    //TAOS_SUB *taos_subscribe(TAOS* taos, int restart, const char* topic, const char *sql, TAOS_SUBSCRIBE_CALLBACK fp, void *param, int interval)
246
    'taos_subscribe': [ref.types.void_ptr, [ref.types.void_ptr, ref.types.int, ref.types.char_ptr, ref.types.char_ptr, ref.types.void_ptr, ref.types.void_ptr, ref.types.int]],
S
StoneT2000 已提交
247
    // TAOS_RES *taos_consume(TAOS_SUB *tsub)
248
    'taos_consume': [ref.types.void_ptr, [ref.types.void_ptr]],
249
    //void taos_unsubscribe(TAOS_SUB *tsub);
250
    'taos_unsubscribe': [ref.types.void, [ref.types.void_ptr]],
251 252 253 254

    // Continuous Query
    //TAOS_STREAM *taos_open_stream(TAOS *taos, char *sqlstr, void (*fp)(void *param, TAOS_RES *, TAOS_ROW row),
    //                              int64_t stime, void *param, void (*callback)(void *));
255
    'taos_open_stream': [ref.types.void_ptr, [ref.types.void_ptr, ref.types.char_ptr, ref.types.void_ptr, ref.types.int64, ref.types.void_ptr, ref.types.void_ptr]],
256
    //void taos_close_stream(TAOS_STREAM *tstr);
257
    'taos_close_stream': [ref.types.void, [ref.types.void_ptr]]
258

259 260 261 262 263 264 265
  });
  if (pass == false) {
    if (config == null) {
      this._config = ref.alloc(ref.types.char_ptr, ref.NULL);
    }
    else {
      try {
266
        this._config = ref.allocCString(config);
267
      }
268
      catch (err) {
269 270 271 272 273 274 275 276
        throw "Attribute Error: config is expected as a str";
      }
    }
    if (config != null) {
      this.libtaos.taos_options(3, this._config);
    }
    this.libtaos.taos_init();
  }
277
  return this;
278 279
}
CTaosInterface.prototype.config = function config() {
280 281 282 283 284
  return this._config;
}
CTaosInterface.prototype.connect = function connect(host = null, user = "root", password = "taosdata", db = null, port = 0) {
  let _host, _user, _password, _db, _port;
  try {
285 286
    _host = host != null ? ref.allocCString(host) : ref.alloc(ref.types.char_ptr, ref.NULL);
  }
287
  catch (err) {
288 289 290 291 292
    throw "Attribute Error: host is expected as a str";
  }
  try {
    _user = ref.allocCString(user)
  }
293
  catch (err) {
294 295 296 297 298
    throw "Attribute Error: user is expected as a str";
  }
  try {
    _password = ref.allocCString(password);
  }
299
  catch (err) {
300 301 302 303 304
    throw "Attribute Error: password is expected as a str";
  }
  try {
    _db = db != null ? ref.allocCString(db) : ref.alloc(ref.types.char_ptr, ref.NULL);
  }
305
  catch (err) {
306 307 308 309 310
    throw "Attribute Error: db is expected as a str";
  }
  try {
    _port = ref.alloc(ref.types.int, port);
  }
311
  catch (err) {
312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327
    throw TypeError("port is expected as an int")
  }
  let connection = this.libtaos.taos_connect(_host, _user, _password, _db, _port);
  if (ref.isNull(connection)) {
    throw new errors.TDError('Failed to connect to TDengine');
  }
  else {
    console.log('Successfully connected to TDengine');
  }
  return connection;
}
CTaosInterface.prototype.close = function close(connection) {
  this.libtaos.taos_close(connection);
  console.log("Connection is closed");
}
CTaosInterface.prototype.query = function query(connection, sql) {
328
  return this.libtaos.taos_query(connection, ref.allocCString(sql));
329
}
330 331
CTaosInterface.prototype.affectedRows = function affectedRows(result) {
  return this.libtaos.taos_affected_rows(result);
332
}
333 334
CTaosInterface.prototype.useResult = function useResult(result) {

335 336 337
  let fields = [];
  let pfields = this.fetchFields(result);
  if (ref.isNull(pfields) == false) {
338
    pfields = ref.reinterpret(pfields, this.fieldsCount(result) * 68, 0);
339
    for (let i = 0; i < pfields.length; i += 68) {
340
      //0 - 63 = name //64 - 65 = bytes, 66 - 67 = type
341 342
      fields.push({
        name: ref.readCString(ref.reinterpret(pfields, 65, i)),
343 344
        type: pfields[i + 65],
        bytes: pfields[i + 66]
345 346 347
      })
    }
  }
L
liu0x54 已提交
348
  return fields;
349 350
}
CTaosInterface.prototype.fetchBlock = function fetchBlock(result, fields) {
351 352 353 354
  let pblock = ref.NULL_POINTER;
  let num_of_rows = this.libtaos.taos_fetch_block(result, pblock);
  if (ref.isNull(pblock.deref()) == true) {
    return { block: null, num_of_rows: 0 };
355
  }
356

357
  var fieldL = this.libtaos.taos_fetch_lengths(result);
358

L
liu0x54 已提交
359 360
  let isMicro = (this.libtaos.taos_result_precision(result) == FieldTypes.C_TIMESTAMP_MICRO);

361
  var fieldlens = [];
362

363
  if (ref.isNull(fieldL) == false) {
364 365
    for (let i = 0; i < fields.length; i++) {
      let plen = ref.reinterpret(fieldL, 4, i * 4);
366
      let len = plen.readInt32LE(0);
367
      fieldlens.push(len);
368 369
    }
  }
L
liu0x54 已提交
370

L
liu0x54 已提交
371
  let blocks = new Array(fields.length);
L
liu0x54 已提交
372
  blocks.fill(null);
373
  num_of_rows = Math.abs(num_of_rows);
L
liu0x54 已提交
374
  let offset = 0;
375 376
  let ptr = pblock.deref();

L
liu0x54 已提交
377
  for (let i = 0; i < fields.length; i++) {
378 379 380 381 382 383 384 385 386 387 388 389
    pdata = ref.reinterpret(ptr, 8, i * 8);
    if (ref.isNull(pdata.readPointer())) {
      blocks[i] = new Array();
    } else {
      pdata = ref.ref(pdata.readPointer());
      if (!convertFunctions[fields[i]['type']]) {
        throw new errors.DatabaseError("Invalid data type returned from database");
      }
      blocks[i] = convertFunctions[fields[i]['type']](pdata, num_of_rows, fieldlens[i], offset, isMicro);
    }
  }
  return { blocks: blocks, num_of_rows }
390
}
391 392 393 394
CTaosInterface.prototype.fetchRow = function fetchRow(result, fields) {
  let row = this.libtaos.taos_fetch_row(result);
  return row;
}
395 396 397 398
CTaosInterface.prototype.freeResult = function freeResult(result) {
  this.libtaos.taos_free_result(result);
  result = null;
}
399 400 401 402
/** Number of fields returned in this result handle, must use with async */
CTaosInterface.prototype.numFields = function numFields(result) {
  return this.libtaos.taos_num_fields(result);
}
403
// Fetch fields count by connection, the latest query
404 405
CTaosInterface.prototype.fieldsCount = function fieldsCount(result) {
  return this.libtaos.taos_field_count(result);
406 407 408 409
}
CTaosInterface.prototype.fetchFields = function fetchFields(result) {
  return this.libtaos.taos_fetch_fields(result);
}
410 411
CTaosInterface.prototype.errno = function errno(result) {
  return this.libtaos.taos_errno(result);
412
}
413 414
CTaosInterface.prototype.errStr = function errStr(result) {
  return ref.readCString(this.libtaos.taos_errstr(result));
415 416 417 418
}
// Async
CTaosInterface.prototype.query_a = function query_a(connection, sql, callback, param = ref.ref(ref.NULL)) {
  // void taos_query_a(TAOS *taos, char *sqlstr, void (*fp)(void *param, TAOS_RES *, int), void *param)
419
  callback = ffi.Callback(ref.types.void, [ref.types.void_ptr, ref.types.void_ptr, ref.types.int], callback);
420 421 422 423 424 425 426 427 428 429 430 431 432 433 434
  this.libtaos.taos_query_a(connection, ref.allocCString(sql), callback, param);
  return param;
}
/** Asynchrnously fetches the next block of rows. Wraps callback and transfers a 4th argument to the cursor, the row data as blocks in javascript form
 * Note: This isn't a recursive function, in order to fetch all data either use the TDengine cursor object, TaosQuery object, or implement a recrusive
 * function yourself using the libtaos.taos_fetch_rows_a function
 */
CTaosInterface.prototype.fetch_rows_a = function fetch_rows_a(result, callback, param = ref.ref(ref.NULL)) {
  // void taos_fetch_rows_a(TAOS_RES *res, void (*fp)(void *param, TAOS_RES *, int numOfRows), void *param);
  var cti = this;
  // wrap callback with a function so interface can access the numOfRows value, needed in order to properly process the binary data
  let asyncCallbackWrapper = function (param2, result2, numOfRows2) {
    // Data preparation to pass to cursor. Could be bottleneck in query execution callback times.
    let row = cti.libtaos.taos_fetch_row(result2);
    let fields = cti.fetchFields_a(result2);
435

436
    let isMicro = (cti.libtaos.taos_result_precision(result2) == FieldTypes.C_TIMESTAMP_MICRO);
437 438 439 440
    let blocks = new Array(fields.length);
    blocks.fill(null);
    numOfRows2 = Math.abs(numOfRows2);
    let offset = 0;
441 442 443
    var fieldL = cti.libtaos.taos_fetch_lengths(result);
    var fieldlens = [];
    if (ref.isNull(fieldL) == false) {
444 445 446 447

      for (let i = 0; i < fields.length; i++) {
        let plen = ref.reinterpret(fieldL, 8, i * 8);
        let len = ref.get(plen, 0, ref.types.int32);
448 449 450
        fieldlens.push(len);
      }
    }
451
    if (numOfRows2 > 0) {
452
      for (let i = 0; i < fields.length; i++) {
453 454 455 456 457 458 459 460 461 462 463 464
        if (ref.isNull(pdata.readPointer())) {
          blocks[i] = new Array();
        } else {
          if (!convertFunctions[fields[i]['type']]) {
            throw new errors.DatabaseError("Invalid data type returned from database");
          }
          let prow = ref.reinterpret(row, 8, i * 8);
          prow = prow.readPointer();
          prow = ref.ref(prow);
          blocks[i] = convertFunctions[fields[i]['type']](prow, 1, fieldlens[i], offset, isMicro);
          //offset += fields[i]['bytes'] * numOfRows2;
        }
465 466 467 468
      }
    }
    callback(param2, result2, numOfRows2, blocks);
  }
469
  asyncCallbackWrapper = ffi.Callback(ref.types.void, [ref.types.void_ptr, ref.types.void_ptr, ref.types.int], asyncCallbackWrapper);
470 471 472 473
  this.libtaos.taos_fetch_rows_a(result, asyncCallbackWrapper, param);
  return param;
}
// Fetch field meta data by result handle
474
CTaosInterface.prototype.fetchFields_a = function fetchFields_a(result) {
475 476 477 478
  let pfields = this.fetchFields(result);
  let pfieldscount = this.numFields(result);
  let fields = [];
  if (ref.isNull(pfields) == false) {
479
    pfields = ref.reinterpret(pfields, 68 * pfieldscount, 0);
480
    for (let i = 0; i < pfields.length; i += 68) {
481
      //0 - 64 = name //65 = type, 66 - 67 = bytes
482 483
      fields.push({
        name: ref.readCString(ref.reinterpret(pfields, 65, i)),
484 485
        type: pfields[i + 65],
        bytes: pfields[i + 66]
486 487 488 489
      })
    }
  }
  return fields;
490
}
491 492
// Stop a query by result handle
CTaosInterface.prototype.stopQuery = function stopQuery(result) {
493
  if (result != null) {
494 495 496 497 498 499 500 501 502 503 504 505 506 507
    this.libtaos.taos_stop_query(result);
  }
  else {
    throw new errors.ProgrammingError("No result handle passed to stop query");
  }
}
CTaosInterface.prototype.getServerInfo = function getServerInfo(connection) {
  return ref.readCString(this.libtaos.taos_get_server_info(connection));
}
CTaosInterface.prototype.getClientInfo = function getClientInfo() {
  return ref.readCString(this.libtaos.taos_get_client_info());
}

// Subscription
S
StoneT2000 已提交
508 509 510
CTaosInterface.prototype.subscribe = function subscribe(connection, restart, topic, sql, interval) {
  let topicOrig = topic;
  let sqlOrig = sql;
511
  try {
S
StoneT2000 已提交
512
    sql = sql != null ? ref.allocCString(sql) : ref.alloc(ref.types.char_ptr, ref.NULL);
513
  }
514
  catch (err) {
S
StoneT2000 已提交
515
    throw "Attribute Error: sql is expected as a str";
516 517
  }
  try {
S
StoneT2000 已提交
518
    topic = topic != null ? ref.allocCString(topic) : ref.alloc(ref.types.char_ptr, ref.NULL);
519
  }
520
  catch (err) {
S
StoneT2000 已提交
521 522
    throw TypeError("topic is expected as a str");
  }
S
StoneT2000 已提交
523

S
StoneT2000 已提交
524
  restart = ref.alloc(ref.types.int, restart);
S
StoneT2000 已提交
525

S
StoneT2000 已提交
526
  let subscription = this.libtaos.taos_subscribe(connection, restart, topic, sql, null, null, interval);
527 528 529 530
  if (ref.isNull(subscription)) {
    throw new errors.TDError('Failed to subscribe to TDengine | Database: ' + dbOrig + ', Table: ' + tableOrig);
  }
  else {
S
StoneT2000 已提交
531
    console.log('Successfully subscribed to TDengine - Topic: ' + topicOrig);
532 533 534
  }
  return subscription;
}
S
StoneT2000 已提交
535 536 537

CTaosInterface.prototype.consume = function consume(subscription) {
  let result = this.libtaos.taos_consume(subscription);
538
  let fields = [];
S
StoneT2000 已提交
539
  let pfields = this.fetchFields(result);
540
  if (ref.isNull(pfields) == false) {
S
StoneT2000 已提交
541
    pfields = ref.reinterpret(pfields, this.numFields(result) * 68, 0);
542 543
    for (let i = 0; i < pfields.length; i += 68) {
      //0 - 63 = name //64 - 65 = bytes, 66 - 67 = type
544 545
      fields.push({
        name: ref.readCString(ref.reinterpret(pfields, 64, i)),
546 547 548 549 550
        bytes: pfields[i + 64],
        type: pfields[i + 66]
      })
    }
  }
S
StoneT2000 已提交
551 552

  let data = [];
553
  while (true) {
S
StoneT2000 已提交
554 555 556 557 558 559 560 561 562
    let { blocks, num_of_rows } = this.fetchBlock(result, fields);
    if (num_of_rows == 0) {
      break;
    }
    for (let i = 0; i < num_of_rows; i++) {
      data.push([]);
      let rowBlock = new Array(fields.length);
      for (let j = 0; j < fields.length; j++) {
        rowBlock[j] = blocks[j][i];
563
      }
564
      data[data.length - 1] = (rowBlock);
565 566
    }
  }
S
StoneT2000 已提交
567
  return { data: data, fields: fields, result: result };
568 569 570 571 572 573 574
}
CTaosInterface.prototype.unsubscribe = function unsubscribe(subscription) {
  //void taos_unsubscribe(TAOS_SUB *tsub);
  this.libtaos.taos_unsubscribe(subscription);
}

// Continuous Query
575
CTaosInterface.prototype.openStream = function openStream(connection, sql, callback, stime, stoppingCallback, param = ref.ref(ref.NULL)) {
576 577 578
  try {
    sql = ref.allocCString(sql);
  }
579
  catch (err) {
580 581 582 583 584 585 586 587 588 589 590 591
    throw "Attribute Error: sql string is expected as a str";
  }
  var cti = this;
  let asyncCallbackWrapper = function (param2, result2, row) {
    let fields = cti.fetchFields_a(result2);
    let isMicro = (cti.libtaos.taos_result_precision(result2) == FieldTypes.C_TIMESTAMP_MICRO);
    let blocks = new Array(fields.length);
    blocks.fill(null);
    let numOfRows2 = 1;
    let offset = 0;
    if (numOfRows2 > 0) {
      for (let i = 0; i < fields.length; i++) {
592
        if (!convertFunctions[fields[i]['type']]) {
593 594 595 596 597 598 599 600
          throw new errors.DatabaseError("Invalid data type returned from database");
        }
        blocks[i] = convertFunctions[fields[i]['type']](row, numOfRows2, fields[i]['bytes'], offset, isMicro);
        offset += fields[i]['bytes'] * numOfRows2;
      }
    }
    callback(param2, result2, blocks, fields);
  }
601 602
  asyncCallbackWrapper = ffi.Callback(ref.types.void, [ref.types.void_ptr, ref.types.void_ptr, ref.refType(ref.types.void_ptr2)], asyncCallbackWrapper);
  asyncStoppingCallbackWrapper = ffi.Callback(ref.types.void, [ref.types.void_ptr], stoppingCallback);
603 604 605 606 607 608 609 610 611 612 613 614 615 616
  let streamHandle = this.libtaos.taos_open_stream(connection, sql, asyncCallbackWrapper, stime, param, asyncStoppingCallbackWrapper);
  if (ref.isNull(streamHandle)) {
    throw new errors.TDError('Failed to open a stream with TDengine');
    return false;
  }
  else {
    console.log("Succesfully opened stream");
    return streamHandle;
  }
}
CTaosInterface.prototype.closeStream = function closeStream(stream) {
  this.libtaos.taos_close_stream(stream);
  console.log("Closed stream");
}