diff --git a/src/connector/nodejs/nodetaos/cinterface.js b/src/connector/nodejs/nodetaos/cinterface.js index 0a81a0c79b21b2c2869e8e747df76e673c65b2eb..e04b55d9a5fd47fb6ecd50bb0ac360eefb907930 100644 --- a/src/connector/nodejs/nodetaos/cinterface.js +++ b/src/connector/nodejs/nodetaos/cinterface.js @@ -328,13 +328,6 @@ function CTaosInterface(config = null, pass = false) { //void taos_unsubscribe(TAOS_SUB *tsub); 'taos_unsubscribe': [ref.types.void, [ref.types.void_ptr]], - // Continuous Query - //TAOS_STREAM *taos_open_stream(TAOS *taos, char *sqlstr, void (*fp)(void *param, TAOS_RES *, TAOS_ROW row), - // int64_t stime, void *param, void (*callback)(void *)); - 'taos_open_stream': [ref.types.void_ptr, [ref.types.void_ptr, ref.types.char_ptr, ref.types.void_ptr, ref.types.int64, ref.types.void_ptr, ref.types.void_ptr]], - //void taos_close_stream(TAOS_STREAM *tstr); - 'taos_close_stream': [ref.types.void, [ref.types.void_ptr]], - //Schemaless insert //TAOS_RES* taos_schemaless_insert(TAOS* taos, char* lines[], int numLines, int protocol,int precision) // 'taos_schemaless_insert': [ref.types.void_ptr, [ref.types.void_ptr, ref.types.char_ptr, ref.types.int, ref.types.int, ref.types.int]] @@ -717,51 +710,6 @@ CTaosInterface.prototype.unsubscribe = function unsubscribe(subscription) { this.libtaos.taos_unsubscribe(subscription); } -// Continuous Query -CTaosInterface.prototype.openStream = function openStream(connection, sql, callback, stime, stoppingCallback, param = ref.ref(ref.NULL)) { - try { - sql = ref.allocCString(sql); - } - catch (err) { - throw "Attribute Error: sql string is expected as a str"; - } - var cti = this; - let asyncCallbackWrapper = function (param2, result2, row) { - let fields = cti.fetchFields_a(result2); - let precision = cti.libtaos.taos_result_precision(result2); - let blocks = new Array(fields.length); - blocks.fill(null); - let numOfRows2 = 1; - let offset = 0; - if (numOfRows2 > 0) { - for (let i = 0; i < fields.length; i++) { - if (!convertFunctions[fields[i]['type']]) { - throw new errors.DatabaseError("Invalid data type returned from database"); - } - blocks[i] = convertFunctions[fields[i]['type']](row, numOfRows2, fields[i]['bytes'], offset, precision); - offset += fields[i]['bytes'] * numOfRows2; - } - } - callback(param2, result2, blocks, fields); - } - asyncCallbackWrapper = ffi.Callback(ref.types.void, [ref.types.void_ptr, ref.types.void_ptr, ref.refType(ref.types.void_ptr2)], asyncCallbackWrapper); - asyncStoppingCallbackWrapper = ffi.Callback(ref.types.void, [ref.types.void_ptr], stoppingCallback); - let streamHandle = this.libtaos.taos_open_stream(connection, sql, asyncCallbackWrapper, stime, param, asyncStoppingCallbackWrapper); - if (ref.isNull(streamHandle)) { - throw new errors.TDError('Failed to open a stream with TDengine'); - return false; - } - else { - console.log("Succesfully opened stream"); - return streamHandle; - } -} - -CTaosInterface.prototype.closeStream = function closeStream(stream) { - this.libtaos.taos_close_stream(stream); - console.log("Closed stream"); -} - //Schemaless insert API /** * TAOS* taos, char* lines[], int numLines, int protocol,int precision) diff --git a/src/connector/nodejs/nodetaos/cursor.js b/src/connector/nodejs/nodetaos/cursor.js index 5969d4f344affa49ebbf81329729bff4733e116b..54431d9e98489481e3bdaa9433ef38177ce51b63 100644 --- a/src/connector/nodejs/nodetaos/cursor.js +++ b/src/connector/nodejs/nodetaos/cursor.js @@ -438,43 +438,7 @@ TDengineCursor.prototype.consumeData = async function consumeData(subscription, TDengineCursor.prototype.unsubscribe = function unsubscribe(subscription) { this._chandle.unsubscribe(subscription); } -/** - * Open a stream with TDengine to run the sql query periodically in the background - * @param {string} sql - The query to run - * @param {function} callback - The callback function to run after each query, accepting inputs as param, result handle, data, fields meta data - * @param {number} stime - The time of the stream starts in the form of epoch milliseconds. If 0 is given, the start time is set as the current time. - * @param {function} stoppingCallback - The callback function to run when the continuous query stops. It takes no inputs - * @param {object} param - A parameter that is passed to the main callback function - * @return {Buffer} A buffer pointing to the stream handle - * @since 1.3.0 - */ -TDengineCursor.prototype.openStream = function openStream(sql, callback, stime = 0, stoppingCallback, param = {}) { - let buf = ref.alloc('Object'); - ref.writeObject(buf, 0, param); - let asyncCallbackWrapper = function (param2, result2, blocks, fields) { - let data = []; - let num_of_rows = blocks[0].length; - for (let j = 0; j < num_of_rows; j++) { - data.push([]); - let rowBlock = new Array(fields.length); - for (let k = 0; k < fields.length; k++) { - rowBlock[k] = blocks[k][j]; - } - data[data.length - 1] = rowBlock; - } - callback(param2, result2, blocks, fields); - } - return this._chandle.openStream(this._connection._conn, sql, asyncCallbackWrapper, stime, stoppingCallback, buf); -} -/** - * Close a stream - * @param {Buffer} - A buffer pointing to the handle of the stream to be closed - * @since 1.3.0 - */ -TDengineCursor.prototype.closeStream = function closeStream(stream) { - this._chandle.closeStream(stream); -} /** * schemaless insert * @param {*} connection a valid database connection