subscriber.rs 2.3 KB
Newer Older
tidyjiang's avatar
tidyjiang 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39
#![allow(non_camel_case_types)]
#![allow(non_snake_case)]

#[path = "utils.rs"]
mod utils;
use utils::*;
use utils::bindings::*;

use std::os::raw::{c_void, c_char, c_int, c_long};

/// State kept for one subscription opened through the C client bindings.
///
/// Instances are built by `Subscriber::new`; the `Drop` impl passes `tsub`
/// to `taos_unsubscribe`.
pub struct Subscriber {
    // Handle returned by `taos_subscribe`.
    tsub: *mut c_void,
    // Pointer returned by `taos_fetch_fields(tsub)`.
    fields: *mut taosField,
    // Value returned by `taos_field_count(tsub)`; used as the row length in `consume`.
    fcount: c_int,
}

impl Subscriber {
    pub fn new(host: &str,
               username: &str,
               passwd: &str,
               db: &str,
               table:&str,
               time: i64,
               mseconds: i32
              ) -> Result<Subscriber, &'static str> {
        unsafe {
            let mut tsub = taos_subscribe(str_into_raw(host),
                                          str_into_raw(username),
                                          str_into_raw(passwd),
                                          str_into_raw(db),
                                          str_into_raw(table),
                                          time as c_long,
                                          mseconds as c_int);
            if tsub.is_null() {
                return Err("subscribe error")
            }
            println!("subscribed to {} user:{}, db:{}, tb:{}, time:{}, mseconds:{}",
                        host, username, db, table, time, mseconds);

40
            let mut fields = taos_fetch_fields(tsub);
tidyjiang's avatar
tidyjiang 已提交
41 42
            if fields.is_null() {
                taos_unsubscribe(tsub);
43
                return Err("fetch fields error")
tidyjiang's avatar
tidyjiang 已提交
44 45
            }

46
            let fcount = taos_field_count(tsub);
tidyjiang's avatar
tidyjiang 已提交
47 48
            if fcount == 0 {
                taos_unsubscribe(tsub);
49
                return Err("fields count is 0")
tidyjiang's avatar
tidyjiang 已提交
50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76
            }

            Ok(Subscriber{tsub, fields, fcount})
        }
    }

    pub fn consume(self: &Subscriber) -> Result<Row, &'static str> {
        unsafe {
            let taosRow = taos_consume(self.tsub);
            if taosRow.is_null() {
                return Err("consume error")
            }
            let taosRow= std::slice::from_raw_parts(taosRow, self.fcount as usize);
            let row = raw_into_row(self.fields, self.fcount, &taosRow);
            Ok(row)
        }
    }

    pub fn print_row(self: &Subscriber, row: &Row) {
        println!("{}", format_row(row));
    }
}

impl Drop for Subscriber {
    /// Releases the subscription by handing the stored handle to `taos_unsubscribe`.
    fn drop(&mut self) {
        // SAFETY: `tsub` is the handle stored at construction; `Subscriber::new`
        // never stores a null handle.
        unsafe {
            taos_unsubscribe(self.tsub);
        }
    }
}