cinterface.js 21.7 KB
Newer Older
1 2 3 4 5
/**
 * C Interface with TDengine Module
 * @module CTaosInterface
 */

sangshuduo's avatar
sangshuduo 已提交
6
const ref = require('ref-napi');
7
const os = require('os');
sangshuduo's avatar
sangshuduo 已提交
8 9 10
const ffi = require('ffi-napi');
const ArrayType = require('ref-array-napi');
const Struct = require('ref-struct-napi');
11
const FieldTypes = require('./constants');
12
const errors = require('./error');
S
StoneT2000 已提交
13
const TaosObjects = require('./taosobjects');
sangshuduo's avatar
sangshuduo 已提交
14
const { NULL_POINTER } = require('ref-napi');
15 16 17

module.exports = CTaosInterface;

18
/**
 * Convert one column of a fetched block into TaosTimestamp objects.
 * @param {Buffer} data - Reference whose deref() yields the raw column data
 * @param {number} num_of_rows - Number of rows held in the column
 * @param {number} [nbytes=0] - Stride in bytes between consecutive values
 * @param {number} [offset=0] - Byte offset at which the column starts
 * @param {number} [precision=0] - Precision flag handed to each TaosTimestamp
 * @return {Array} One TaosTimestamp per row, in row order
 */
function convertTimestamp(data, num_of_rows, nbytes = 0, offset = 0, precision = 0) {
  const column = ref.reinterpret(data.deref(), nbytes * num_of_rows, offset);
  let res = [];
  let currOffset = 0;
  while (currOffset < column.length) {
    // Every value is read as a little-endian 64-bit integer
    let time = column.readInt64LE(currOffset);
    currOffset += nbytes;
    res.push(new TaosObjects.TaosTimestamp(time, precision));
  }
  return res;
}
29
/**
 * Convert one column of a fetched block into booleans.
 * Bytes equal to 0 become false, 1 become true and FieldTypes.C_BOOL_NULL become null;
 * any other byte value leaves the corresponding slot unset.
 * @param {Buffer} data - Reference whose deref() yields the raw column data
 * @param {number} num_of_rows - Number of rows held in the column
 * @param {number} [nbytes=0] - Width in bytes of one value
 * @param {number} [offset=0] - Byte offset at which the column starts
 * @param {number} [precision=0] - Unused; kept so all converters share one signature
 * @return {Array} One entry per byte of the column
 */
function convertBool(data, num_of_rows, nbytes = 0, offset = 0, precision = 0) {
  const column = ref.reinterpret(data.deref(), nbytes * num_of_rows, offset);
  let res = new Array(column.length);
  for (let i = 0; i < column.length; i++) {
    if (column[i] == 0) {
      res[i] = false;
    }
    else if (column[i] == 1) {
      res[i] = true;
    }
    else if (column[i] == FieldTypes.C_BOOL_NULL) {
      res[i] = null;
    }
  }
  return res;
}
45
/**
 * Convert one column of a fetched block into signed 8-bit integers.
 * Values equal to FieldTypes.C_TINYINT_NULL are reported as null.
 * @param {Buffer} data - Reference whose deref() yields the raw column data
 * @param {number} num_of_rows - Number of rows held in the column
 * @param {number} [nbytes=0] - Stride in bytes between consecutive values
 * @param {number} [offset=0] - Byte offset at which the column starts
 * @param {number} [precision=0] - Unused; kept so all converters share one signature
 * @return {Array} One number (or null) per row
 */
function convertTinyint(data, num_of_rows, nbytes = 0, offset = 0, precision = 0) {
  const column = ref.reinterpret(data.deref(), nbytes * num_of_rows, offset);
  let res = [];
  let currOffset = 0;
  while (currOffset < column.length) {
    let d = column.readIntLE(currOffset, 1);
    // Loose equality kept on purpose: the sentinel's type lives in ./constants
    res.push(d == FieldTypes.C_TINYINT_NULL ? null : d);
    currOffset += nbytes;
  }
  return res;
}
56
/**
 * Convert one column of a fetched block into signed 16-bit integers.
 * Values equal to FieldTypes.C_SMALLINT_NULL are reported as null.
 * @param {Buffer} data - Reference whose deref() yields the raw column data
 * @param {number} num_of_rows - Number of rows held in the column
 * @param {number} [nbytes=0] - Stride in bytes between consecutive values
 * @param {number} [offset=0] - Byte offset at which the column starts
 * @param {number} [precision=0] - Unused; kept so all converters share one signature
 * @return {Array} One number (or null) per row
 */
function convertSmallint(data, num_of_rows, nbytes = 0, offset = 0, precision = 0) {
  const column = ref.reinterpret(data.deref(), nbytes * num_of_rows, offset);
  let res = [];
  let currOffset = 0;
  while (currOffset < column.length) {
    let d = column.readIntLE(currOffset, 2);
    // Loose equality kept on purpose: the sentinel's type lives in ./constants
    res.push(d == FieldTypes.C_SMALLINT_NULL ? null : d);
    currOffset += nbytes;
  }
  return res;
}
67
/**
 * Convert one column of a fetched block into signed 32-bit integers.
 * Values equal to FieldTypes.C_INT_NULL are reported as null.
 * @param {Buffer} data - Reference whose deref() yields the raw column data
 * @param {number} num_of_rows - Number of rows held in the column
 * @param {number} [nbytes=0] - Stride in bytes between consecutive values
 * @param {number} [offset=0] - Byte offset at which the column starts
 * @param {number} [precision=0] - Unused; kept so all converters share one signature
 * @return {Array} One number (or null) per row
 */
function convertInt(data, num_of_rows, nbytes = 0, offset = 0, precision = 0) {
  const column = ref.reinterpret(data.deref(), nbytes * num_of_rows, offset);
  let res = [];
  let currOffset = 0;
  while (currOffset < column.length) {
    let d = column.readInt32LE(currOffset);
    // Loose equality kept on purpose: the sentinel's type lives in ./constants
    res.push(d == FieldTypes.C_INT_NULL ? null : d);
    currOffset += nbytes;
  }
  return res;
}
78
/**
 * Convert one column of a fetched block into BigInt values.
 * Values equal to FieldTypes.C_BIGINT_NULL are reported as null.
 * @param {Buffer} data - Reference whose deref() yields the raw column data
 * @param {number} num_of_rows - Number of rows held in the column
 * @param {number} [nbytes=0] - Stride in bytes between consecutive values
 * @param {number} [offset=0] - Byte offset at which the column starts
 * @param {number} [precision=0] - Unused; kept so all converters share one signature
 * @return {Array} One BigInt (or null) per row
 */
function convertBigint(data, num_of_rows, nbytes = 0, offset = 0, precision = 0) {
  const column = ref.reinterpret(data.deref(), nbytes * num_of_rows, offset);
  let res = [];
  let currOffset = 0;
  while (currOffset < column.length) {
    let d = column.readInt64LE(currOffset);
    // Loose equality kept on purpose: the reader's return type and the
    // sentinel's type (./constants) are not both visible from this file.
    res.push(d == FieldTypes.C_BIGINT_NULL ? null : BigInt(d));
    currOffset += nbytes;
  }
  return res;
}
89
/**
 * Convert one column of a fetched block into single-precision floats.
 * Each value is rounded to 5 decimal places; NaN is reported as null.
 * @param {Buffer} data - Reference whose deref() yields the raw column data
 * @param {number} num_of_rows - Number of rows held in the column
 * @param {number} [nbytes=0] - Stride in bytes between consecutive values
 * @param {number} [offset=0] - Byte offset at which the column starts
 * @param {number} [precision=0] - Unused; kept so all converters share one signature
 * @return {Array} One number (or null) per row
 */
function convertFloat(data, num_of_rows, nbytes = 0, offset = 0, precision = 0) {
  const column = ref.reinterpret(data.deref(), nbytes * num_of_rows, offset);
  let res = [];
  let currOffset = 0;
  while (currOffset < column.length) {
    // toFixed(5) trims the noise digits produced by widening a float to a double
    let d = parseFloat(column.readFloatLE(currOffset).toFixed(5));
    res.push(Number.isNaN(d) ? null : d);
    currOffset += nbytes;
  }
  return res;
}
100
/**
 * Convert one column of a fetched block into double-precision floats.
 * Each value is rounded to 16 decimal places; NaN is reported as null.
 * @param {Buffer} data - Reference whose deref() yields the raw column data
 * @param {number} num_of_rows - Number of rows held in the column
 * @param {number} [nbytes=0] - Stride in bytes between consecutive values
 * @param {number} [offset=0] - Byte offset at which the column starts
 * @param {number} [precision=0] - Unused; kept so all converters share one signature
 * @return {Array} One number (or null) per row
 */
function convertDouble(data, num_of_rows, nbytes = 0, offset = 0, precision = 0) {
  const column = ref.reinterpret(data.deref(), nbytes * num_of_rows, offset);
  let res = [];
  let currOffset = 0;
  while (currOffset < column.length) {
    let d = parseFloat(column.readDoubleLE(currOffset).toFixed(16));
    res.push(Number.isNaN(d) ? null : d);
    currOffset += nbytes;
  }
  return res;
}
111

112
/**
 * Convert one column of length-prefixed strings from a fetched block.
 * Each entry occupies `nbytes` bytes: a 2-byte little-endian length followed by
 * that many bytes of text, which are decoded as UTF-8.
 * @param {Buffer} data - Reference whose deref() yields the raw column data
 * @param {number} num_of_rows - Number of rows held in the column
 * @param {number} [nbytes=0] - Stride in bytes between consecutive entries
 * @param {number} [offset=0] - Byte offset at which the column starts
 * @param {number} [precision=0] - Unused; kept so all converters share one signature
 * @return {Array<string>} One decoded string per row
 */
function convertNchar(data, num_of_rows, nbytes = 0, offset = 0, precision = 0) {
  const column = ref.reinterpret(data.deref(), nbytes * num_of_rows, offset);
  let res = [];

  let currOffset = 0;
  while (currOffset < column.length) {
    let len = column.readIntLE(currOffset, 2);
    // One entry in a row under a column, without its 2-byte length prefix
    let dataEntry = column.subarray(currOffset + 2, currOffset + len + 2);
    res.push(dataEntry.toString("utf-8"));
    currOffset += nbytes;
  }
  return res;
}

126
// Dispatch table from a TDengine field type code to the converter that turns
// that column's raw block data into JavaScript values.
// Binary and nchar columns share the same length-prefixed string converter.
let convertFunctions = {
  [FieldTypes.C_BOOL]: convertBool,
  [FieldTypes.C_TINYINT]: convertTinyint,
  [FieldTypes.C_SMALLINT]: convertSmallint,
  [FieldTypes.C_INT]: convertInt,
  [FieldTypes.C_BIGINT]: convertBigint,
  [FieldTypes.C_FLOAT]: convertFloat,
  [FieldTypes.C_DOUBLE]: convertDouble,
  [FieldTypes.C_BINARY]: convertNchar,
  [FieldTypes.C_TIMESTAMP]: convertTimestamp,
  [FieldTypes.C_NCHAR]: convertNchar
}

// Define TaosField structure: a 65-byte name, then a 1-byte type, then a short
// holding the field width in bytes.
var char_arr = ArrayType(ref.types.char);
var TaosField = Struct({
  'name': char_arr,
});
// The array type has no intrinsic length, so its size is pinned to the 65-byte name
TaosField.fields.name.type.size = 65;
TaosField.defineProperty('type', ref.types.char);
TaosField.defineProperty('bytes', ref.types.short);

149

150
/**
 *
 * @param {string} [config=null] - Configuration value handed to taos_options before initialisation; must be a string when given
 * @param {boolean} [pass=false] - When true, only the native library is bound; taos_options/taos_init are skipped
 * @return {CTaosInterface}
 * @class CTaosInterface
 * @classdesc The CTaosInterface is the interface through which Node.JS communicates data back and forth with TDengine. It is not advised to
 * access this class directly and use it unless you understand what these functions do.
 */
function CTaosInterface(config = null, pass = false) {
  ref.types.char_ptr = ref.refType(ref.types.char);
  ref.types.void_ptr = ref.refType(ref.types.void);
  ref.types.void_ptr2 = ref.refType(ref.types.void_ptr);
  /*Declare a bunch of functions first*/
  /* Note, pointers to TAOS_RES, TAOS, are ref.types.void_ptr. The connection._conn buffer is supplied for pointers to TAOS *  */

  // Declared locally: assigning to an undeclared name leaks a global and throws in strict mode.
  const taoslibname = 'win32' == os.platform() ? 'taos' : 'libtaos';
  this.libtaos = ffi.Library(taoslibname, {
    'taos_options': [ref.types.int, [ref.types.int, ref.types.void_ptr]],
    'taos_init': [ref.types.void, []],
    //TAOS *taos_connect(char *ip, char *user, char *pass, char *db, int port)
    'taos_connect': [ref.types.void_ptr, [ref.types.char_ptr, ref.types.char_ptr, ref.types.char_ptr, ref.types.char_ptr, ref.types.int]],
    //void taos_close(TAOS *taos)
    'taos_close': [ref.types.void, [ref.types.void_ptr]],
    //int *taos_fetch_lengths(TAOS_RES *res);
    'taos_fetch_lengths': [ref.types.void_ptr, [ref.types.void_ptr]],
    //int taos_query(TAOS *taos, char *sqlstr)
    'taos_query': [ref.types.void_ptr, [ref.types.void_ptr, ref.types.char_ptr]],
    //int taos_affected_rows(TAOS_RES *res)
    'taos_affected_rows': [ref.types.int, [ref.types.void_ptr]],
    //int taos_fetch_block(TAOS_RES *res, TAOS_ROW *rows)
    'taos_fetch_block': [ref.types.int, [ref.types.void_ptr, ref.types.void_ptr]],
    //int taos_num_fields(TAOS_RES *res);
    'taos_num_fields': [ref.types.int, [ref.types.void_ptr]],
    //TAOS_ROW taos_fetch_row(TAOS_RES *res)
    //TAOS_ROW is void **, but we set the return type as a reference instead to get the row
    'taos_fetch_row': [ref.refType(ref.types.void_ptr2), [ref.types.void_ptr]],
    'taos_print_row': [ref.types.int, [ref.types.char_ptr, ref.types.void_ptr, ref.types.void_ptr, ref.types.int]],
    //int taos_result_precision(TAOS_RES *res)
    'taos_result_precision': [ref.types.int, [ref.types.void_ptr]],
    //void taos_free_result(TAOS_RES *res)
    'taos_free_result': [ref.types.void, [ref.types.void_ptr]],
    //int taos_field_count(TAOS *taos)
    'taos_field_count': [ref.types.int, [ref.types.void_ptr]],
    //TAOS_FIELD *taos_fetch_fields(TAOS_RES *res)
    'taos_fetch_fields': [ref.refType(TaosField), [ref.types.void_ptr]],
    //int taos_errno(TAOS *taos)
    'taos_errno': [ref.types.int, [ref.types.void_ptr]],
    //char *taos_errstr(TAOS *taos)
    'taos_errstr': [ref.types.char_ptr, [ref.types.void_ptr]],
    //void taos_stop_query(TAOS_RES *res);
    'taos_stop_query': [ref.types.void, [ref.types.void_ptr]],
    //char *taos_get_server_info(TAOS *taos);
    'taos_get_server_info': [ref.types.char_ptr, [ref.types.void_ptr]],
    //char *taos_get_client_info();
    'taos_get_client_info': [ref.types.char_ptr, []],

    // ASYNC
    // void taos_query_a(TAOS *taos, char *sqlstr, void (*fp)(void *, TAOS_RES *, int), void *param)
    'taos_query_a': [ref.types.void, [ref.types.void_ptr, ref.types.char_ptr, ref.types.void_ptr, ref.types.void_ptr]],
    // void taos_fetch_rows_a(TAOS_RES *res, void (*fp)(void *param, TAOS_RES *, int numOfRows), void *param);
    'taos_fetch_rows_a': [ref.types.void, [ref.types.void_ptr, ref.types.void_ptr, ref.types.void_ptr]],

    // Subscription
    //TAOS_SUB *taos_subscribe(TAOS* taos, int restart, const char* topic, const char *sql, TAOS_SUBSCRIBE_CALLBACK fp, void *param, int interval)
    'taos_subscribe': [ref.types.void_ptr, [ref.types.void_ptr, ref.types.int, ref.types.char_ptr, ref.types.char_ptr, ref.types.void_ptr, ref.types.void_ptr, ref.types.int]],
    // TAOS_RES *taos_consume(TAOS_SUB *tsub)
    'taos_consume': [ref.types.void_ptr, [ref.types.void_ptr]],
    //void taos_unsubscribe(TAOS_SUB *tsub);
    'taos_unsubscribe': [ref.types.void, [ref.types.void_ptr]],

    // Continuous Query
    //TAOS_STREAM *taos_open_stream(TAOS *taos, char *sqlstr, void (*fp)(void *param, TAOS_RES *, TAOS_ROW row),
    //                              int64_t stime, void *param, void (*callback)(void *));
    'taos_open_stream': [ref.types.void_ptr, [ref.types.void_ptr, ref.types.char_ptr, ref.types.void_ptr, ref.types.int64, ref.types.void_ptr, ref.types.void_ptr]],
    //void taos_close_stream(TAOS_STREAM *tstr);
    'taos_close_stream': [ref.types.void, [ref.types.void_ptr]]

  });
  if (pass == false) {
    if (config == null) {
      this._config = ref.alloc(ref.types.char_ptr, ref.NULL);
    }
    else {
      try {
        this._config = ref.allocCString(config);
      }
      catch (err) {
        throw "Attribute Error: config is expected as a str";
      }
      // Option id 3 is what the original code passes; the option is only set when a config was supplied
      this.libtaos.taos_options(3, this._config);
    }
    this.libtaos.taos_init();
  }
  return this;
}
/**
 * Return the configuration buffer created by the constructor.
 * @return {Buffer} The value stored in this._config (unset when the interface was built with pass = true)
 */
CTaosInterface.prototype.config = function config() {
  return this._config;
};
/**
 * Open a connection to TDengine through taos_connect.
 * @param {string} [host=null] - Server host; a NULL pointer is passed when null
 * @param {string} [user="root"] - User name
 * @param {string} [password="taosdata"] - Password
 * @param {string} [db=null] - Database to select; a NULL pointer is passed when null
 * @param {number} [port=0] - Server port, passed to the native call as a plain int
 * @return {Buffer} The native connection handle
 * @throws {string} "Attribute Error: ..." when host, user, password or db cannot be turned into a C string
 * @throws {TypeError} When port is not an integer
 * @throws {TDError} When the native library returns a NULL handle
 */
CTaosInterface.prototype.connect = function connect(host = null, user = "root", password = "taosdata", db = null, port = 0) {
  let _host, _user, _password, _db;
  try {
    _host = host != null ? ref.allocCString(host) : ref.NULL;
  }
  catch (err) {
    throw "Attribute Error: host is expected as a str";
  }
  try {
    _user = ref.allocCString(user);
  }
  catch (err) {
    throw "Attribute Error: user is expected as a str";
  }
  try {
    _password = ref.allocCString(password);
  }
  catch (err) {
    throw "Attribute Error: password is expected as a str";
  }
  try {
    _db = db != null ? ref.allocCString(db) : ref.NULL;
  }
  catch (err) {
    throw "Attribute Error: db is expected as a str";
  }
  // taos_connect declares its last parameter as ref.types.int, so it must receive
  // the number itself. Wrapping it in a Buffer (as before) never delivers the
  // requested port to the native call.
  const _port = Number(port);
  if (!Number.isInteger(_port)) {
    throw TypeError("port is expected as an int");
  }
  let connection = this.libtaos.taos_connect(_host, _user, _password, _db, _port);
  if (ref.isNull(connection)) {
    throw new errors.TDError('Failed to connect to TDengine');
  }
  else {
    console.log('Successfully connected to TDengine');
  }
  return connection;
};
/**
 * Close a connection through taos_close and log that it was closed.
 * @param {Buffer} connection - Native connection handle
 */
CTaosInterface.prototype.close = function close(connection) {
  const lib = this.libtaos;
  lib.taos_close(connection);
  console.log("Connection is closed");
};
/**
 * Run a SQL statement through taos_query.
 * @param {Buffer} connection - Native connection handle
 * @param {string} sql - Statement text; copied into a C string for the call
 * @return {Buffer} Whatever taos_query returns (bound as a void pointer)
 */
CTaosInterface.prototype.query = function query(connection, sql) {
  return this.libtaos.taos_query(connection, ref.allocCString(sql));
};
302 303
/**
 * Report the affected-row count for a result handle via taos_affected_rows.
 * @param {Buffer} result - Native result handle
 * @return {number} The integer returned by the native call
 */
CTaosInterface.prototype.affectedRows = function affectedRows(result) {
  return this.libtaos.taos_affected_rows(result);
};
305 306
/**
 * Read the field metadata of a result handle.
 * The number of fields comes from fieldsCount (taos_field_count).
 * @param {Buffer} result - Native result handle
 * @return {Array<{name: string, type: number, bytes: number}>} One descriptor per field; empty when no fields are returned
 */
CTaosInterface.prototype.useResult = function useResult(result) {
  let fields = [];
  let pfields = this.fetchFields(result);
  if (ref.isNull(pfields) == false) {
    pfields = ref.reinterpret(pfields, this.fieldsCount(result) * 68, 0);
    for (let i = 0; i < pfields.length; i += 68) {
      // 68-byte record: 0 - 64 = name, 65 = type, 66 - 67 = bytes (16-bit)
      fields.push({
        name: ref.readCString(ref.reinterpret(pfields, 65, i)),
        type: pfields[i + 65],
        // Read both bytes of the short; a single-byte read truncates widths above 255
        bytes: pfields.readInt16LE(i + 66)
      });
    }
  }
  return fields;
};
/**
 * Fetch the next block of rows for a result handle and convert every column to
 * JavaScript values.
 * @param {Buffer} result - Native result handle
 * @param {Array<{type: number}>} fields - Field descriptors for the result, in column order
 * @return {{blocks: Array, num_of_rows: number}|{block: null, num_of_rows: number}}
 *   Converted columns and the absolute row count, or `{ block: null, num_of_rows: 0 }`
 *   when the native call produced no block
 * @throws {DatabaseError} When a column's type has no registered converter
 */
CTaosInterface.prototype.fetchBlock = function fetchBlock(result, fields) {
  // taos_fetch_block writes the block address into this slot. Allocate a fresh
  // one per call: ref.NULL_POINTER is a shared module-level buffer and must not
  // be handed to native code as a writable out-parameter.
  let pblock = ref.alloc(ref.refType(ref.types.void), ref.NULL);
  let num_of_rows = this.libtaos.taos_fetch_block(result, pblock);
  if (ref.isNull(pblock.deref()) == true) {
    return { block: null, num_of_rows: 0 };
  }

  var fieldL = this.libtaos.taos_fetch_lengths(result);
  let precision = this.libtaos.taos_result_precision(result);

  // Per-column value width, read from the int array returned by taos_fetch_lengths
  var fieldlens = [];

  if (ref.isNull(fieldL) == false) {
    for (let i = 0; i < fields.length; i++) {
      let plen = ref.reinterpret(fieldL, 4, i * 4);
      let len = plen.readInt32LE(0);
      fieldlens.push(len);
    }
  }

  let blocks = new Array(fields.length);
  blocks.fill(null);
  num_of_rows = Math.abs(num_of_rows);
  let offset = 0;
  let ptr = pblock.deref();

  for (let i = 0; i < fields.length; i++) {
    // The block is an array of 8-byte column pointers
    let pdata = ref.reinterpret(ptr, 8, i * 8);
    if (ref.isNull(pdata.readPointer())) {
      blocks[i] = new Array();
    } else {
      pdata = ref.ref(pdata.readPointer());
      if (!convertFunctions[fields[i]['type']]) {
        throw new errors.DatabaseError("Invalid data type returned from database");
      }
      blocks[i] = convertFunctions[fields[i]['type']](pdata, num_of_rows, fieldlens[i], offset, precision);
    }
  }
  return { blocks: blocks, num_of_rows }
};
362 363 364 365
/**
 * Fetch the next row of a result handle via taos_fetch_row.
 * @param {Buffer} result - Native result handle
 * @param {Array} fields - Accepted but not used by this method
 * @return {Buffer} The raw row reference returned by the native call
 */
CTaosInterface.prototype.fetchRow = function fetchRow(result, fields) {
  return this.libtaos.taos_fetch_row(result);
};
366 367 368 369
/**
 * Release a result handle via taos_free_result.
 * The caller's own reference is not cleared by this call; callers must drop it themselves.
 * @param {Buffer} result - Native result handle
 */
CTaosInterface.prototype.freeResult = function freeResult(result) {
  this.libtaos.taos_free_result(result);
};
370 371 372 373
/**
 * Field count of a result handle, as reported by taos_num_fields. Must be used with async queries.
 * @param {Buffer} result - Native result handle
 * @return {number} The integer returned by the native call
 */
CTaosInterface.prototype.numFields = function numFields(result) {
  const count = this.libtaos.taos_num_fields(result);
  return count;
};
374
// Fetch fields count by connection, the latest query
/**
 * @param {Buffer} result - Handle forwarded unchanged to taos_field_count
 * @return {number} The integer returned by the native call
 */
CTaosInterface.prototype.fieldsCount = function fieldsCount(result) {
  return this.libtaos.taos_field_count(result);
};
/**
 * Get the raw field-descriptor pointer of a result handle via taos_fetch_fields.
 * @param {Buffer} result - Native result handle
 * @return {Buffer} Reference to the native field array
 */
CTaosInterface.prototype.fetchFields = function fetchFields(result) {
  const fieldsPtr = this.libtaos.taos_fetch_fields(result);
  return fieldsPtr;
};
381 382
/**
 * Read the error number associated with a handle via taos_errno.
 * @param {Buffer} result - Handle forwarded unchanged to the native call
 * @return {number} The integer returned by the native call
 */
CTaosInterface.prototype.errno = function errno(result) {
  return this.libtaos.taos_errno(result);
};
384 385
/**
 * Read the error message associated with a handle via taos_errstr.
 * @param {Buffer} result - Handle forwarded unchanged to the native call
 * @return {string} The C string returned by the native call, as a JavaScript string
 */
CTaosInterface.prototype.errStr = function errStr(result) {
  return ref.readCString(this.libtaos.taos_errstr(result));
};
// Async
/**
 * Start an asynchronous query through taos_query_a.
 * @param {Buffer} connection - Native connection handle
 * @param {string} sql - Statement text; copied into a C string for the call
 * @param {function} callback - Invoked by the native library as (param, result, code)
 * @param {Buffer} [param] - Opaque pointer passed through to the callback; a fresh reference to NULL by default
 * @return {Buffer} The param value that was passed to the native call
 */
CTaosInterface.prototype.query_a = function query_a(connection, sql, callback, param = ref.ref(ref.NULL)) {
  // void taos_query_a(TAOS *taos, char *sqlstr, void (*fp)(void *param, TAOS_RES *, int), void *param)
  // NOTE(review): nothing retains this ffi.Callback once the function returns; confirm it
  // cannot be garbage-collected before the native library invokes it.
  const nativeCallback = ffi.Callback(ref.types.void, [ref.types.void_ptr, ref.types.void_ptr, ref.types.int], callback);
  this.libtaos.taos_query_a(connection, ref.allocCString(sql), nativeCallback, param);
  return param;
};
/** Asynchronously fetches the next block of rows. Wraps callback and transfers a 4th argument to the cursor, the row data as blocks in javascript form
 * Note: This isn't a recursive function, in order to fetch all data either use the TDengine cursor object, TaosQuery object, or implement a recursive
 * function yourself using the libtaos.taos_fetch_rows_a function
 * @param {Buffer} result - Native result handle
 * @param {function} callback - Invoked as (param, result, numOfRows, blocks)
 * @param {Buffer} [param] - Opaque pointer passed through to the callback; a fresh reference to NULL by default
 * @return {Buffer} The param value that was passed to the native call
 */
CTaosInterface.prototype.fetch_rows_a = function fetch_rows_a(result, callback, param = ref.ref(ref.NULL)) {
  // void taos_fetch_rows_a(TAOS_RES *res, void (*fp)(void *param, TAOS_RES *, int numOfRows), void *param);
  var cti = this;
  // wrap callback with a function so interface can access the numOfRows value, needed in order to properly process the binary data
  let asyncCallbackWrapper = function (param2, result2, numOfRows2) {
    // Data preparation to pass to cursor. Could be bottleneck in query execution callback times.
    let row = cti.libtaos.taos_fetch_row(result2);
    let fields = cti.fetchFields_a(result2);

    let precision = cti.libtaos.taos_result_precision(result2);
    let blocks = new Array(fields.length);
    blocks.fill(null);
    numOfRows2 = Math.abs(numOfRows2);
    let offset = 0;
    // Use the handle this callback was invoked with, like every other call in the wrapper
    var fieldL = cti.libtaos.taos_fetch_lengths(result2);
    var fieldlens = [];
    if (ref.isNull(fieldL) == false) {
      for (let i = 0; i < fields.length; i++) {
        // taos_fetch_lengths returns an int array, so entries are 4 bytes apart (as in fetchBlock)
        let plen = ref.reinterpret(fieldL, 4, i * 4);
        let len = plen.readInt32LE(0);
        fieldlens.push(len);
      }
    }
    if (numOfRows2 > 0) {
      for (let i = 0; i < fields.length; i++) {
        // The row is an array of 8-byte value pointers; inspect this column's own
        // pointer rather than a variable that does not exist in this scope.
        let pcell = ref.reinterpret(row, 8, i * 8).readPointer();
        if (ref.isNull(pcell)) {
          blocks[i] = new Array();
        } else {
          if (!convertFunctions[fields[i]['type']]) {
            throw new errors.DatabaseError("Invalid data type returned from database");
          }
          let prow = ref.ref(pcell);
          blocks[i] = convertFunctions[fields[i]['type']](prow, 1, fieldlens[i], offset, precision);
        }
      }
    }
    callback(param2, result2, numOfRows2, blocks);
  }
  // NOTE(review): nothing retains this ffi.Callback once the function returns; confirm it
  // cannot be garbage-collected before the native library invokes it.
  asyncCallbackWrapper = ffi.Callback(ref.types.void, [ref.types.void_ptr, ref.types.void_ptr, ref.types.int], asyncCallbackWrapper);
  this.libtaos.taos_fetch_rows_a(result, asyncCallbackWrapper, param);
  return param;
};
// Fetch field meta data by result handle
/**
 * The number of fields comes from numFields (taos_num_fields).
 * @param {Buffer} result - Native result handle
 * @return {Array<{name: string, type: number, bytes: number}>} One descriptor per field; empty when no fields are returned
 */
CTaosInterface.prototype.fetchFields_a = function fetchFields_a(result) {
  let pfields = this.fetchFields(result);
  let pfieldscount = this.numFields(result);
  let fields = [];
  if (ref.isNull(pfields) == false) {
    pfields = ref.reinterpret(pfields, 68 * pfieldscount, 0);
    for (let i = 0; i < pfields.length; i += 68) {
      // 68-byte record: 0 - 64 = name, 65 = type, 66 - 67 = bytes (16-bit)
      fields.push({
        name: ref.readCString(ref.reinterpret(pfields, 65, i)),
        type: pfields[i + 65],
        // Read both bytes of the short; a single-byte read truncates widths above 255
        bytes: pfields.readInt16LE(i + 66)
      });
    }
  }
  return fields;
};
462 463
// Stop a query by result handle
/**
 * @param {Buffer} result - Native result handle
 * @throws {ProgrammingError} When result is null or undefined
 */
CTaosInterface.prototype.stopQuery = function stopQuery(result) {
  if (result != null) {
    this.libtaos.taos_stop_query(result);
  }
  else {
    throw new errors.ProgrammingError("No result handle passed to stop query");
  }
};
/**
 * Read the server info string for a connection via taos_get_server_info.
 * @param {Buffer} connection - Native connection handle
 * @return {string} The C string returned by the native call, as a JavaScript string
 */
CTaosInterface.prototype.getServerInfo = function getServerInfo(connection) {
  const infoPtr = this.libtaos.taos_get_server_info(connection);
  return ref.readCString(infoPtr);
};
/**
 * Read the client info string via taos_get_client_info.
 * @return {string} The C string returned by the native call, as a JavaScript string
 */
CTaosInterface.prototype.getClientInfo = function getClientInfo() {
  const infoPtr = this.libtaos.taos_get_client_info();
  return ref.readCString(infoPtr);
};

// Subscription
S
StoneT2000 已提交
479 480 481
/**
 * Create a subscription through taos_subscribe. No callback or callback
 * parameter is registered (both are passed as null).
 * @param {Buffer} connection - Native connection handle
 * @param {number} restart - Restart flag, passed to the native call as a plain int
 * @param {string} topic - Subscription topic
 * @param {string} sql - Query text of the subscription
 * @param {number} interval - Interval value passed to the native call
 * @return {Buffer} The native subscription handle
 * @throws {string} "Attribute Error: ..." when sql cannot be turned into a C string
 * @throws {TypeError} When topic cannot be turned into a C string
 * @throws {TDError} When the native library returns a NULL handle
 */
CTaosInterface.prototype.subscribe = function subscribe(connection, restart, topic, sql, interval) {
  // Keep the JavaScript values for the log and error messages below
  let topicOrig = topic;
  let sqlOrig = sql;
  try {
    sql = sql != null ? ref.allocCString(sql) : ref.alloc(ref.types.char_ptr, ref.NULL);
  }
  catch (err) {
    throw "Attribute Error: sql is expected as a str";
  }
  try {
    topic = topic != null ? ref.allocCString(topic) : ref.alloc(ref.types.char_ptr, ref.NULL);
  }
  catch (err) {
    throw TypeError("topic is expected as a str");
  }

  // taos_subscribe declares `restart` as ref.types.int, so it must receive the
  // number itself; wrapping it in a Buffer never delivers the requested value.
  const restartFlag = Number(restart) || 0;

  let subscription = this.libtaos.taos_subscribe(connection, restartFlag, topic, sql, null, null, interval);
  if (ref.isNull(subscription)) {
    // Report the values this method actually has; the previous message referenced
    // undefined names and raised a ReferenceError instead of TDError.
    throw new errors.TDError('Failed to subscribe to TDengine | Topic: ' + topicOrig + ', SQL: ' + sqlOrig);
  }
  else {
    console.log('Successfully subscribed to TDengine - Topic: ' + topicOrig);
  }
  return subscription;
};
S
StoneT2000 已提交
506 507 508

/**
 * Consume a subscription: fetch its pending result and read every block into
 * row-major JavaScript arrays.
 * @param {Buffer} subscription - Native subscription handle
 * @return {{data: Array<Array>, fields: Array, result: Buffer}} All rows (one array per row,
 *   one entry per field), the field descriptors, and the native result handle
 */
CTaosInterface.prototype.consume = function consume(subscription) {
  let result = this.libtaos.taos_consume(subscription);
  // Reuse the shared field parser so the record layout (65-byte name, type,
  // 16-bit bytes) matches the TaosField struct instead of a divergent local copy.
  let fields = this.fetchFields_a(result);

  let data = [];
  while (true) {
    let { blocks, num_of_rows } = this.fetchBlock(result, fields);
    if (num_of_rows == 0) {
      break;
    }
    // fetchBlock returns one array per column; transpose into one array per row
    for (let i = 0; i < num_of_rows; i++) {
      let rowBlock = new Array(fields.length);
      for (let j = 0; j < fields.length; j++) {
        rowBlock[j] = blocks[j][i];
      }
      data.push(rowBlock);
    }
  }
  return { data: data, fields: fields, result: result };
};
/**
 * Cancel a subscription.
 * @param {Buffer} subscription - Native subscription handle
 */
CTaosInterface.prototype.unsubscribe = function unsubscribe(subscription) {
  // Native signature: void taos_unsubscribe(TAOS_SUB *tsub);
  const lib = this.libtaos;
  lib.taos_unsubscribe(subscription);
};

// Continuous Query
546
/**
 * Open a continuous-query stream through taos_open_stream.
 * @param {Buffer} connection - Native connection handle
 * @param {string} sql - Query text of the stream
 * @param {function} callback - Invoked for each delivered row as (param, result, blocks, fields)
 * @param {number} stime - Start-time value passed to the native call as an int64
 * @param {function} stoppingCallback - Passed to the native call as its final callback, taking one pointer argument
 * @param {Buffer} [param] - Opaque pointer passed through to the callbacks; a fresh reference to NULL by default
 * @return {Buffer} The native stream handle
 * @throws {string} "Attribute Error: ..." when sql cannot be turned into a C string
 * @throws {DatabaseError} From the row callback, when a column's type has no registered converter
 * @throws {TDError} When the native library returns a NULL handle
 */
CTaosInterface.prototype.openStream = function openStream(connection, sql, callback, stime, stoppingCallback, param = ref.ref(ref.NULL)) {
  try {
    sql = ref.allocCString(sql);
  }
  catch (err) {
    throw "Attribute Error: sql string is expected as a str";
  }
  var cti = this;
  // Converts the single delivered row into one-element column arrays before
  // handing it to the user's callback.
  let asyncCallbackWrapper = function (param2, result2, row) {
    let fields = cti.fetchFields_a(result2);
    let precision = cti.libtaos.taos_result_precision(result2);
    let blocks = new Array(fields.length);
    blocks.fill(null);
    let numOfRows2 = 1;
    let offset = 0;
    if (numOfRows2 > 0) {
      for (let i = 0; i < fields.length; i++) {
        if (!convertFunctions[fields[i]['type']]) {
          throw new errors.DatabaseError("Invalid data type returned from database");
        }
        blocks[i] = convertFunctions[fields[i]['type']](row, numOfRows2, fields[i]['bytes'], offset, precision);
        offset += fields[i]['bytes'] * numOfRows2;
      }
    }
    callback(param2, result2, blocks, fields);
  }
  asyncCallbackWrapper = ffi.Callback(ref.types.void, [ref.types.void_ptr, ref.types.void_ptr, ref.refType(ref.types.void_ptr2)], asyncCallbackWrapper);
  // Declared locally: assigning to an undeclared name leaks a global and throws in strict mode.
  const asyncStoppingCallbackWrapper = ffi.Callback(ref.types.void, [ref.types.void_ptr], stoppingCallback);
  let streamHandle = this.libtaos.taos_open_stream(connection, sql, asyncCallbackWrapper, stime, param, asyncStoppingCallbackWrapper);
  if (ref.isNull(streamHandle)) {
    throw new errors.TDError('Failed to open a stream with TDengine');
  }
  console.log("Succesfully opened stream");
  return streamHandle;
};
/**
 * Close a stream through taos_close_stream and log that it was closed.
 * @param {Buffer} stream - Native stream handle
 */
CTaosInterface.prototype.closeStream = function closeStream(stream) {
  const lib = this.libtaos;
  lib.taos_close_stream(stream);
  console.log("Closed stream");
};