cinterface.js 22.4 KB
Newer Older
1 2 3 4 5
/**
 * C Interface with TDengine Module
 * @module CTaosInterface
 */

sangshuduo's avatar
sangshuduo 已提交
6
const ref = require('ref-napi');
7
const os = require('os');
sangshuduo's avatar
sangshuduo 已提交
8 9 10
const ffi = require('ffi-napi');
const ArrayType = require('ref-array-napi');
const Struct = require('ref-struct-napi');
11
const FieldTypes = require('./constants');
12
const errors = require('./error');
S
StoneT2000 已提交
13
const TaosObjects = require('./taosobjects');
sangshuduo's avatar
sangshuduo 已提交
14
const { NULL_POINTER } = require('ref-napi');
15 16 17 18

module.exports = CTaosInterface;

function convertMillisecondsToDatetime(time) {
S
StoneT2000 已提交
19
  return new TaosObjects.TaosTimestamp(time);
20 21
}
function convertMicrosecondsToDatetime(time) {
22
  return new TaosObjects.TaosTimestamp(time * 0.001, true);
23 24
}

25
function convertTimestamp(data, num_of_rows, nbytes = 0, offset = 0, micro = false) {
26 27 28 29
  timestampConverter = convertMillisecondsToDatetime;
  if (micro == true) {
    timestampConverter = convertMicrosecondsToDatetime;
  }
L
liu0x54 已提交
30
  data = ref.reinterpret(data.deref(), nbytes * num_of_rows, offset);
31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46
  let res = [];
  let currOffset = 0;
  while (currOffset < data.length) {
    let queue = [];
    let time = 0;
    for (let i = currOffset; i < currOffset + nbytes; i++) {
      queue.push(data[i]);
    }
    for (let i = queue.length - 1; i >= 0; i--) {
      time += queue[i] * Math.pow(16, i * 2);
    }
    currOffset += nbytes;
    res.push(timestampConverter(time));
  }
  return res;
}
47
function convertBool(data, num_of_rows, nbytes = 0, offset = 0, micro = false) {
L
liu0x54 已提交
48
  data = ref.reinterpret(data.deref(), nbytes * num_of_rows, offset);
49 50 51 52 53
  let res = new Array(data.length);
  for (let i = 0; i < data.length; i++) {
    if (data[i] == 0) {
      res[i] = false;
    }
54
    else if (data[i] == 1) {
55 56
      res[i] = true;
    }
57 58 59
    else if (data[i] == FieldTypes.C_BOOL_NULL) {
      res[i] = null;
    }
60 61 62
  }
  return res;
}
63
function convertTinyint(data, num_of_rows, nbytes = 0, offset = 0, micro = false) {
L
liu0x54 已提交
64
  data = ref.reinterpret(data.deref(), nbytes * num_of_rows, offset);
65 66 67
  let res = [];
  let currOffset = 0;
  while (currOffset < data.length) {
68
    let d = data.readIntLE(currOffset, 1);
69
    res.push(d == FieldTypes.C_TINYINT_NULL ? null : d);
70 71 72 73
    currOffset += nbytes;
  }
  return res;
}
74
function convertSmallint(data, num_of_rows, nbytes = 0, offset = 0, micro = false) {
L
liu0x54 已提交
75
  data = ref.reinterpret(data.deref(), nbytes * num_of_rows, offset);
76 77 78
  let res = [];
  let currOffset = 0;
  while (currOffset < data.length) {
79
    let d = data.readIntLE(currOffset, 2);
80
    res.push(d == FieldTypes.C_SMALLINT_NULL ? null : d);
81 82 83 84
    currOffset += nbytes;
  }
  return res;
}
85
function convertInt(data, num_of_rows, nbytes = 0, offset = 0, micro = false) {
L
liu0x54 已提交
86
  data = ref.reinterpret(data.deref(), nbytes * num_of_rows, offset);
87 88 89
  let res = [];
  let currOffset = 0;
  while (currOffset < data.length) {
90 91
    let d = data.readInt32LE(currOffset);
    res.push(d == FieldTypes.C_INT_NULL ? null : d);
92 93 94 95
    currOffset += nbytes;
  }
  return res;
}
96
function convertBigint(data, num_of_rows, nbytes = 0, offset = 0, micro = false) {
L
liu0x54 已提交
97
  data = ref.reinterpret(data.deref(), nbytes * num_of_rows, offset);
98 99 100
  let res = [];
  let currOffset = 0;
  while (currOffset < data.length) {
101
    let d = data.readInt64LE(currOffset);
L
liu0x54 已提交
102
    res.push(d == FieldTypes.C_BIGINT_NULL ? null : BigInt(d));
103 104 105 106
    currOffset += nbytes;
  }
  return res;
}
107
function convertFloat(data, num_of_rows, nbytes = 0, offset = 0, micro = false) {
L
liu0x54 已提交
108
  data = ref.reinterpret(data.deref(), nbytes * num_of_rows, offset);
109 110 111
  let res = [];
  let currOffset = 0;
  while (currOffset < data.length) {
112 113
    let d = parseFloat(data.readFloatLE(currOffset).toFixed(5));
    res.push(isNaN(d) ? null : d);
114 115 116 117
    currOffset += nbytes;
  }
  return res;
}
118
function convertDouble(data, num_of_rows, nbytes = 0, offset = 0, micro = false) {
L
liu0x54 已提交
119
  data = ref.reinterpret(data.deref(), nbytes * num_of_rows, offset);
120 121 122
  let res = [];
  let currOffset = 0;
  while (currOffset < data.length) {
123 124
    let d = parseFloat(data.readDoubleLE(currOffset).toFixed(16));
    res.push(isNaN(d) ? null : d);
125 126 127 128
    currOffset += nbytes;
  }
  return res;
}
129 130

function convertNchar(data, num_of_rows, nbytes = 0, offset = 0, micro = false) {
L
liu0x54 已提交
131
  data = ref.reinterpret(data.deref(), nbytes * num_of_rows, offset);
132
  let res = [];
133

134 135
  let currOffset = 0;
  while (currOffset < data.length) {
136 137 138
    let len = data.readIntLE(currOffset, 2);
    let dataEntry = data.slice(currOffset + 2, currOffset + len + 2); //one entry in a row under a column;
    res.push(dataEntry.toString("utf-8"));
139 140 141 142 143
    currOffset += nbytes;
  }
  return res;
}

144
// Object with all the relevant converters from pblock data to javascript readable data
145
let convertFunctions = {
146 147 148 149 150 151 152
  [FieldTypes.C_BOOL]: convertBool,
  [FieldTypes.C_TINYINT]: convertTinyint,
  [FieldTypes.C_SMALLINT]: convertSmallint,
  [FieldTypes.C_INT]: convertInt,
  [FieldTypes.C_BIGINT]: convertBigint,
  [FieldTypes.C_FLOAT]: convertFloat,
  [FieldTypes.C_DOUBLE]: convertDouble,
153
  [FieldTypes.C_BINARY]: convertNchar,
154 155
  [FieldTypes.C_TIMESTAMP]: convertTimestamp,
  [FieldTypes.C_NCHAR]: convertNchar
156 157 158 159 160
}

// Define TaosField structure
var char_arr = ArrayType(ref.types.char);
var TaosField = Struct({
161 162
  'name': char_arr,
});
163
TaosField.fields.name.type.size = 65;
164
TaosField.defineProperty('type', ref.types.char);
165 166
TaosField.defineProperty('bytes', ref.types.short);

167

168
/**
S
StoneT2000 已提交
169
 *
170 171
 * @param {Object} config - Configuration options for the interface
 * @return {CTaosInterface}
S
StoneT2000 已提交
172 173 174
 * @class CTaosInterface
 * @classdesc The CTaosInterface is the interface through which Node.JS communicates data back and forth with TDengine. It is not advised to
 * access this class directly and use it unless you understand what these functions do.
175
 */
176
function CTaosInterface(config = null, pass = false) {
177 178
  ref.types.char_ptr = ref.refType(ref.types.char);
  ref.types.void_ptr = ref.refType(ref.types.void);
179
  ref.types.void_ptr2 = ref.refType(ref.types.void_ptr);
180
  /*Declare a bunch of functions first*/
181
  /* Note, pointers to TAOS_RES, TAOS, are ref.types.void_ptr. The connection._conn buffer is supplied for pointers to TAOS *  */
182 183 184 185 186 187 188

  if ('win32' == os.platform()) {
    taoslibname = 'taos';
  } else {
    taoslibname = 'libtaos';
  }
  this.libtaos = ffi.Library(taoslibname, {
189 190
    'taos_options': [ref.types.int, [ref.types.int, ref.types.void_ptr]],
    'taos_init': [ref.types.void, []],
191
    //TAOS *taos_connect(char *ip, char *user, char *pass, char *db, int port)
192
    'taos_connect': [ref.types.void_ptr, [ref.types.char_ptr, ref.types.char_ptr, ref.types.char_ptr, ref.types.char_ptr, ref.types.int]],
193
    //void taos_close(TAOS *taos)
194 195 196
    'taos_close': [ref.types.void, [ref.types.void_ptr]],
    //int *taos_fetch_lengths(TAOS_RES *res);
    'taos_fetch_lengths': [ref.types.void_ptr, [ref.types.void_ptr]],
197
    //int taos_query(TAOS *taos, char *sqlstr)
198 199 200
    'taos_query': [ref.types.void_ptr, [ref.types.void_ptr, ref.types.char_ptr]],
    //int taos_affected_rows(TAOS_RES *res)
    'taos_affected_rows': [ref.types.int, [ref.types.void_ptr]],
201
    //int taos_fetch_block(TAOS_RES *res, TAOS_ROW *rows)
202
    'taos_fetch_block': [ref.types.int, [ref.types.void_ptr, ref.types.void_ptr]],
203
    //int taos_num_fields(TAOS_RES *res);
204
    'taos_num_fields': [ref.types.int, [ref.types.void_ptr]],
205 206
    //TAOS_ROW taos_fetch_row(TAOS_RES *res)
    //TAOS_ROW is void **, but we set the return type as a reference instead to get the row
207 208
    'taos_fetch_row': [ref.refType(ref.types.void_ptr2), [ref.types.void_ptr]],
    'taos_print_row': [ref.types.int, [ref.types.char_ptr, ref.types.void_ptr, ref.types.void_ptr, ref.types.int]],
209
    //int taos_result_precision(TAOS_RES *res)
210
    'taos_result_precision': [ref.types.int, [ref.types.void_ptr]],
211
    //void taos_free_result(TAOS_RES *res)
212
    'taos_free_result': [ref.types.void, [ref.types.void_ptr]],
213
    //int taos_field_count(TAOS *taos)
214
    'taos_field_count': [ref.types.int, [ref.types.void_ptr]],
215
    //TAOS_FIELD *taos_fetch_fields(TAOS_RES *res)
216
    'taos_fetch_fields': [ref.refType(TaosField), [ref.types.void_ptr]],
217
    //int taos_errno(TAOS *taos)
218
    'taos_errno': [ref.types.int, [ref.types.void_ptr]],
219
    //char *taos_errstr(TAOS *taos)
220
    'taos_errstr': [ref.types.char_ptr, [ref.types.void_ptr]],
221
    //void taos_stop_query(TAOS_RES *res);
222
    'taos_stop_query': [ref.types.void, [ref.types.void_ptr]],
223
    //char *taos_get_server_info(TAOS *taos);
224
    'taos_get_server_info': [ref.types.char_ptr, [ref.types.void_ptr]],
225
    //char *taos_get_client_info();
226
    'taos_get_client_info': [ref.types.char_ptr, []],
227 228 229

    // ASYNC
    // void taos_query_a(TAOS *taos, char *sqlstr, void (*fp)(void *, TAOS_RES *, int), void *param)
230
    'taos_query_a': [ref.types.void, [ref.types.void_ptr, ref.types.char_ptr, ref.types.void_ptr, ref.types.void_ptr]],
231
    // void taos_fetch_rows_a(TAOS_RES *res, void (*fp)(void *param, TAOS_RES *, int numOfRows), void *param);
232
    'taos_fetch_rows_a': [ref.types.void, [ref.types.void_ptr, ref.types.void_ptr, ref.types.void_ptr]],
233 234

    // Subscription
S
StoneT2000 已提交
235
    //TAOS_SUB *taos_subscribe(TAOS* taos, int restart, const char* topic, const char *sql, TAOS_SUBSCRIBE_CALLBACK fp, void *param, int interval)
236
    'taos_subscribe': [ref.types.void_ptr, [ref.types.void_ptr, ref.types.int, ref.types.char_ptr, ref.types.char_ptr, ref.types.void_ptr, ref.types.void_ptr, ref.types.int]],
S
StoneT2000 已提交
237
    // TAOS_RES *taos_consume(TAOS_SUB *tsub)
238
    'taos_consume': [ref.types.void_ptr, [ref.types.void_ptr]],
239
    //void taos_unsubscribe(TAOS_SUB *tsub);
240
    'taos_unsubscribe': [ref.types.void, [ref.types.void_ptr]],
241 242 243 244

    // Continuous Query
    //TAOS_STREAM *taos_open_stream(TAOS *taos, char *sqlstr, void (*fp)(void *param, TAOS_RES *, TAOS_ROW row),
    //                              int64_t stime, void *param, void (*callback)(void *));
245
    'taos_open_stream': [ref.types.void_ptr, [ref.types.void_ptr, ref.types.char_ptr, ref.types.void_ptr, ref.types.int64, ref.types.void_ptr, ref.types.void_ptr]],
246
    //void taos_close_stream(TAOS_STREAM *tstr);
247
    'taos_close_stream': [ref.types.void, [ref.types.void_ptr]]
248

249 250 251 252 253 254 255
  });
  if (pass == false) {
    if (config == null) {
      this._config = ref.alloc(ref.types.char_ptr, ref.NULL);
    }
    else {
      try {
256
        this._config = ref.allocCString(config);
257
      }
258
      catch (err) {
259 260 261 262 263 264 265 266
        throw "Attribute Error: config is expected as a str";
      }
    }
    if (config != null) {
      this.libtaos.taos_options(3, this._config);
    }
    this.libtaos.taos_init();
  }
267
  return this;
268 269
}
CTaosInterface.prototype.config = function config() {
270 271 272 273 274
  return this._config;
}
CTaosInterface.prototype.connect = function connect(host = null, user = "root", password = "taosdata", db = null, port = 0) {
  let _host, _user, _password, _db, _port;
  try {
275 276
    _host = host != null ? ref.allocCString(host) : ref.alloc(ref.types.char_ptr, ref.NULL);
  }
277
  catch (err) {
278 279 280 281 282
    throw "Attribute Error: host is expected as a str";
  }
  try {
    _user = ref.allocCString(user)
  }
283
  catch (err) {
284 285 286 287 288
    throw "Attribute Error: user is expected as a str";
  }
  try {
    _password = ref.allocCString(password);
  }
289
  catch (err) {
290 291 292 293 294
    throw "Attribute Error: password is expected as a str";
  }
  try {
    _db = db != null ? ref.allocCString(db) : ref.alloc(ref.types.char_ptr, ref.NULL);
  }
295
  catch (err) {
296 297 298 299 300
    throw "Attribute Error: db is expected as a str";
  }
  try {
    _port = ref.alloc(ref.types.int, port);
  }
301
  catch (err) {
302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317
    throw TypeError("port is expected as an int")
  }
  let connection = this.libtaos.taos_connect(_host, _user, _password, _db, _port);
  if (ref.isNull(connection)) {
    throw new errors.TDError('Failed to connect to TDengine');
  }
  else {
    console.log('Successfully connected to TDengine');
  }
  return connection;
}
CTaosInterface.prototype.close = function close(connection) {
  this.libtaos.taos_close(connection);
  console.log("Connection is closed");
}
CTaosInterface.prototype.query = function query(connection, sql) {
318
  return this.libtaos.taos_query(connection, ref.allocCString(sql));
319
}
320 321
CTaosInterface.prototype.affectedRows = function affectedRows(result) {
  return this.libtaos.taos_affected_rows(result);
322
}
323 324
CTaosInterface.prototype.useResult = function useResult(result) {

325 326 327
  let fields = [];
  let pfields = this.fetchFields(result);
  if (ref.isNull(pfields) == false) {
328
    pfields = ref.reinterpret(pfields, this.fieldsCount(result) * 68, 0);
329
    for (let i = 0; i < pfields.length; i += 68) {
330
      //0 - 63 = name //64 - 65 = bytes, 66 - 67 = type
331 332
      fields.push({
        name: ref.readCString(ref.reinterpret(pfields, 65, i)),
333 334
        type: pfields[i + 65],
        bytes: pfields[i + 66]
335 336 337
      })
    }
  }
L
liu0x54 已提交
338
  return fields;
339 340
}
CTaosInterface.prototype.fetchBlock = function fetchBlock(result, fields) {
341 342 343 344
  let pblock = ref.NULL_POINTER;
  let num_of_rows = this.libtaos.taos_fetch_block(result, pblock);
  if (ref.isNull(pblock.deref()) == true) {
    return { block: null, num_of_rows: 0 };
345
  }
346

347
  var fieldL = this.libtaos.taos_fetch_lengths(result);
348

L
liu0x54 已提交
349 350
  let isMicro = (this.libtaos.taos_result_precision(result) == FieldTypes.C_TIMESTAMP_MICRO);

351
  var fieldlens = [];
352

353
  if (ref.isNull(fieldL) == false) {
354 355
    for (let i = 0; i < fields.length; i++) {
      let plen = ref.reinterpret(fieldL, 4, i * 4);
356
      let len = plen.readInt32LE(0);
357
      fieldlens.push(len);
358 359
    }
  }
L
liu0x54 已提交
360

L
liu0x54 已提交
361
  let blocks = new Array(fields.length);
L
liu0x54 已提交
362
  blocks.fill(null);
363
  num_of_rows = Math.abs(num_of_rows);
L
liu0x54 已提交
364
  let offset = 0;
365 366
  let ptr = pblock.deref();

L
liu0x54 已提交
367
  for (let i = 0; i < fields.length; i++) {
368 369 370 371 372 373 374 375 376 377 378 379
    pdata = ref.reinterpret(ptr, 8, i * 8);
    if (ref.isNull(pdata.readPointer())) {
      blocks[i] = new Array();
    } else {
      pdata = ref.ref(pdata.readPointer());
      if (!convertFunctions[fields[i]['type']]) {
        throw new errors.DatabaseError("Invalid data type returned from database");
      }
      blocks[i] = convertFunctions[fields[i]['type']](pdata, num_of_rows, fieldlens[i], offset, isMicro);
    }
  }
  return { blocks: blocks, num_of_rows }
380
}
381 382 383 384
CTaosInterface.prototype.fetchRow = function fetchRow(result, fields) {
  let row = this.libtaos.taos_fetch_row(result);
  return row;
}
385 386 387 388
CTaosInterface.prototype.freeResult = function freeResult(result) {
  this.libtaos.taos_free_result(result);
  result = null;
}
389 390 391 392
/** Number of fields returned in this result handle, must use with async */
CTaosInterface.prototype.numFields = function numFields(result) {
  return this.libtaos.taos_num_fields(result);
}
393
// Fetch fields count by connection, the latest query
394 395
CTaosInterface.prototype.fieldsCount = function fieldsCount(result) {
  return this.libtaos.taos_field_count(result);
396 397 398 399
}
CTaosInterface.prototype.fetchFields = function fetchFields(result) {
  return this.libtaos.taos_fetch_fields(result);
}
400 401
CTaosInterface.prototype.errno = function errno(result) {
  return this.libtaos.taos_errno(result);
402
}
403 404
CTaosInterface.prototype.errStr = function errStr(result) {
  return ref.readCString(this.libtaos.taos_errstr(result));
405 406 407 408
}
// Async
CTaosInterface.prototype.query_a = function query_a(connection, sql, callback, param = ref.ref(ref.NULL)) {
  // void taos_query_a(TAOS *taos, char *sqlstr, void (*fp)(void *param, TAOS_RES *, int), void *param)
409
  callback = ffi.Callback(ref.types.void, [ref.types.void_ptr, ref.types.void_ptr, ref.types.int], callback);
410 411 412 413 414 415 416 417 418 419 420 421 422 423 424
  this.libtaos.taos_query_a(connection, ref.allocCString(sql), callback, param);
  return param;
}
/** Asynchrnously fetches the next block of rows. Wraps callback and transfers a 4th argument to the cursor, the row data as blocks in javascript form
 * Note: This isn't a recursive function, in order to fetch all data either use the TDengine cursor object, TaosQuery object, or implement a recrusive
 * function yourself using the libtaos.taos_fetch_rows_a function
 */
CTaosInterface.prototype.fetch_rows_a = function fetch_rows_a(result, callback, param = ref.ref(ref.NULL)) {
  // void taos_fetch_rows_a(TAOS_RES *res, void (*fp)(void *param, TAOS_RES *, int numOfRows), void *param);
  var cti = this;
  // wrap callback with a function so interface can access the numOfRows value, needed in order to properly process the binary data
  let asyncCallbackWrapper = function (param2, result2, numOfRows2) {
    // Data preparation to pass to cursor. Could be bottleneck in query execution callback times.
    let row = cti.libtaos.taos_fetch_row(result2);
    let fields = cti.fetchFields_a(result2);
425

426
    let isMicro = (cti.libtaos.taos_result_precision(result2) == FieldTypes.C_TIMESTAMP_MICRO);
427 428 429 430
    let blocks = new Array(fields.length);
    blocks.fill(null);
    numOfRows2 = Math.abs(numOfRows2);
    let offset = 0;
431 432 433
    var fieldL = cti.libtaos.taos_fetch_lengths(result);
    var fieldlens = [];
    if (ref.isNull(fieldL) == false) {
434 435 436 437

      for (let i = 0; i < fields.length; i++) {
        let plen = ref.reinterpret(fieldL, 8, i * 8);
        let len = ref.get(plen, 0, ref.types.int32);
438 439 440
        fieldlens.push(len);
      }
    }
441
    if (numOfRows2 > 0) {
442
      for (let i = 0; i < fields.length; i++) {
443 444 445 446 447 448 449 450 451 452 453 454
        if (ref.isNull(pdata.readPointer())) {
          blocks[i] = new Array();
        } else {
          if (!convertFunctions[fields[i]['type']]) {
            throw new errors.DatabaseError("Invalid data type returned from database");
          }
          let prow = ref.reinterpret(row, 8, i * 8);
          prow = prow.readPointer();
          prow = ref.ref(prow);
          blocks[i] = convertFunctions[fields[i]['type']](prow, 1, fieldlens[i], offset, isMicro);
          //offset += fields[i]['bytes'] * numOfRows2;
        }
455 456 457 458
      }
    }
    callback(param2, result2, numOfRows2, blocks);
  }
459
  asyncCallbackWrapper = ffi.Callback(ref.types.void, [ref.types.void_ptr, ref.types.void_ptr, ref.types.int], asyncCallbackWrapper);
460 461 462 463
  this.libtaos.taos_fetch_rows_a(result, asyncCallbackWrapper, param);
  return param;
}
// Fetch field meta data by result handle
464
CTaosInterface.prototype.fetchFields_a = function fetchFields_a(result) {
465 466 467 468
  let pfields = this.fetchFields(result);
  let pfieldscount = this.numFields(result);
  let fields = [];
  if (ref.isNull(pfields) == false) {
469
    pfields = ref.reinterpret(pfields, 68 * pfieldscount, 0);
470
    for (let i = 0; i < pfields.length; i += 68) {
471
      //0 - 64 = name //65 = type, 66 - 67 = bytes
472 473
      fields.push({
        name: ref.readCString(ref.reinterpret(pfields, 65, i)),
474 475
        type: pfields[i + 65],
        bytes: pfields[i + 66]
476 477 478 479
      })
    }
  }
  return fields;
480
}
481 482
// Stop a query by result handle
CTaosInterface.prototype.stopQuery = function stopQuery(result) {
483
  if (result != null) {
484 485 486 487 488 489 490 491 492 493 494 495 496 497
    this.libtaos.taos_stop_query(result);
  }
  else {
    throw new errors.ProgrammingError("No result handle passed to stop query");
  }
}
CTaosInterface.prototype.getServerInfo = function getServerInfo(connection) {
  return ref.readCString(this.libtaos.taos_get_server_info(connection));
}
CTaosInterface.prototype.getClientInfo = function getClientInfo() {
  return ref.readCString(this.libtaos.taos_get_client_info());
}

// Subscription
S
StoneT2000 已提交
498 499 500
CTaosInterface.prototype.subscribe = function subscribe(connection, restart, topic, sql, interval) {
  let topicOrig = topic;
  let sqlOrig = sql;
501
  try {
S
StoneT2000 已提交
502
    sql = sql != null ? ref.allocCString(sql) : ref.alloc(ref.types.char_ptr, ref.NULL);
503
  }
504
  catch (err) {
S
StoneT2000 已提交
505
    throw "Attribute Error: sql is expected as a str";
506 507
  }
  try {
S
StoneT2000 已提交
508
    topic = topic != null ? ref.allocCString(topic) : ref.alloc(ref.types.char_ptr, ref.NULL);
509
  }
510
  catch (err) {
S
StoneT2000 已提交
511 512
    throw TypeError("topic is expected as a str");
  }
S
StoneT2000 已提交
513

S
StoneT2000 已提交
514
  restart = ref.alloc(ref.types.int, restart);
S
StoneT2000 已提交
515

S
StoneT2000 已提交
516
  let subscription = this.libtaos.taos_subscribe(connection, restart, topic, sql, null, null, interval);
517 518 519 520
  if (ref.isNull(subscription)) {
    throw new errors.TDError('Failed to subscribe to TDengine | Database: ' + dbOrig + ', Table: ' + tableOrig);
  }
  else {
S
StoneT2000 已提交
521
    console.log('Successfully subscribed to TDengine - Topic: ' + topicOrig);
522 523 524
  }
  return subscription;
}
S
StoneT2000 已提交
525 526 527

CTaosInterface.prototype.consume = function consume(subscription) {
  let result = this.libtaos.taos_consume(subscription);
528
  let fields = [];
S
StoneT2000 已提交
529
  let pfields = this.fetchFields(result);
530
  if (ref.isNull(pfields) == false) {
S
StoneT2000 已提交
531
    pfields = ref.reinterpret(pfields, this.numFields(result) * 68, 0);
532 533
    for (let i = 0; i < pfields.length; i += 68) {
      //0 - 63 = name //64 - 65 = bytes, 66 - 67 = type
534 535
      fields.push({
        name: ref.readCString(ref.reinterpret(pfields, 64, i)),
536 537 538 539 540
        bytes: pfields[i + 64],
        type: pfields[i + 66]
      })
    }
  }
S
StoneT2000 已提交
541 542

  let data = [];
543
  while (true) {
S
StoneT2000 已提交
544 545 546 547 548 549 550 551 552
    let { blocks, num_of_rows } = this.fetchBlock(result, fields);
    if (num_of_rows == 0) {
      break;
    }
    for (let i = 0; i < num_of_rows; i++) {
      data.push([]);
      let rowBlock = new Array(fields.length);
      for (let j = 0; j < fields.length; j++) {
        rowBlock[j] = blocks[j][i];
553
      }
554
      data[data.length - 1] = (rowBlock);
555 556
    }
  }
S
StoneT2000 已提交
557
  return { data: data, fields: fields, result: result };
558 559 560 561 562 563 564
}
CTaosInterface.prototype.unsubscribe = function unsubscribe(subscription) {
  //void taos_unsubscribe(TAOS_SUB *tsub);
  this.libtaos.taos_unsubscribe(subscription);
}

// Continuous Query
565
CTaosInterface.prototype.openStream = function openStream(connection, sql, callback, stime, stoppingCallback, param = ref.ref(ref.NULL)) {
566 567 568
  try {
    sql = ref.allocCString(sql);
  }
569
  catch (err) {
570 571 572 573 574 575 576 577 578 579 580 581
    throw "Attribute Error: sql string is expected as a str";
  }
  var cti = this;
  let asyncCallbackWrapper = function (param2, result2, row) {
    let fields = cti.fetchFields_a(result2);
    let isMicro = (cti.libtaos.taos_result_precision(result2) == FieldTypes.C_TIMESTAMP_MICRO);
    let blocks = new Array(fields.length);
    blocks.fill(null);
    let numOfRows2 = 1;
    let offset = 0;
    if (numOfRows2 > 0) {
      for (let i = 0; i < fields.length; i++) {
582
        if (!convertFunctions[fields[i]['type']]) {
583 584 585 586 587 588 589 590
          throw new errors.DatabaseError("Invalid data type returned from database");
        }
        blocks[i] = convertFunctions[fields[i]['type']](row, numOfRows2, fields[i]['bytes'], offset, isMicro);
        offset += fields[i]['bytes'] * numOfRows2;
      }
    }
    callback(param2, result2, blocks, fields);
  }
591 592
  asyncCallbackWrapper = ffi.Callback(ref.types.void, [ref.types.void_ptr, ref.types.void_ptr, ref.refType(ref.types.void_ptr2)], asyncCallbackWrapper);
  asyncStoppingCallbackWrapper = ffi.Callback(ref.types.void, [ref.types.void_ptr], stoppingCallback);
593 594 595 596 597 598 599 600 601 602 603 604 605 606
  let streamHandle = this.libtaos.taos_open_stream(connection, sql, asyncCallbackWrapper, stime, param, asyncStoppingCallbackWrapper);
  if (ref.isNull(streamHandle)) {
    throw new errors.TDError('Failed to open a stream with TDengine');
    return false;
  }
  else {
    console.log("Succesfully opened stream");
    return streamHandle;
  }
}
CTaosInterface.prototype.closeStream = function closeStream(stream) {
  this.libtaos.taos_close_stream(stream);
  console.log("Closed stream");
}