“ed433a3c930b2106a65d45c5bf292c3b52fdc266”上不存在“git@gitcode.net:hanbins137/rocketmq.git”
cursor.js 17.9 KB
Newer Older
1
const ref = require('ref');
S
StoneT2000 已提交
2
require('./globalfunc.js')
3 4
const CTaosInterface = require('./cinterface')
const errors = require ('./error')
5
const TaosQuery = require('./taosquery')
S
StoneT2000 已提交
6
const { PerformanceObserver, performance } = require('perf_hooks');
7 8
module.exports = TDengineCursor;

9
/**
 * @typedef {Object} Buffer - A Node.js buffer. Please refer to {@link https://nodejs.org/api/buffer.html} for more details
 * @global
 */

/**
 * @class TDengineCursor
 * @classdesc  The TDengine Cursor works directly with the C Interface which works with TDengine. It refrains from
 * returning parsed data and majority of functions return the raw data such as cursor.fetchall() as compared to the TaosQuery class which
 * has functions that "prettify" the data and add more functionality and can be used through cursor.query("your query"). Instead of
 * promises, the class and its functions use callbacks.
 * @param {TDengineConnection} connection - The TDengine Connection this cursor uses to interact with TDengine
 * @property {Array} data - Latest retrieved data from query execution. It is an empty array by default
 * @property {Array} fields - Array of the field objects in order from left to right of the latest data retrieved
 * @throws {ProgrammingError} When no connection (null or undefined) is supplied
 * @since 1.0.0
 */
function TDengineCursor(connection=null) {
  // All of the state below is stored for sync queries only.
  this._rowcount = -1;
  this._connection = null;
  this._result = null;
  this._fields = null;
  this.data = [];
  this.fields = null;

  if (connection == null) {
    throw new errors.ProgrammingError("A TDengineConnection object is required to be passed to the TDengineCursor");
  }

  this._connection = connection;
  // Pass through: the cursor only needs the already-loaded library handle of the connection.
  this._chandle = connection._chandle;
}
42 43
/**
 * Get the row count currently stored on the cursor (-1 when no count is stored).
 * @since 1.0.0
 * @return {number} Rowcount
 */
TDengineCursor.prototype.rowcount = function rowcount() {
  return this._rowcount;
}
50 51 52
/**
 * Close the cursor by setting its connection to null and freeing results from the connection and resetting the results it has stored
 * @return {boolean} Whether or not the cursor was successfully closed; false when the cursor has no connection (e.g. it was already closed)
 * @since 1.0.0
 */
TDengineCursor.prototype.close = function close() {
  if (this._connection == null) {
    return false;
  }
  // Release the result set held by the connection before dropping the reference to it.
  this._connection._clearResultSet();
  this._reset_result();
  this._connection = null;
  return true;
}
64
/**
 * Create a TaosQuery object to perform a query to TDengine and retrieve data.
 * @param {string} operation - The operation string to perform a query on
 * @param {boolean} execute - Whether or not to immediately perform the query. Default is false.
 * @return {TaosQuery | Promise<TaosResult>} A TaosQuery object
 * @example
 * var query = cursor.query("select count(*) from meterinfo.meters");
 * query.execute();
 * @since 1.0.6
 */
TDengineCursor.prototype.query = function query(operation, execute = false) {
  // The query object receives this cursor so it can run the operation through it.
  return new TaosQuery(operation, this, execute);
}

/**
 * Execute a query. Also stores all the field meta data returned from the query into cursor.fields. It is preferable to use cursor.query() to create
 * queries and execute them instead of using the cursor object directly.
 * @param {string} operation - The query operation to execute in the taos shell
 * @param {Object} [options] - Execution options object. quiet : true turns off logging from queries. May be omitted, in which case
 * the callback can be passed as the second argument.
 * @param {boolean} [options.quiet] - True if you want to suppress logging such as "Query OK, 1 row(s) ..."
 * @param {function} [callback] - A callback function to execute after the query is made to TDengine
 * @return {number | Buffer} Number of affected rows or a Buffer that points to the results of the query
 * @throws {ProgrammingError} When no operation is passed, the cursor is not connected, or the query returns a non-zero status
 * @since 1.0.0
 */
TDengineCursor.prototype.execute = function execute(operation, options, callback) {
  if (operation == undefined) {
    throw new errors.ProgrammingError('No operation passed as argument');
  }

  // Support the execute(operation, callback) call form.
  if (typeof options == 'function')  {
    callback = options;
  }
  // typeof null is 'object', so null has to be replaced explicitly before options is indexed.
  if (options == null || typeof options != 'object') options = {};
  if (this._connection == null) {
    throw new errors.ProgrammingError('Cursor is not connected');
  }
  this._connection._clearResultSet();
  this._reset_result();

  const verbose = options['quiet'] != true;

  // Time the synchronous call directly. A PerformanceObserver delivers its entries asynchronously,
  // so a duration read from one is not available yet when the response line is built.
  const startedAt = performance.now();
  const res = this._chandle.query(this._connection._conn, operation);
  const time = performance.now() - startedAt; // milliseconds

  if (res != 0) {
    throw new errors.ProgrammingError(this._chandle.errStr(this._connection._conn));
  }

  const fieldCount = this._chandle.fieldsCount(this._connection._conn);
  if (fieldCount == 0) {
    // No result columns: report affected rows (common with insert / use statements).
    const affectedRowCount = this._chandle.affectedRows(this._connection._conn);
    if (verbose) {
      console.log(this._createAffectedResponse(affectedRowCount, time));
    }
    wrapCB(callback);
    return affectedRowCount;
  }

  const resAndField = this._chandle.useResult(this._connection._conn, fieldCount);
  this._result = resAndField.result;
  this._fields = resAndField.fields;
  this.fields = resAndField.fields;
  wrapCB(callback);
  return this._result; //return a pointer to the result
}
S
StoneT2000 已提交
147 148 149 150 151 152
/**
 * Build the status line for a statement that affected rows.
 * @param {number} num - Number of affected rows
 * @param {number} time - Elapsed time; it is multiplied by 0.001 and printed with eight decimals followed by "s"
 * @return {string} The formatted status line
 */
TDengineCursor.prototype._createAffectedResponse = function (num, time) {
  const head = "Query OK, " + num;
  const elapsed = (time * 0.001).toFixed(8);
  return head + " row(s) affected (" + elapsed + "s)";
}
/**
 * Build the status line for a query that returned a row set.
 * @param {number} num - Number of rows in the set
 * @param {number} time - Elapsed time; it is multiplied by 0.001 and printed with eight decimals followed by "s"
 * @return {string} The formatted status line
 */
TDengineCursor.prototype._createSetResponse = function (num, time) {
  const head = "Query OK, " + num;
  const elapsed = (time * 0.001).toFixed(8);
  return head + " row(s) in set (" + elapsed + "s)";
}
153 154 155 156 157 158 159 160 161
/**
 * Placeholder: the body is empty, so calling it does nothing and yields undefined.
 */
TDengineCursor.prototype.executemany = function executemany() {
  return undefined;
}
/**
 * Placeholder: the body is empty, so calling it does nothing and yields undefined.
 */
TDengineCursor.prototype.fetchone = function fetchone() {
  return undefined;
}
/**
 * Placeholder: the body is empty, so calling it does nothing and yields undefined.
 */
TDengineCursor.prototype.fetchmany = function fetchmany() {
  return undefined;
}
162
/**
 * Fetches all results from a query and also stores results into cursor.data. It is preferable to use cursor.query() to create
 * queries and execute them instead of using the cursor object directly.
 * @param {Object} [options] - Options object. May be omitted, in which case the callback can be passed as the first argument.
 * @param {boolean} [options.quiet] - True if you want to suppress logging such as "Query OK, 1 row(s) ..."
 * @param {function} [callback] - callback function executing on the complete fetched data
 * @return {Array<Array>} The resultant array, with one entry per retrieved row; the values within a row follow the order of
 * the fields of the executed query.
 * @throws {OperationalError} When no query result or field meta data is stored on the cursor
 * @since 1.0.0
 * @example
 * cursor.execute('select * from db.table');
 * var data = cursor.fetchall(function(results) {
 *   results.forEach(row => console.log(row));
 * })
 */
TDengineCursor.prototype.fetchall = function fetchall(options, callback) {
  if (this._result == null || this._fields == null) {
    throw new errors.OperationalError("Invalid use of fetchall, either result or fields from query are null. First execute a query first");
  }

  // Support the documented fetchall(callback) call form.
  if (typeof options == 'function') {
    callback = options;
  }
  // typeof null is 'object', so null has to be replaced explicitly before options is indexed.
  if (options == null || typeof options != 'object') options = {};

  const data = [];
  const fieldCount = this._fields.length;
  this._rowcount = 0;

  // Time the synchronous fetch loop directly. A PerformanceObserver delivers its entries asynchronously,
  // so a duration read from one is not available yet when the response line is built.
  const startedAt = performance.now();
  while (true) {
    const blockAndRows = this._chandle.fetchBlock(this._result, this._fields);
    const block = blockAndRows.blocks;
    const num_of_rows = blockAndRows.num_of_rows;

    if (num_of_rows == 0) {
      break;
    }
    this._rowcount += num_of_rows;
    // block is indexed [column][row]; turn it into one array per row.
    for (let i = 0; i < num_of_rows; i++) {
      const rowBlock = new Array(fieldCount);
      for (let j = 0; j < fieldCount; j++) {
        rowBlock[j] = block[j][i];
      }
      data.push(rowBlock);
    }
  }
  const time = performance.now() - startedAt; // milliseconds

  const rowCount = this._rowcount;
  if (options['quiet'] != true) {
    console.log(this._createSetResponse(rowCount, time));
  }

  this._connection._clearResultSet();
  // _reset_result() wipes everything, so keep what describes the data that was just fetched
  // and put it back afterwards.
  const fields = this.fields;
  this._reset_result();
  this._rowcount = rowCount;
  this.data = data;
  this.fields = fields;

  wrapCB(callback, data);

  return data;
}
234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270
/**
 * Asynchronously execute a query to TDengine. NOTE, insertion requests must be done in sync if on the same table.
 * @param {string} operation - The query operation to execute in the taos shell
 * @param {Object} [options] - Execution options object. It is accepted for signature compatibility and is currently not consulted.
 * May be omitted, in which case callback and param shift one position to the left.
 * @param {function} callback - A callback function to execute after the query is made to TDengine. It is called with the
 * parameter buffer, the result handle and the result code before the result is inspected.
 * @param {Object} [param] - A value written into an object buffer that is handed to the C interface along with the query
 * @return {Object} The param value that was passed in
 * @throws {ProgrammingError} When no operation or callback is passed, or the cursor is not connected
 * @since 1.0.0
 */
TDengineCursor.prototype.execute_a = function execute_a (operation, options, callback, param) {
  if (operation == undefined) {
    throw new errors.ProgrammingError('No operation passed as argument');
  }
  if (typeof options == 'function')  {
    //we expect the parameter after callback to be param
    param = callback;
    callback = options;
  }
  if (this._connection == null) {
    throw new errors.ProgrammingError('Cursor is not connected');
  }
  if (typeof callback != 'function') {
    throw new errors.ProgrammingError("No callback function passed to execute_a function");
  }

  const cr = this;
  // Wrapper handed to the C interface: forwards to the user callback, then frees results that carry no fields.
  const asyncCallbackWrapper = function (param2, res2, resCode) {
    callback(param2, res2, resCode);

    if (!(resCode >= 0)) {
      throw new errors.ProgrammingError("Error occuring with use of execute_a async function. Status code was returned with failure");
    }
    if (cr._chandle.numFields(res2) == 0) {
      // Nothing to fetch for this result, so release it right away.
      cr._chandle.freeResult(res2);
      return undefined;
    }
    return res2;
  }

  // Use ref module to write to buffer in cursor.js instead of taosquery to maintain a difference in levels. Have taosquery stay high level
  // through letting it pass an object as param
  const buf = ref.alloc('Object');
  ref.writeObject(buf, 0, param);
  this._chandle.query_a(this._connection._conn, operation, asyncCallbackWrapper, buf);
  return param;
}
/**
 * Fetches all results from an async query. It is preferable to use cursor.query_a() to create
 * async queries and execute them instead of using the cursor object directly.
 * @param {Buffer} result - The result handle to fetch rows from
 * @param {Object} [options] - An options object containing options for this function. It is currently not consulted and may be
 * omitted, in which case callback and param shift one position to the left.
 * @param {function} callback - callback function that is called back on the COMPLETE fetched data (it is called back only once!).
 * Must be of form function (param, result, rowCount, rowData), where rowData is an object {data, fields}
 * @param {Object} [param] - A parameter that is also passed to the main callback function. Important! Param must be an object, and the key "data" cannot be used
 * @return {{param:Object, result:Buffer}} An object holding whatever the C interface returned for the first fetch request and the result handle that was passed in.
 * @throws {ProgrammingError} When the cursor is not connected, no callback is passed, or param already has a truthy "data" key
 * @since 1.2.0
 * @example
 * cursor.fetchall_a(result, function(param, result, rowCount, rowData) {
 *   rowData.data.forEach(row => console.log(row));
 * }, {});
 */
TDengineCursor.prototype.fetchall_a = function fetchall_a(result, options, callback, param = {}) {
  if (typeof options == 'function')  {
    //we expect the parameter after callback to be param
    param = callback;
    callback = options;
  }
  // After the shift above param is undefined when the caller passed none; fall back to a fresh object.
  if (param == null) param = {};
  if (this._connection == null) {
    throw new errors.ProgrammingError('Cursor is not connected');
  }
  if (typeof callback != 'function') {
    throw new errors.ProgrammingError('No callback function passed to fetchall_a function')
  }
  if (param.data) {
    throw new errors.ProgrammingError("You aren't allowed to set the key 'data' for the parameters object");
  }
  param.data = [];
  const cr = this;

  // This callback wrapper accumulates the data from the fetch_rows_a function from the cinterface. It is accumulated by passing the param2
  // object which holds accumulated data in the data key.
  const asyncCallbackWrapper = function asyncCallbackWrapper(param2, result2, numOfRows2, rowData) {
    param2 = ref.readObject(param2); //return the object back from the pointer
    if (numOfRows2 > 0 && rowData.length != 0) {
      // Keep fetching until no rows are left.
      param2.data.push(rowData);
      const buf2 = ref.alloc('Object');
      ref.writeObject(buf2, 0, param2);
      cr._chandle.fetch_rows_a(result2, asyncCallbackWrapper, buf2);
      return;
    }

    // Everything has been fetched: turn the accumulated [column][row] blocks into one array per row.
    const fields = cr._chandle.fetchFields_a(result2);
    const data = [];
    for (const block of param2.data) {
      const num_of_rows = block.length > 0 ? block[0].length : 0;
      for (let j = 0; j < num_of_rows; j++) {
        const rowBlock = new Array(fields.length);
        for (let k = 0; k < fields.length; k++) {
          rowBlock[k] = block[k][j];
        }
        data.push(rowBlock);
      }
    }
    cr._chandle.freeResult(result2); // free result, avoid seg faults and mem leaks!
    callback(param2, result2, numOfRows2, {data:data,fields:fields});
  }

  const buf = ref.alloc('Object');
  ref.writeObject(buf, 0, param);
  param = this._chandle.fetch_rows_a(result, asyncCallbackWrapper, buf); //returned param
  return {param:param,result:result};
}
374 375 376 377 378 379 380
/**
 * Stop a query given the result handle.
 * @param {Buffer} result - The buffer that acts as the result handle
 * @since 1.3.0
 */
TDengineCursor.prototype.stopQuery = function stopQuery(result) {
  this._chandle.stopQuery(result);
}
/**
 * Put every piece of per-query state back to the values the constructor assigns:
 * no row count (-1), no result handle, no field meta data and an empty data array.
 */
TDengineCursor.prototype._reset_result = function _reset_result() {
  this._rowcount = -1;
  this._result = null;
  this._fields = null;
  this.data = [];
  this.fields = null;
}
389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407
/**
 * Ask the C interface for the server info (such as version number) of the connection this cursor uses.
 * @return {string}
 * @since 1.3.0
 */
TDengineCursor.prototype.getServerInfo = function getServerInfo() {
  const { _chandle: lib, _connection: owner } = this;
  const serverInfo = lib.getServerInfo(owner._conn);
  return serverInfo;
}
/**
 * Ask the C interface for the client info (such as version number).
 * @return {string}
 * @since 1.3.0
 */
TDengineCursor.prototype.getClientInfo = function getClientInfo() {
  const lib = this._chandle;
  const clientInfo = lib.getClientInfo();
  return clientInfo;
}
/**
 * Subscribe to a table from a database in TDengine.
 * @param {Object} config - A configuration object containing the configuration options for the subscription
 * @param {boolean} config.restart - whether or not to continue a subscription if it already exists, otherwise start from beginning.
 * Any truthy value is forwarded to the C interface as 1, anything else as 0.
 * @param {string} config.topic - The unique identifier of a subscription
 * @param {string} config.sql - A sql statement for data query
 * @param {number} config.interval - The pulling interval (forwarded unchanged to the C interface)
 * @return {Buffer} A buffer pointing to the subscription session handle
 * @since 1.3.0
 */
TDengineCursor.prototype.subscribe = function subscribe(config) {
  let restart = config.restart ? 1 : 0;
  return this._chandle.subscribe(this._connection._conn, restart, config.topic, config.sql, config.interval);
};
/**
 * An infinite loop that consumes the latest data and calls a callback function that is provided.
 * The first consume/callback round runs synchronously; after every round the loop yields to the event loop,
 * so timers, I/O callbacks and other pending work are not starved while the subscription is being consumed.
 * The loop has no exit condition: the returned promise never resolves and only rejects when consuming or the callback throws.
 * @param {Buffer} subscription - A buffer object pointing to the subscription session handle
 * @param {function} callback - The callback function that takes the row data, field/column meta data, and the subscription session handle as input
 * @return {Promise<void>} A promise that rejects with whatever error the consume call or the callback throws
 * @since 1.3.0
 */
TDengineCursor.prototype.consumeData = async function consumeData(subscription, callback) {
  while (true) {
    let { data, fields, result} = this._chandle.consume(subscription);
    callback(data, fields, result);
    // Without an await, an async function body runs synchronously and this loop would block the whole process.
    await new Promise((resolve) => setImmediate(resolve));
  }
}
431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475
/**
 * Hand a subscription session handle to the C interface to be unsubscribed.
 * @param {Buffer} subscription - A buffer object pointing to the subscription session handle that is to be unsubscribed
 * @since 1.3.0
 */
TDengineCursor.prototype.unsubscribe = function unsubscribe(subscription) {
  const lib = this._chandle;
  lib.unsubscribe(subscription);
}
/**
 * Open a stream with TDengine to run the sql query periodically in the background
 * @param {string} sql - The query to run
 * @param {function} callback - The callback function to run after each query. It is called with the same four arguments the
 * C interface hands over: param, result handle, blocks and fields meta data. The blocks value is passed through unchanged.
 * NOTE(review): blocks looks like it is indexed [column][row] rather than one array per row - confirm against the C interface.
 * @param {number} stime - The time of the stream starts in the form of epoch milliseconds. If 0 is given, the start time is set as the current time.
 * @param {function} stoppingCallback - The callback function to run when the continuous query stops. It takes no inputs
 * @param {object} param - A parameter that is passed to the main callback function
 * @return {Buffer} A buffer pointing to the stream handle
 * @since 1.3.0
 */
TDengineCursor.prototype.openStream = function openStream(sql, callback, stime = 0, stoppingCallback, param = {}) {
  // The parameter travels through the C interface as an object buffer.
  const buf = ref.alloc('Object');
  ref.writeObject(buf, 0, param);

  // Forward every delivery as-is. Nothing is derived from blocks here, so an empty delivery cannot make the wrapper throw.
  const asyncCallbackWrapper = function (param2, result2, blocks, fields) {
    callback(param2, result2, blocks, fields);
  }
  return this._chandle.openStream(this._connection._conn, sql, asyncCallbackWrapper, stime, stoppingCallback, buf);
}
 /**
  * Hand a stream handle to the C interface to be closed.
  * @param {Buffer} stream - A buffer pointing to the handle of the stream to be closed
  * @since 1.3.0
  */
 TDengineCursor.prototype.closeStream = function closeStream(stream) {
   const lib = this._chandle;
   lib.closeStream(stream);
 }