cursor.js 18.0 KB
Newer Older
sangshuduo's avatar
sangshuduo 已提交
1
const ref = require('ref-napi');
S
StoneT2000 已提交
2
require('./globalfunc.js')
3
const CTaosInterface = require('./cinterface')
4
const errors = require('./error')
5
const TaosQuery = require('./taosquery')
S
StoneT2000 已提交
6
const { PerformanceObserver, performance } = require('perf_hooks');
7 8
module.exports = TDengineCursor;

9
/**
10
 * @typedef {Object} Buffer - A Node.js buffer. Please refer to {@link https://nodejs.org/api/buffer.html} for more details
S
StoneT2000 已提交
11 12 13 14
 * @global
 */

/**
15
 * @class TDengineCursor
S
StoneT2000 已提交
16 17 18 19
 * @classdesc  The TDengine Cursor works directly with the C Interface which works with TDengine. It refrains from
 * returning parsed data and majority of functions return the raw data such as cursor.fetchall() as compared to the TaosQuery class which
 * has functions that "prettify" the data and add more functionality and can be used through cursor.query("your query"). Instead of
 * promises, the class and its functions use callbacks.
20 21
 * @param {TDengineConnection} - The TDengine Connection this cursor uses to interact with TDengine
 * @property {data} - Latest retrieved data from query execution. It is an empty array by default
S
StoneT2000 已提交
22 23
 * @property {fields} - Array of the field objects in order from left to right of the latest data retrieved
 * @since 1.0.0
24
 */
25
function TDengineCursor(connection = null) {
26
  //All parameters are store for sync queries only.
27 28 29 30
  this._rowcount = -1;
  this._connection = null;
  this._result = null;
  this._fields = null;
31
  this.data = [];
S
StoneT2000 已提交
32
  this.fields = null;
33 34
  if (connection != null) {
    this._connection = connection
35 36 37 38
    this._chandle = connection._chandle //pass through, just need library loaded.
  }
  else {
    throw new errors.ProgrammingError("A TDengineConnection object is required to be passed to the TDengineCursor");
39 40 41
  }

}
42 43
/**
 * Get the row counts of the latest query
S
StoneT2000 已提交
44
 * @since 1.0.0
45 46
 * @return {number} Rowcount
 */
47 48 49
TDengineCursor.prototype.rowcount = function rowcount() {
  return this._rowcount;
}
50 51 52
/**
 * Close the cursor by setting its connection to null and freeing results from the connection and resetting the results it has stored
 * @return {boolean} Whether or not the cursor was succesfully closed
S
StoneT2000 已提交
53
 * @since 1.0.0
54
 */
55 56 57 58
TDengineCursor.prototype.close = function close() {
  if (this._connection == null) {
    return false;
  }
59
  this._connection._clearResultSet();
60 61 62 63
  this._reset_result();
  this._connection = null;
  return true;
}
64
/**
S
StoneT2000 已提交
65 66 67 68 69 70 71 72
 * Create a TaosQuery object to perform a query to TDengine and retrieve data.
 * @param {string} operation - The operation string to perform a query on
 * @param {boolean} execute - Whether or not to immedietely perform the query. Default is false.
 * @return {TaosQuery | Promise<TaosResult>} A TaosQuery object
 * @example
 * var query = cursor.query("select count(*) from meterinfo.meters");
 * query.execute();
 * @since 1.0.6
73
 */
S
StoneT2000 已提交
74 75
TDengineCursor.prototype.query = function query(operation, execute = false) {
  return new TaosQuery(operation, this, execute);
76 77 78
}

/**
S
StoneT2000 已提交
79 80 81 82 83 84 85 86
 * Execute a query. Also stores all the field meta data returned from the query into cursor.fields. It is preferable to use cursor.query() to create
 * queries and execute them instead of using the cursor object directly.
 * @param {string} operation - The query operation to execute in the taos shell
 * @param {Object} options - Execution options object. quiet : true turns off logging from queries
 * @param {boolean} options.quiet - True if you want to surpress logging such as "Query OK, 1 row(s) ..."
 * @param {function} callback - A callback function to execute after the query is made to TDengine
 * @return {number | Buffer} Number of affected rows or a Buffer that points to the results of the query
 * @since 1.0.0
87
 */
S
StoneT2000 已提交
88
TDengineCursor.prototype.execute = function execute(operation, options, callback) {
89
  if (operation == undefined) {
90
    throw new errors.ProgrammingError('No operation passed as argument');
91 92
    return null;
  }
S
StoneT2000 已提交
93

94
  if (typeof options == 'function') {
S
StoneT2000 已提交
95 96 97
    callback = options;
  }
  if (typeof options != 'object') options = {}
98 99 100
  if (this._connection == null) {
    throw new errors.ProgrammingError('Cursor is not connected');
  }
101

102 103 104
  this._reset_result();

  let stmt = operation;
S
StoneT2000 已提交
105
  let time = 0;
106 107 108 109 110 111 112 113
  let res;
  if (options['quiet'] != true) {
    const obs = new PerformanceObserver((items) => {
      time = items.getEntries()[0].duration;
      performance.clearMarks();
    });
    obs.observe({ entryTypes: ['measure'] });
    performance.mark('A');
114
    this._result = this._chandle.query(this._connection._conn, stmt);
115 116 117 118
    performance.mark('B');
    performance.measure('query', 'A', 'B');
  }
  else {
119
    this._result = this._chandle.query(this._connection._conn, stmt);
120
  }
121
  res = this._chandle.errno(this._result);
122
  if (res == 0) {
123
    let fieldCount = this._chandle.fieldsCount(this._result);
124
    if (fieldCount == 0) {
125
      let affectedRowCount = this._chandle.affectedRows(this._result);
S
StoneT2000 已提交
126 127 128 129 130 131
      let response = this._createAffectedResponse(affectedRowCount, time)
      if (options['quiet'] != true) {
        console.log(response);
      }
      wrapCB(callback);
      return affectedRowCount; //return num of affected rows, common with insert, use statements
132 133
    }
    else {
134 135
      this._fields = this._chandle.useResult(this._result);
      this.fields = this._fields;
S
StoneT2000 已提交
136
      wrapCB(callback);
137

138
      return this._result; //return a pointer to the result
139 140 141
    }
  }
  else {
142
    throw new errors.ProgrammingError(this._chandle.errStr(this._result))
143 144 145
  }

}
S
StoneT2000 已提交
146
TDengineCursor.prototype._createAffectedResponse = function (num, time) {
147
  return "Query OK, " + num + " row(s) affected (" + (time * 0.001).toFixed(8) + "s)";
S
StoneT2000 已提交
148 149
}
TDengineCursor.prototype._createSetResponse = function (num, time) {
150
  return "Query OK, " + num + " row(s) in set (" + (time * 0.001).toFixed(8) + "s)";
S
StoneT2000 已提交
151
}
152 153 154 155 156 157 158 159 160
TDengineCursor.prototype.executemany = function executemany() {

}
TDengineCursor.prototype.fetchone = function fetchone() {

}
TDengineCursor.prototype.fetchmany = function fetchmany() {

}
161
/**
S
StoneT2000 已提交
162 163 164 165
 * Fetches all results from a query and also stores results into cursor.data. It is preferable to use cursor.query() to create
 * queries and execute them instead of using the cursor object directly.
 * @param {function} callback - callback function executing on the complete fetched data
 * @return {Array<Array>} The resultant array, with entries corresponding to each retreived row from the query results, sorted in
166
 * order by the field name ordering in the table.
S
StoneT2000 已提交
167
 * @since 1.0.0
168 169
 * @example
 * cursor.execute('select * from db.table');
S
StoneT2000 已提交
170 171 172
 * var data = cursor.fetchall(function(results) {
 *   results.forEach(row => console.log(row));
 * })
173
 */
S
StoneT2000 已提交
174
TDengineCursor.prototype.fetchall = function fetchall(options, callback) {
175
  if (this._result == null || this._fields == null) {
S
StoneT2000 已提交
176
    throw new errors.OperationalError("Invalid use of fetchall, either result or fields from query are null. First execute a query first");
177
  }
S
StoneT2000 已提交
178

179 180 181
  let num_of_rows = this._chandle.affectedRows(this._result);
  let data = new Array(num_of_rows);

182
  this._rowcount = 0;
183

S
StoneT2000 已提交
184 185 186 187 188
  let time = 0;
  const obs = new PerformanceObserver((items) => {
    time += items.getEntries()[0].duration;
    performance.clearMarks();
  });
189 190
  obs.observe({ entryTypes: ['measure'] });
  performance.mark('A');
191
  while (true) {
192
    let blockAndRows = this._chandle.fetchBlock(this._result, this._fields);
193 194
    // console.log(blockAndRows);
    // break;
195 196 197 198 199 200
    let block = blockAndRows.blocks;
    let num_of_rows = blockAndRows.num_of_rows;
    if (num_of_rows == 0) {
      break;
    }
    this._rowcount += num_of_rows;
L
liu0x54 已提交
201
    let numoffields = this._fields.length;
202
    for (let i = 0; i < num_of_rows; i++) {
203 204
      // data.push([]);

L
liu0x54 已提交
205 206
      let rowBlock = new Array(numoffields);
      for (let j = 0; j < numoffields; j++) {
S
StoneT2000 已提交
207
        rowBlock[j] = block[j][i];
208
      }
209 210
      data[this._rowcount - num_of_rows + i] = (rowBlock);
      // data.push(rowBlock);
211
    }
212

213
  }
214
  
215 216
  performance.mark('B');
  performance.measure('query', 'A', 'B');
S
StoneT2000 已提交
217 218 219
  let response = this._createSetResponse(this._rowcount, time)
  console.log(response);

220
  // this._connection._clearResultSet();
S
StoneT2000 已提交
221 222
  let fields = this.fields;
  this._reset_result();
223
  this.data = data;
S
StoneT2000 已提交
224
  this.fields = fields;
225

S
StoneT2000 已提交
226 227
  wrapCB(callback, data);

228
  return data;
229
}
230 231 232 233 234 235 236 237 238
/**
 * Asynchrnously execute a query to TDengine. NOTE, insertion requests must be done in sync if on the same table.
 * @param {string} operation - The query operation to execute in the taos shell
 * @param {Object} options - Execution options object. quiet : true turns off logging from queries
 * @param {boolean} options.quiet - True if you want to surpress logging such as "Query OK, 1 row(s) ..."
 * @param {function} callback - A callback function to execute after the query is made to TDengine
 * @return {number | Buffer} Number of affected rows or a Buffer that points to the results of the query
 * @since 1.0.0
 */
239
TDengineCursor.prototype.execute_a = function execute_a(operation, options, callback, param) {
240 241 242 243
  if (operation == undefined) {
    throw new errors.ProgrammingError('No operation passed as argument');
    return null;
  }
244
  if (typeof options == 'function') {
245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264
    //we expect the parameter after callback to be param
    param = callback;
    callback = options;
  }
  if (typeof options != 'object') options = {}
  if (this._connection == null) {
    throw new errors.ProgrammingError('Cursor is not connected');
  }
  if (typeof callback != 'function') {
    throw new errors.ProgrammingError("No callback function passed to execute_a function");
  }
  // Async wrapper for callback;
  var cr = this;

  let asyncCallbackWrapper = function (param2, res2, resCode) {
    if (typeof callback == 'function') {
      callback(param2, res2, resCode);
    }

    if (resCode >= 0) {
265 266 267 268 269 270 271 272
      //      let fieldCount = cr._chandle.numFields(res2);
      //      if (fieldCount == 0) {
      //        //cr._chandle.freeResult(res2);
      //        return res2;
      //      } 
      //      else {
      //        return res2;
      //      }
L
liu0x54 已提交
273
      return res2;
274 275 276 277 278 279

    }
    else {
      throw new errors.ProgrammingError("Error occuring with use of execute_a async function. Status code was returned with failure");
    }
  }
280

281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307
  let stmt = operation;
  let time = 0;

  // Use ref module to write to buffer in cursor.js instead of taosquery to maintain a difference in levels. Have taosquery stay high level
  // through letting it pass an object as param
  var buf = ref.alloc('Object');
  ref.writeObject(buf, 0, param);
  const obs = new PerformanceObserver((items) => {
    time = items.getEntries()[0].duration;
    performance.clearMarks();
  });
  obs.observe({ entryTypes: ['measure'] });
  performance.mark('A');
  this._chandle.query_a(this._connection._conn, stmt, asyncCallbackWrapper, buf);
  performance.mark('B');
  performance.measure('query', 'A', 'B');
  return param;


}
/**
 * Fetches all results from an async query. It is preferable to use cursor.query_a() to create
 * async queries and execute them instead of using the cursor object directly.
 * @param {Object} options - An options object containing options for this function
 * @param {function} callback - callback function that is callbacked on the COMPLETE fetched data (it is calledback only once!).
 * Must be of form function (param, result, rowCount, rowData)
 * @param {Object} param - A parameter that is also passed to the main callback function. Important! Param must be an object, and the key "data" cannot be used
308
 * @return {{param:Object, result:Buffer}} An object with the passed parameters object and the buffer instance that is a pointer to the result handle.
309 310 311 312 313 314 315 316
 * @since 1.2.0
 * @example
 * cursor.execute('select * from db.table');
 * var data = cursor.fetchall(function(results) {
 *   results.forEach(row => console.log(row));
 * })
 */
TDengineCursor.prototype.fetchall_a = function fetchall_a(result, options, callback, param = {}) {
317
  if (typeof options == 'function') {
318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339
    //we expect the parameter after callback to be param
    param = callback;
    callback = options;
  }
  if (typeof options != 'object') options = {}
  if (this._connection == null) {
    throw new errors.ProgrammingError('Cursor is not connected');
  }
  if (typeof callback != 'function') {
    throw new errors.ProgrammingError('No callback function passed to fetchall_a function')
  }
  if (param.data) {
    throw new errors.ProgrammingError("You aren't allowed to set the key 'data' for the parameters object");
  }
  let buf = ref.alloc('Object');
  param.data = [];
  var cr = this;

  // This callback wrapper accumulates the data from the fetch_rows_a function from the cinterface. It is accumulated by passing the param2
  // object which holds accumulated data in the data key.
  let asyncCallbackWrapper = function asyncCallbackWrapper(param2, result2, numOfRows2, rowData) {
    param2 = ref.readObject(param2); //return the object back from the pointer
340 341
    if (numOfRows2 > 0 && rowData.length != 0) {
      // Keep fetching until now rows left.
342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359
      let buf2 = ref.alloc('Object');
      param2.data.push(rowData);
      ref.writeObject(buf2, 0, param2);
      cr._chandle.fetch_rows_a(result2, asyncCallbackWrapper, buf2);
    }
    else {
      let finalData = param2.data;
      let fields = cr._chandle.fetchFields_a(result2);
      let data = [];
      for (let i = 0; i < finalData.length; i++) {
        let num_of_rows = finalData[i][0].length; //fetched block number i;
        let block = finalData[i];
        for (let j = 0; j < num_of_rows; j++) {
          data.push([]);
          let rowBlock = new Array(fields.length);
          for (let k = 0; k < fields.length; k++) {
            rowBlock[k] = block[k][j];
          }
360
          data[data.length - 1] = rowBlock;
361 362 363
        }
      }
      cr._chandle.freeResult(result2); // free result, avoid seg faults and mem leaks!
364
      callback(param2, result2, numOfRows2, { data: data, fields: fields });
365

366 367 368 369
    }
  }
  ref.writeObject(buf, 0, param);
  param = this._chandle.fetch_rows_a(result, asyncCallbackWrapper, buf); //returned param
370
  return { param: param, result: result };
371
}
372 373 374 375 376 377 378
/**
 * Stop a query given the result handle.
 * @param {Buffer} result - The buffer that acts as the result handle
 * @since 1.3.0
 */
TDengineCursor.prototype.stopQuery = function stopQuery(result) {
  this._chandle.stopQuery(result);
379 380 381
}
TDengineCursor.prototype._reset_result = function _reset_result() {
  this._rowcount = -1;
382 383 384
  if (this._result != null) {
    this._chandle.freeResult(this._result);
  }
385 386
  this._result = null;
  this._fields = null;
387
  this.data = [];
S
StoneT2000 已提交
388
  this.fields = null;
389
}
390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408
/**
 * Get server info such as version number
 * @return {string}
 * @since 1.3.0
 */
TDengineCursor.prototype.getServerInfo = function getServerInfo() {
  return this._chandle.getServerInfo(this._connection._conn);
}
/**
 * Get client info such as version number
 * @return {string}
 * @since 1.3.0
 */
TDengineCursor.prototype.getClientInfo = function getClientInfo() {
  return this._chandle.getClientInfo();
}
/**
 * Subscribe to a table from a database in TDengine.
 * @param {Object} config - A configuration object containing the configuration options for the subscription
S
StoneT2000 已提交
409 410 411 412
 * @param {string} config.restart - whether or not to continue a subscription if it already exits, otherwise start from beginning
 * @param {string} config.topic - The unique identifier of a subscription
 * @param {string} config.sql - A sql statement for data query
 * @param {string} config.interval - The pulling interval
413 414 415 416
 * @return {Buffer} A buffer pointing to the subscription session handle
 * @since 1.3.0
 */
TDengineCursor.prototype.subscribe = function subscribe(config) {
S
StoneT2000 已提交
417 418
  let restart = config.restart ? 1 : 0;
  return this._chandle.subscribe(this._connection._conn, restart, config.topic, config.sql, config.interval);
419 420 421 422 423 424 425 426 427
};
/**
 * An infinite loop that consumes the latest data and calls a callback function that is provided.
 * @param {Buffer} subscription - A buffer object pointing to the subscription session handle
 * @param {function} callback - The callback function that takes the row data, field/column meta data, and the subscription session handle as input
 * @since 1.3.0
 */
TDengineCursor.prototype.consumeData = async function consumeData(subscription, callback) {
  while (true) {
428
    let { data, fields, result } = this._chandle.consume(subscription);
S
StoneT2000 已提交
429
    callback(data, fields, result);
430 431
  }
}
432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449
/**
 * Unsubscribe the provided buffer object pointing to the subscription session handle
 * @param {Buffer} subscription - A buffer object pointing to the subscription session handle that is to be unsubscribed
 * @since 1.3.0
 */
TDengineCursor.prototype.unsubscribe = function unsubscribe(subscription) {
  this._chandle.unsubscribe(subscription);
}
/**
 * Open a stream with TDengine to run the sql query periodically in the background
 * @param {string} sql - The query to run
 * @param {function} callback - The callback function to run after each query, accepting inputs as param, result handle, data, fields meta data
 * @param {number} stime - The time of the stream starts in the form of epoch milliseconds. If 0 is given, the start time is set as the current time.
 * @param {function} stoppingCallback - The callback function to run when the continuous query stops. It takes no inputs
 * @param {object} param - A parameter that is passed to the main callback function
 * @return {Buffer} A buffer pointing to the stream handle
 * @since 1.3.0
 */
450 451 452
TDengineCursor.prototype.openStream = function openStream(sql, callback, stime = 0, stoppingCallback, param = {}) {
  let buf = ref.alloc('Object');
  ref.writeObject(buf, 0, param);
453

454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476
  let asyncCallbackWrapper = function (param2, result2, blocks, fields) {
    let data = [];
    let num_of_rows = blocks[0].length;
    for (let j = 0; j < num_of_rows; j++) {
      data.push([]);
      let rowBlock = new Array(fields.length);
      for (let k = 0; k < fields.length; k++) {
        rowBlock[k] = blocks[k][j];
      }
      data[data.length - 1] = rowBlock;
    }
    callback(param2, result2, blocks, fields);
  }
  return this._chandle.openStream(this._connection._conn, sql, asyncCallbackWrapper, stime, stoppingCallback, buf);
}
/**
 * Close a stream
 * @param {Buffer} - A buffer pointing to the handle of the stream to be closed
 * @since 1.3.0
 */
TDengineCursor.prototype.closeStream = function closeStream(stream) {
  this._chandle.closeStream(stream);
}