diff --git a/tests/examples/rust/README.md b/tests/examples/rust/README.md index 1693cd094eda16447b06b15232e76d7e6661f287..2ef8901ad6d30c0f33740e40662f1e9860dbb21a 100644 --- a/tests/examples/rust/README.md +++ b/tests/examples/rust/README.md @@ -10,7 +10,11 @@ curl https://sh.rustup.rs -sSf | sh ## Run with Sample -Build and run: +Build and run basic sample: ``` cargo run --example demo ``` +Build and run subscribe sample: +``` +cargo run --example subscribe +``` diff --git a/tests/examples/rust/examples/demo.rs b/tests/examples/rust/examples/demo.rs index b83db6f15d2a531b698dcbd00180d22f5da79399..182e46c8db4aa5ba68827309937320ef1e9b13e6 100644 --- a/tests/examples/rust/examples/demo.rs +++ b/tests/examples/rust/examples/demo.rs @@ -2,7 +2,8 @@ use std::process; use tdengine::Tdengine; fn main() { - let tde = Tdengine::new("127.0.0.1", "root", "taosdata", "", 0).unwrap_or_else(|err| { + let tde = Tdengine::new("127.0.0.1", "root", "taosdata", "demo", 0) + .unwrap_or_else(|err| { eprintln!("Can't create Tdengine: {}", err); process::exit(1) }); diff --git a/tests/examples/rust/examples/subscribe.rs b/tests/examples/rust/examples/subscribe.rs new file mode 100644 index 0000000000000000000000000000000000000000..3255e36ee7cdc9af3d9233ef7accfd6669b912e2 --- /dev/null +++ b/tests/examples/rust/examples/subscribe.rs @@ -0,0 +1,19 @@ +use std::process; +use tdengine::Subscriber; + +fn main() { + let subscriber = Subscriber::new("127.0.0.1", "root", "taosdata", "demo", "m1", 0, 1000) + .unwrap_or_else(|err| { + eprintln!("Can't create Subscriber: {}", err); + process::exit(1) + }); + + loop { + let row = subscriber.consume().unwrap_or_else(|err| { + eprintln!("consume exit: {}", err); + process::exit(1) + }); + + subscriber.print_row(&row); + } +} diff --git a/tests/examples/rust/src/lib.rs b/tests/examples/rust/src/lib.rs index 15997493f4fd1be47124eea226b921cb82fc83ff..fe7216dfd06519b50a2fe5fdb226a0b674f8850d 100644 --- a/tests/examples/rust/src/lib.rs +++ b/tests/examples/rust/src/lib.rs @@ -1,76 +1,10 @@ #![allow(unused)] #![allow(non_camel_case_types)] -use std::os::raw::c_void; -use std::os::raw::c_char; -use std::os::raw::c_int; -use std::ffi::CString; -use std::ffi::CStr; +pub mod subscriber; +pub use subscriber::*; -mod bindings; -use bindings::*; +pub mod tdengine; +pub use tdengine::*; -pub struct Tdengine { - conn: *mut c_void, -} - -/// - **TODO**: doc -impl Tdengine { - - //! - **TODO**: implement default param. - //! - //! > refer to https://stackoverflow.com/questions/24047686/default-function-arguments-in-rust - pub fn new(ip: &str, username: &str, passwd: &str, db: &str, port: i32) -> Result { - unsafe { - taos_init(); - let mut conn = taos_connect(str_into_raw(ip), - str_into_raw(username), - str_into_raw(passwd), - str_into_raw(db), - port as c_int); - if conn.is_null() { - Err("connect error") - } else { - println!("connected to {}:{} user:{}, db:{}", ip, port, username, db); - Ok(Tdengine {conn}) - } - } - } - - // - **TODO**: check error code - pub fn query(self: &Tdengine, s: &str) { - unsafe { - if taos_query(self.conn, str_into_raw(s)) == 0 { - println!("query '{}' ok", s); - } else { - println!("query '{}' error: {}", s, raw_into_str(taos_errstr(self.conn))); - } - } - } -} - -impl Drop for Tdengine { - fn drop(&mut self) { - unsafe {taos_close(self.conn);} - } -} - -fn str_into_raw(s: &str) -> *mut c_char { - if s.is_empty() { - 0 as *mut c_char - } else { - CString::new(s).unwrap().into_raw() - } -} - -fn raw_into_str<'a>(raw: *mut c_char) -> &'static str { - unsafe {CStr::from_ptr(raw).to_str().unwrap()} -} - -#[cfg(test)] -mod tests { - #[test] - fn it_works() { - assert_eq!(2 + 2, 4); - } -} \ No newline at end of file +pub mod utils; \ No newline at end of file diff --git a/tests/examples/rust/src/subscriber.rs b/tests/examples/rust/src/subscriber.rs new file mode 100644 index 0000000000000000000000000000000000000000..b6812d7b6e5b48016c62c3bb45d11bfb7fd85e0b --- /dev/null +++ b/tests/examples/rust/src/subscriber.rs @@ -0,0 +1,77 @@ +#![allow(non_camel_case_types)] +#![allow(non_snake_case)] + +#[path = "utils.rs"] +mod utils; +use utils::*; +use utils::bindings::*; + +use std::os::raw::{c_void, c_char, c_int, c_long}; + +pub struct Subscriber { + tsub: *mut c_void, + fields: *mut taosField, + fcount: c_int, +} + +impl Subscriber { + pub fn new(host: &str, + username: &str, + passwd: &str, + db: &str, + table:&str, + time: i64, + mseconds: i32 + ) -> Result { + unsafe { + let mut tsub = taos_subscribe(str_into_raw(host), + str_into_raw(username), + str_into_raw(passwd), + str_into_raw(db), + str_into_raw(table), + time as c_long, + mseconds as c_int); + if tsub.is_null() { + return Err("subscribe error") + } + println!("subscribed to {} user:{}, db:{}, tb:{}, time:{}, mseconds:{}", + host, username, db, table, time, mseconds); + + let mut fields = taos_fetch_subfields(tsub); + if fields.is_null() { + taos_unsubscribe(tsub); + return Err("fetch subfields error") + } + + let fcount = taos_subfields_count(tsub); + if fcount == 0 { + taos_unsubscribe(tsub); + return Err("subfields count is 0") + } + + Ok(Subscriber{tsub, fields, fcount}) + } + } + + pub fn consume(self: &Subscriber) -> Result { + unsafe { + let taosRow = taos_consume(self.tsub); + if taosRow.is_null() { + return Err("consume error") + } + let taosRow= std::slice::from_raw_parts(taosRow, self.fcount as usize); + let row = raw_into_row(self.fields, self.fcount, &taosRow); + Ok(row) + } + } + + pub fn print_row(self: &Subscriber, row: &Row) { + println!("{}", format_row(row)); + } +} + +impl Drop for Subscriber { + fn drop(&mut self) { + unsafe {taos_unsubscribe(self.tsub);} + } +} \ No newline at end of file diff --git a/tests/examples/rust/src/tdengine.rs b/tests/examples/rust/src/tdengine.rs new file mode 100644 index 0000000000000000000000000000000000000000..41225d52e0fc7e985a227f52cf2a2e9e2874f9b8 --- /dev/null +++ b/tests/examples/rust/src/tdengine.rs @@ -0,0 +1,65 @@ +#[path = "bindings.rs"] +mod bindings; +use bindings::*; + +#[path = "utils.rs"] +mod utils; +use utils::*; + +use std::os::raw::c_void; +use std::os::raw::c_char; +use std::os::raw::c_int; +use std::os::raw::c_long; + +pub struct Tdengine { + conn: *mut c_void, +} + +/// - **TODO**: doc +impl Tdengine { + + //! - **TODO**: implement default param. + //! + //! > refer to https://stackoverflow.com/questions/24047686/default-function-arguments-in-rust + pub fn new(ip: &str, username: &str, passwd: &str, db: &str, port: i32) -> Result { + unsafe { + taos_init(); + let mut conn = taos_connect(str_into_raw(ip), + str_into_raw(username), + str_into_raw(passwd), + str_into_raw(db), + port as c_int); + if conn.is_null() { + Err("connect error") + } else { + println!("connected to {}:{} user:{}, db:{}", ip, port, username, db); + Ok(Tdengine {conn}) + } + } + } + + // - **TODO**: check error code + pub fn query(self: &Tdengine, s: &str) { + unsafe { + if taos_query(self.conn, str_into_raw(s)) == 0 { + println!("query '{}' ok", s); + } else { + println!("query '{}' error: {}", s, raw_into_str(taos_errstr(self.conn))); + } + } + } +} + +impl Drop for Tdengine { + fn drop(&mut self) { + unsafe {taos_close(self.conn);} + } +} + +#[cfg(test)] +mod tests { + #[test] + fn it_works() { + assert_eq!(2 + 2, 4); + } +} \ No newline at end of file diff --git a/tests/examples/rust/src/utils.rs b/tests/examples/rust/src/utils.rs new file mode 100644 index 0000000000000000000000000000000000000000..2875507275c69cf19ca01ab0190e343196537d3e --- /dev/null +++ b/tests/examples/rust/src/utils.rs @@ -0,0 +1,127 @@ +#[path = "bindings.rs"] +pub mod bindings; +use bindings::*; + +use std::fmt; +use std::fmt::Display; +use std::os::raw::{c_void, c_char, c_int}; +use std::ffi::{CString, CStr}; + +// #[derive(Debug)] +pub enum Field { + tinyInt(i8), + smallInt(i16), + normalInt(i32), + bigInt(i64), + float(f32), + double(f64), + binary(String), + timeStamp(i64), + boolType(bool), +} + + +impl fmt::Display for Field { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + match &*self { + Field::tinyInt(v) => write!(f, "{}", v), + Field::smallInt(v) => write!(f, "{}", v), + Field::normalInt(v) => write!(f, "{}", v), + Field::bigInt(v) => write!(f, "{}", v), + Field::float(v) => write!(f, "{}", v), + Field::double(v) => write!(f, "{}", v), + Field::binary(v) => write!(f, "{}", v), + Field::tinyInt(v) => write!(f, "{}", v), + Field::timeStamp(v) => write!(f, "{}", v), + Field::boolType(v) => write!(f, "{}", v), + } + } +} + +// pub type Fields = Vec; +pub type Row = Vec; + +pub fn format_row(row: &Row) -> String { + let mut s = String::new(); + for field in row { + s.push_str(format!("{} ", field).as_str()); + // println!("{}", field); + } + s +} + +pub fn str_into_raw(s: &str) -> *mut c_char { + if s.is_empty() { + 0 as *mut c_char + } else { + CString::new(s).unwrap().into_raw() + } +} + +pub fn raw_into_str<'a>(raw: *mut c_char) -> &'static str { + unsafe {CStr::from_ptr(raw).to_str().unwrap()} +} + + +pub fn raw_into_field(raw: *mut TAOS_FIELD, fcount: c_int) -> Vec { + let mut fields: Vec = Vec::new(); + + for i in 0..fcount as isize { + fields.push( + taosField { + name: unsafe {(*raw.offset(i as isize))}.name, + bytes: unsafe {(*raw.offset(i as isize))}.bytes, + type_: unsafe {(*raw.offset(i as isize))}.type_, + } + ); + } + + /// TODO: error[E0382]: use of moved value: `fields` + // for field in &fields { + // println!("type: {}, bytes: {}", field.type_, field.bytes); + // } + + fields +} + + pub fn raw_into_row(fields: *mut TAOS_FIELD, fcount: c_int, raw_row: &[*mut c_void]) -> Row { + let mut row: Row= Vec::new(); + let fields = raw_into_field(fields, fcount); + + for (i, field) in fields.iter().enumerate() { + // println!("index: {}, type: {}, bytes: {}", i, field.type_, field.bytes); + unsafe { + match field.type_ as u32 { + TSDB_DATA_TYPE_TINYINT => { + row.push(Field::tinyInt(*(raw_row[i] as *mut i8))); + } + TSDB_DATA_TYPE_SMALLINT => { + row.push(Field::smallInt(*(raw_row[i] as *mut i16))); + } + TSDB_DATA_TYPE_INT => { + row.push(Field::normalInt(*(raw_row[i] as *mut i32))); + } + TSDB_DATA_TYPE_BIGINT => { + row.push(Field::bigInt(*(raw_row[i] as *mut i64))); + } + TSDB_DATA_TYPE_FLOAT => { + row.push(Field::float(*(raw_row[i] as *mut f32))); + } + TSDB_DATA_TYPE_DOUBLE => { + row.push(Field::double(*(raw_row[i] as *mut f64))); + } + TSDB_DATA_TYPE_BINARY | TSDB_DATA_TYPE_NCHAR => { + // row.push(Field::binary(*(raw_row[i] as *mut f64))); + } + TSDB_DATA_TYPE_TIMESTAMP => { + row.push(Field::timeStamp(*(raw_row[i] as *mut i64))); + } + TSDB_DATA_TYPE_BOOL => { + // row.push(Field::boolType(*(raw_row[i] as *mut i8) as bool)); + } + _ => println!(""), + } + } + } + row + } \ No newline at end of file