cursor.js 17.9 KB
Newer Older
1
const ref = require('ref');
S
StoneT2000 已提交
2
require('./globalfunc.js')
3 4
const CTaosInterface = require('./cinterface')
const errors = require ('./error')
5
const TaosQuery = require('./taosquery')
S
StoneT2000 已提交
6
const { PerformanceObserver, performance } = require('perf_hooks');
7 8
module.exports = TDengineCursor;

9
/**
10
 * @typedef {Object} Buffer - A Node.js buffer. Please refer to {@link https://nodejs.org/api/buffer.html} for more details
S
StoneT2000 已提交
11 12 13 14
 * @global
 */

/**
15
 * @class TDengineCursor
S
StoneT2000 已提交
16 17 18 19
 * @classdesc  The TDengine Cursor works directly with the C Interface which works with TDengine. It refrains from
 * returning parsed data and majority of functions return the raw data such as cursor.fetchall() as compared to the TaosQuery class which
 * has functions that "prettify" the data and add more functionality and can be used through cursor.query("your query"). Instead of
 * promises, the class and its functions use callbacks.
20 21
 * @param {TDengineConnection} - The TDengine Connection this cursor uses to interact with TDengine
 * @property {data} - Latest retrieved data from query execution. It is an empty array by default
S
StoneT2000 已提交
22 23
 * @property {fields} - Array of the field objects in order from left to right of the latest data retrieved
 * @since 1.0.0
24
 */
25
function TDengineCursor(connection = null) {
  // Cached state of the most recent synchronous query (async queries do not use these fields).
  this._rowcount = -1;
  this._connection = null;
  this._result = null;
  this._fields = null;
  this.data = [];
  this.fields = null;

  // `== null` rejects both null and undefined: a cursor cannot exist without a connection.
  if (connection == null) {
    throw new errors.ProgrammingError("A TDengineConnection object is required to be passed to the TDengineCursor");
  }

  this._connection = connection;
  // Pass-through of the connection's C-interface handle; the cursor only needs the loaded library.
  this._chandle = connection._chandle;
}
42 43
/**
 * Get the row counts of the latest query
S
StoneT2000 已提交
44
 * @since 1.0.0
45 46
 * @return {number} Rowcount
 */
47 48 49
TDengineCursor.prototype.rowcount = function rowcount() {
  // Read-only view of the cursor's internal row counter (-1 after construction or a reset).
  const { _rowcount: latestCount } = this;
  return latestCount;
}
50 51 52
/**
 * Close the cursor by setting its connection to null and freeing results from the connection and resetting the results it has stored
 * @return {boolean} Whether or not the cursor was successfully closed
S
StoneT2000 已提交
53
 * @since 1.0.0
54
 */
55 56 57 58
TDengineCursor.prototype.close = function close() {
  // Already closed: nothing left to release.
  if (this._connection == null) {
    return false;
  }
  // Clear the result set held by the connection first, then the cursor's own cached state.
  this._connection._clearResultSet();
  this._reset_result();
  // Dropping the connection reference is what marks the cursor as closed.
  this._connection = null;
  return true;
}
64
/**
S
StoneT2000 已提交
65 66 67 68 69 70 71 72
 * Create a TaosQuery object to perform a query to TDengine and retrieve data.
 * @param {string} operation - The operation string to perform a query on
 * @param {boolean} execute - Whether or not to immediately perform the query. Default is false.
 * @return {TaosQuery | Promise<TaosResult>} A TaosQuery object
 * @example
 * var query = cursor.query("select count(*) from meterinfo.meters");
 * query.execute();
 * @since 1.0.6
73
 */
S
StoneT2000 已提交
74 75
TDengineCursor.prototype.query = function query(operation, execute = false) {
  // The TaosQuery constructor receives the operation string, this cursor and the execute flag unchanged.
  return new TaosQuery(operation, this, execute);
}

/**
S
StoneT2000 已提交
79 80 81 82 83 84 85 86
 * Execute a query. Also stores all the field meta data returned from the query into cursor.fields. It is preferable to use cursor.query() to create
 * queries and execute them instead of using the cursor object directly.
 * @param {string} operation - The query operation to execute in the taos shell
 * @param {Object} options - Execution options object. quiet : true turns off logging from queries
 * @param {boolean} options.quiet - True if you want to suppress logging such as "Query OK, 1 row(s) ..."
 * @param {function} callback - A callback function to execute after the query is made to TDengine
 * @return {number | Buffer} Number of affected rows or a Buffer that points to the results of the query
 * @since 1.0.0
87
 */
S
StoneT2000 已提交
88
TDengineCursor.prototype.execute = function execute(operation, options, callback) {
  // Runs `operation` synchronously through the C interface.
  // Returns the affected-row count for statements without a result set, otherwise the raw
  // result handle (field metadata is stored in this.fields). Throws errors.ProgrammingError
  // when no operation is given, the cursor is closed, or the C interface reports an error.
  if (operation == undefined) {
    throw new errors.ProgrammingError('No operation passed as argument');
  }

  // Support the execute(operation, callback) call form.
  if (typeof options == 'function') {
    callback = options;
  }
  // typeof null is 'object', so null has to be rejected explicitly before options is indexed.
  if (options == null || typeof options != 'object') {
    options = {};
  }
  if (this._connection == null) {
    throw new errors.ProgrammingError('Cursor is not connected');
  }

  // Drop whatever the previous query left behind before issuing a new one.
  this._reset_result();

  // Time the call directly with performance.now(): a PerformanceObserver callback is not
  // guaranteed to have run by the time the response line is built (modern Node.js delivers
  // it asynchronously), and an observer registered per call is never disconnected.
  const startedAt = performance.now();
  this._result = this._chandle.query(this._connection._conn, operation);
  const elapsedMs = performance.now() - startedAt;

  if (this._chandle.errno(this._result) != 0) {
    throw new errors.ProgrammingError(this._chandle.errStr(this._result));
  }

  const fieldCount = this._chandle.fieldsCount(this._result);
  if (fieldCount == 0) {
    // No result set (common with insert / use statements): report the affected rows.
    const affectedRowCount = this._chandle.affectedRows(this._result);
    if (options['quiet'] != true) {
      console.log(this._createAffectedResponse(affectedRowCount, elapsedMs));
    }
    wrapCB(callback);
    return affectedRowCount;
  }

  // A result set exists: cache the field metadata and hand back the result handle.
  this._fields = this._chandle.useResult(this._result);
  this.fields = this._fields;
  wrapCB(callback);
  return this._result;
}
S
StoneT2000 已提交
146 147 148 149 150 151
TDengineCursor.prototype._createAffectedResponse = function (num, time) {
  // `time` is scaled by 0.001 (milliseconds to seconds) and printed with 8 decimals.
  const seconds = (time * 0.001).toFixed(8);
  return `Query OK, ${num} row(s) affected (${seconds}s)`;
}
TDengineCursor.prototype._createSetResponse = function (num, time) {
  // `time` is scaled by 0.001 (milliseconds to seconds) and printed with 8 decimals.
  const seconds = (time * 0.001).toFixed(8);
  return `Query OK, ${num} row(s) in set (${seconds}s)`;
}
152 153 154 155 156 157 158 159 160
TDengineCursor.prototype.executemany = function executemany() {
  // Placeholder: not implemented. Calling it does nothing and yields undefined.
  return undefined;
}
TDengineCursor.prototype.fetchone = function fetchone() {
  // Placeholder: not implemented. Calling it does nothing and yields undefined.
  return undefined;
}
TDengineCursor.prototype.fetchmany = function fetchmany() {
  // Placeholder: not implemented. Calling it does nothing and yields undefined.
  return undefined;
}
161
/**
S
StoneT2000 已提交
162 163 164 165
 * Fetches all results from a query and also stores results into cursor.data. It is preferable to use cursor.query() to create
 * queries and execute them instead of using the cursor object directly.
 * @param {function} callback - callback function executing on the complete fetched data
 * @return {Array<Array>} The resultant array, with entries corresponding to each retrieved row from the query results, sorted in
166
 * order by the field name ordering in the table.
S
StoneT2000 已提交
167
 * @since 1.0.0
168 169
 * @example
 * cursor.execute('select * from db.table');
S
StoneT2000 已提交
170 171 172
 * var data = cursor.fetchall(function(results) {
 *   results.forEach(row => console.log(row));
 * })
173
 */
S
StoneT2000 已提交
174
TDengineCursor.prototype.fetchall = function fetchall(options, callback) {
  // Drains the result of the last execute() into an array of rows, stores it in this.data,
  // passes it to `callback` (if any) and returns it. `options.quiet === true` suppresses the
  // "Query OK ..." log line. Throws errors.OperationalError when no query result is pending.

  // Support the documented fetchall(callback) call form; without this shift the function
  // would sit in `options` and never be invoked.
  if (typeof options == 'function') {
    callback = options;
  }
  if (options == null || typeof options != 'object') {
    options = {};
  }
  if (this._result == null || this._fields == null) {
    throw new errors.OperationalError("Invalid use of fetchall, either result or fields from query are null. First execute a query first");
  }

  const data = [];
  const numOfFields = this._fields.length;
  this._rowcount = 0;

  // Timed with performance.now() rather than a PerformanceObserver: observer callbacks are
  // not guaranteed to have fired when the response is built, and an observer registered on
  // every call is never disconnected.
  const startedAt = performance.now();
  while (true) {
    const blockAndRows = this._chandle.fetchBlock(this._result, this._fields);
    const numOfRows = blockAndRows.num_of_rows;
    if (numOfRows == 0) {
      break;
    }
    const block = blockAndRows.blocks;
    this._rowcount += numOfRows;
    // The block is indexed block[field][row]; transpose it into one array per row.
    for (let i = 0; i < numOfRows; i++) {
      const row = new Array(numOfFields);
      for (let j = 0; j < numOfFields; j++) {
        row[j] = block[j][i];
      }
      data.push(row);
    }
  }
  const elapsedMs = performance.now() - startedAt;

  if (options['quiet'] != true) {
    console.log(this._createSetResponse(this._rowcount, elapsedMs));
  }

  // _reset_result() frees the native result but also wipes data/fields, so the field
  // metadata is saved beforehand and both are re-published afterwards.
  // NOTE(review): _reset_result() also sets _rowcount back to -1, so rowcount() does not
  // report the fetched row count after fetchall(); left as-is, confirm whether intended.
  const fields = this.fields;
  this._reset_result();
  this.data = data;
  this.fields = fields;

  wrapCB(callback, data);
  return data;
}
233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269
/**
 * Asynchronously execute a query to TDengine. NOTE, insertion requests must be done in sync if on the same table.
 * @param {string} operation - The query operation to execute in the taos shell
 * @param {Object} options - Execution options object. quiet : true turns off logging from queries
 * @param {boolean} options.quiet - True if you want to suppress logging such as "Query OK, 1 row(s) ..."
 * @param {function} callback - A callback function to execute after the query is made to TDengine
 * @return {number | Buffer} Number of affected rows or a Buffer that points to the results of the query
 * @since 1.0.0
 */
TDengineCursor.prototype.execute_a = function execute_a (operation, options, callback, param) {
  // Submits `operation` through the C interface's query_a and returns `param` immediately.
  // `callback(param, result, resCode)` is invoked by the wrapper below once the C interface
  // reports back. `options` is accepted for signature compatibility but not read here.
  // Throws errors.ProgrammingError on a missing operation, closed cursor or missing callback.
  if (operation == undefined) {
    throw new errors.ProgrammingError('No operation passed as argument');
  }
  if (typeof options == 'function') {
    // execute_a(operation, callback, param): shift the arguments one slot to the left.
    param = callback;
    callback = options;
  }
  if (this._connection == null) {
    throw new errors.ProgrammingError('Cursor is not connected');
  }
  if (typeof callback != 'function') {
    throw new errors.ProgrammingError("No callback function passed to execute_a function");
  }

  const cr = this;
  const asyncCallbackWrapper = function (param2, res2, resCode) {
    // The user callback always runs first, even when resCode signals a failure.
    callback(param2, res2, resCode);

    if (resCode >= 0) {
      // A result without fields has nothing to fetch, so it is freed right away;
      // otherwise the handle is returned and stays alive.
      if (cr._chandle.numFields(res2) == 0) {
        cr._chandle.freeResult(res2);
        return undefined;
      }
      return res2;
    }
    throw new errors.ProgrammingError("Error occuring with use of execute_a async function. Status code was returned with failure");
  };

  // The buffer is written here (rather than in taosquery) so taosquery can stay high level
  // and simply pass an object as param.
  const buf = ref.alloc('Object');
  ref.writeObject(buf, 0, param);

  // No PerformanceObserver is registered here: the previous one measured only the
  // non-blocking submit call, its value was never used, and it was never disconnected.
  this._chandle.query_a(this._connection._conn, operation, asyncCallbackWrapper, buf);
  return param;
}
/**
 * Fetches all results from an async query. It is preferable to use cursor.query_a() to create
 * async queries and execute them instead of using the cursor object directly.
 * @param {Object} options - An options object containing options for this function
 * @param {function} callback - callback function that is callbacked on the COMPLETE fetched data (it is calledback only once!).
 * Must be of form function (param, result, rowCount, rowData)
 * @param {Object} param - A parameter that is also passed to the main callback function. Important! Param must be an object, and the key "data" cannot be used
309
 * @return {{param:Object, result:Buffer}} An object with the passed parameters object and the buffer instance that is a pointer to the result handle.
310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340
 * @since 1.2.0
 * @example
 * cursor.execute_a('select * from db.table', function (param, result, resCode) {
 *   cursor.fetchall_a(result, {}, function (param2, result2, rowCount, rowData) {
 *     rowData.data.forEach(row => console.log(row));
 *   });
 * });
 */
TDengineCursor.prototype.fetchall_a = function fetchall_a(result, options, callback, param = {}) {
  // Fetches every block of an async query result and calls
  // `callback(param, result, rowCount, {data, fields})` exactly once, when no rows are left.
  // Returns {param, result}, where param is whatever the C interface's fetch_rows_a returned.
  // `options` is accepted for signature compatibility but not read here.
  // Throws errors.ProgrammingError on a closed cursor, a missing callback, or a param that
  // already carries a truthy `data` key (that key is reserved for the accumulated blocks).
  if (typeof options == 'function') {
    // fetchall_a(result, callback, param): shift the arguments one slot to the left.
    param = callback;
    callback = options;
  }
  // After the shift param is undefined when the caller passed no param (the default value
  // only applies to the fourth positional argument), so fall back to a fresh object.
  if (param == null) {
    param = {};
  }
  if (this._connection == null) {
    throw new errors.ProgrammingError('Cursor is not connected');
  }
  if (typeof callback != 'function') {
    throw new errors.ProgrammingError('No callback function passed to fetchall_a function')
  }
  if (param.data) {
    throw new errors.ProgrammingError("You aren't allowed to set the key 'data' for the parameters object");
  }
  param.data = [];
  const cr = this;

  // Accumulates the blocks delivered by fetch_rows_a in param2.data and re-arms the fetch
  // until a call arrives without rows.
  const asyncCallbackWrapper = function asyncCallbackWrapper(param2, result2, numOfRows2, rowData) {
    param2 = ref.readObject(param2); // turn the pointer back into the param object
    if (numOfRows2 > 0 && rowData.length != 0) {
      // Keep fetching until no rows are left.
      param2.data.push(rowData);
      const nextBuf = ref.alloc('Object');
      ref.writeObject(nextBuf, 0, param2);
      cr._chandle.fetch_rows_a(result2, asyncCallbackWrapper, nextBuf);
      return;
    }

    const fields = cr._chandle.fetchFields_a(result2);
    const data = [];
    for (const block of param2.data) {
      // Each block is indexed block[field][row]; transpose it into one array per row.
      const numOfRows = block[0].length;
      for (let j = 0; j < numOfRows; j++) {
        const row = new Array(fields.length);
        for (let k = 0; k < fields.length; k++) {
          row[k] = block[k][j];
        }
        data.push(row);
      }
    }
    cr._chandle.freeResult(result2); // free result, avoid seg faults and mem leaks!
    callback(param2, result2, numOfRows2, { data: data, fields: fields });
  };

  const buf = ref.alloc('Object');
  ref.writeObject(buf, 0, param);
  param = this._chandle.fetch_rows_a(result, asyncCallbackWrapper, buf); //returned param
  return { param: param, result: result };
}
373 374 375 376 377 378 379
/**
 * Stop a query given the result handle.
 * @param {Buffer} result - The buffer that acts as the result handle
 * @since 1.3.0
 */
TDengineCursor.prototype.stopQuery = function stopQuery(result) {
  // Pure delegation to the C interface; nothing is returned.
  this._chandle.stopQuery(result);
}
TDengineCursor.prototype._reset_result = function _reset_result() {
  // Returns the cursor to its "no query executed" state.
  this._rowcount = -1;
  // Free the native result (if any) before the handle is dropped below.
  if (this._result != null) {
    this._chandle.freeResult(this._result);
  }
  this._result = null;
  this._fields = null;
  this.data = [];
  this.fields = null;
}
391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409
/**
 * Get server info such as version number
 * @return {string}
 * @since 1.3.0
 */
TDengineCursor.prototype.getServerInfo = function getServerInfo() {
  // Ask the C interface for the server info of this cursor's underlying connection handle.
  const { _chandle: lib, _connection: link } = this;
  return lib.getServerInfo(link._conn);
}
/**
 * Get client info such as version number
 * @return {string}
 * @since 1.3.0
 */
TDengineCursor.prototype.getClientInfo = function getClientInfo() {
  // Client info comes straight from the C interface; no connection handle is involved.
  const { _chandle: lib } = this;
  return lib.getClientInfo();
}
/**
 * Subscribe to a table from a database in TDengine.
 * @param {Object} config - A configuration object containing the configuration options for the subscription
S
StoneT2000 已提交
410 411 412 413
 * @param {string} config.restart - whether or not to continue a subscription if it already exists, otherwise start from beginning
 * @param {string} config.topic - The unique identifier of a subscription
 * @param {string} config.sql - A sql statement for data query
 * @param {string} config.interval - The pulling interval
414 415 416 417
 * @return {Buffer} A buffer pointing to the subscription session handle
 * @since 1.3.0
 */
TDengineCursor.prototype.subscribe = function subscribe(config) {
  // A truthy config.restart is sent to the C interface as 1, anything else as 0.
  const restart = config.restart ? 1 : 0;
  // topic, sql and interval are forwarded untouched; the C interface's return value is the result.
  return this._chandle.subscribe(this._connection._conn, restart, config.topic, config.sql, config.interval);
};
/**
 * An infinite loop that consumes the latest data and calls a callback function that is provided.
 * @param {Buffer} subscription - A buffer object pointing to the subscription session handle
 * @param {function} callback - The callback function that takes the row data, field/column meta data, and the subscription session handle as input
 * @since 1.3.0
 */
TDengineCursor.prototype.consumeData = async function consumeData(subscription, callback) {
  // Repeatedly asks the C interface to consume from `subscription` and hands each
  // {data, fields, result} triple to `callback(data, fields, result)`.
  // NOTE(review): the loop has no exit condition and contains no await, so once started it
  // only ends when consume() or the callback throws, which rejects the returned promise.
  while (true) {
    const { data, fields, result } = this._chandle.consume(subscription);
    callback(data, fields, result);
  }
}
433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477
/**
 * Unsubscribe the provided buffer object pointing to the subscription session handle
 * @param {Buffer} subscription - A buffer object pointing to the subscription session handle that is to be unsubscribed
 * @since 1.3.0
 */
TDengineCursor.prototype.unsubscribe = function unsubscribe(subscription) {
  // Hand the subscription handle to the C interface; nothing is returned.
  const { _chandle: lib } = this;
  lib.unsubscribe(subscription);
}
/**
 * Open a stream with TDengine to run the sql query periodically in the background
 * @param {string} sql - The query to run
 * @param {function} callback - The callback function to run after each query, accepting inputs as param, result handle, data, fields meta data
 * @param {number} stime - The time of the stream starts in the form of epoch milliseconds. If 0 is given, the start time is set as the current time.
 * @param {function} stoppingCallback - The callback function to run when the continuous query stops. It takes no inputs
 * @param {object} param - A parameter that is passed to the main callback function
 * @return {Buffer} A buffer pointing to the stream handle
 * @since 1.3.0
 */
 TDengineCursor.prototype.openStream = function openStream(sql, callback, stime = 0, stoppingCallback, param = {}) {
   // Opens a stream through the C interface and returns whatever handle it hands back.
   // `param` is written into a ref buffer so it can be passed along to the C interface.
   const buf = ref.alloc('Object');
   ref.writeObject(buf, 0, param);

   // The wrapper forwards exactly the four values it receives. The previous version also
   // built a row-major copy of `blocks` here that was never used (wasted work per callback,
   // and a TypeError when `blocks` was empty); that dead computation is removed.
   // NOTE(review): callers still receive `blocks` as delivered by the C interface, not
   // row-major rows, although the JSDoc calls this argument "data" - confirm which is intended.
   const asyncCallbackWrapper = function (param2, result2, blocks, fields) {
     callback(param2, result2, blocks, fields);
   };
   return this._chandle.openStream(this._connection._conn, sql, asyncCallbackWrapper, stime, stoppingCallback, buf);
 }
 /**
  * Close a stream
  * @param {Buffer} - A buffer pointing to the handle of the stream to be closed
  * @since 1.3.0
  */
 TDengineCursor.prototype.closeStream = function closeStream(stream) {
   // Hand the stream handle to the C interface; nothing is returned.
   const { _chandle: lib } = this;
   lib.closeStream(stream);
 }