未验证 提交 ba584bd1 编写于 作者: H huili 提交者: GitHub

Merge pull request #4019 from songtianyi/master

feat:  see #4016 
......@@ -4,6 +4,9 @@
[submodule "src/connector/grafanaplugin"]
path = src/connector/grafanaplugin
url = https://github.com/taosdata/grafanaplugin
[submodule "tests/examples/rust"]
path = tests/examples/rust
url = https://github.com/songtianyi/tdengine-rust-bindings.git
[submodule "src/connector/hivemq-tdengine-extension"]
path = src/connector/hivemq-tdengine-extension
url = https://github.com/huskar-t/hivemq-tdengine-extension.git
url = https://github.com/huskar-t/hivemq-tdengine-extension.git
\ No newline at end of file
Subproject commit f2ffd30521b8e8afbc9d25c75f8eeeb6a48bd030
[package]
name = "tdengine"
version = "0.1.0"
authors = ["Chunhua Jiang <jiangch@3reality.com>"]
edition = "2018"
[dependencies]
# TDengine driver connector for Rust
It's a rust implementation for [TDengine](https://github.com/taosdata/TDengine), an open-sourced big data platform designed and optimized for the Internet of Things (IoT), Connected Cars, Industrial IoT, and IT Infrastructure and Application Monitoring.
## Dependencies
- Rust:
```
curl https://sh.rustup.rs -sSf | sh
```
## Run with Sample
Build and run basic sample:
```
cargo run --example demo
```
Build and run subscribe sample:
```
cargo run --example subscribe
```
// build.rs
use std::env;
fn main() {
let project_dir = env::var("CARGO_MANIFEST_DIR").unwrap();
println!("cargo:rustc-link-search={}", project_dir); // the "-L" flag
println!("cargo:rustc-link-lib=taos"); // the "-l" flag
}
use std::process;
use tdengine::Tdengine;
fn main() {
let tde = Tdengine::new("127.0.0.1", "root", "taosdata", "demo", 0)
.unwrap_or_else(|err| {
eprintln!("Can't create Tdengine: {}", err);
process::exit(1)
});
tde.query("drop database demo");
tde.query("create database demo");
tde.query("use demo");
tde.query("create table m1 (ts timestamp, speed int)");
for i in 0..10 {
tde.query(format!("insert into m1 values (now+{}s, {})", i, i).as_str());
}
}
use std::process;
use tdengine::Subscriber;
fn main() {
let subscriber = Subscriber::new("127.0.0.1", "root", "taosdata", "demo", "m1", 0, 1000)
.unwrap_or_else(|err| {
eprintln!("Can't create Subscriber: {}", err);
process::exit(1)
});
loop {
let row = subscriber.consume().unwrap_or_else(|err| {
eprintln!("consume exit: {}", err);
process::exit(1)
});
subscriber.print_row(&row);
}
}
/* automatically generated by rust-bindgen */
#![allow(unused)]
#![allow(non_camel_case_types)]
pub const _STDINT_H: u32 = 1;
pub const _FEATURES_H: u32 = 1;
pub const _DEFAULT_SOURCE: u32 = 1;
pub const __USE_ISOC11: u32 = 1;
pub const __USE_ISOC99: u32 = 1;
pub const __USE_ISOC95: u32 = 1;
pub const __USE_POSIX_IMPLICITLY: u32 = 1;
pub const _POSIX_SOURCE: u32 = 1;
pub const _POSIX_C_SOURCE: u32 = 200809;
pub const __USE_POSIX: u32 = 1;
pub const __USE_POSIX2: u32 = 1;
pub const __USE_POSIX199309: u32 = 1;
pub const __USE_POSIX199506: u32 = 1;
pub const __USE_XOPEN2K: u32 = 1;
pub const __USE_XOPEN2K8: u32 = 1;
pub const _ATFILE_SOURCE: u32 = 1;
pub const __USE_MISC: u32 = 1;
pub const __USE_ATFILE: u32 = 1;
pub const __USE_FORTIFY_LEVEL: u32 = 0;
pub const _STDC_PREDEF_H: u32 = 1;
pub const __STDC_IEC_559__: u32 = 1;
pub const __STDC_IEC_559_COMPLEX__: u32 = 1;
pub const __STDC_ISO_10646__: u32 = 201505;
pub const __STDC_NO_THREADS__: u32 = 1;
pub const __GNU_LIBRARY__: u32 = 6;
pub const __GLIBC__: u32 = 2;
pub const __GLIBC_MINOR__: u32 = 23;
pub const _SYS_CDEFS_H: u32 = 1;
pub const __WORDSIZE: u32 = 64;
pub const __WORDSIZE_TIME64_COMPAT32: u32 = 1;
pub const __SYSCALL_WORDSIZE: u32 = 64;
pub const _BITS_WCHAR_H: u32 = 1;
pub const INT8_MIN: i32 = -128;
pub const INT16_MIN: i32 = -32768;
pub const INT32_MIN: i32 = -2147483648;
pub const INT8_MAX: u32 = 127;
pub const INT16_MAX: u32 = 32767;
pub const INT32_MAX: u32 = 2147483647;
pub const UINT8_MAX: u32 = 255;
pub const UINT16_MAX: u32 = 65535;
pub const UINT32_MAX: u32 = 4294967295;
pub const INT_LEAST8_MIN: i32 = -128;
pub const INT_LEAST16_MIN: i32 = -32768;
pub const INT_LEAST32_MIN: i32 = -2147483648;
pub const INT_LEAST8_MAX: u32 = 127;
pub const INT_LEAST16_MAX: u32 = 32767;
pub const INT_LEAST32_MAX: u32 = 2147483647;
pub const UINT_LEAST8_MAX: u32 = 255;
pub const UINT_LEAST16_MAX: u32 = 65535;
pub const UINT_LEAST32_MAX: u32 = 4294967295;
pub const INT_FAST8_MIN: i32 = -128;
pub const INT_FAST16_MIN: i64 = -9223372036854775808;
pub const INT_FAST32_MIN: i64 = -9223372036854775808;
pub const INT_FAST8_MAX: u32 = 127;
pub const INT_FAST16_MAX: u64 = 9223372036854775807;
pub const INT_FAST32_MAX: u64 = 9223372036854775807;
pub const UINT_FAST8_MAX: u32 = 255;
pub const UINT_FAST16_MAX: i32 = -1;
pub const UINT_FAST32_MAX: i32 = -1;
pub const INTPTR_MIN: i64 = -9223372036854775808;
pub const INTPTR_MAX: u64 = 9223372036854775807;
pub const UINTPTR_MAX: i32 = -1;
pub const PTRDIFF_MIN: i64 = -9223372036854775808;
pub const PTRDIFF_MAX: u64 = 9223372036854775807;
pub const SIG_ATOMIC_MIN: i32 = -2147483648;
pub const SIG_ATOMIC_MAX: u32 = 2147483647;
pub const SIZE_MAX: i32 = -1;
pub const WINT_MIN: u32 = 0;
pub const WINT_MAX: u32 = 4294967295;
pub const TSDB_DATA_TYPE_NULL: u32 = 0;
pub const TSDB_DATA_TYPE_BOOL: u32 = 1;
pub const TSDB_DATA_TYPE_TINYINT: u32 = 2;
pub const TSDB_DATA_TYPE_SMALLINT: u32 = 3;
pub const TSDB_DATA_TYPE_INT: u32 = 4;
pub const TSDB_DATA_TYPE_BIGINT: u32 = 5;
pub const TSDB_DATA_TYPE_FLOAT: u32 = 6;
pub const TSDB_DATA_TYPE_DOUBLE: u32 = 7;
pub const TSDB_DATA_TYPE_BINARY: u32 = 8;
pub const TSDB_DATA_TYPE_TIMESTAMP: u32 = 9;
pub const TSDB_DATA_TYPE_NCHAR: u32 = 10;
pub type int_least8_t = ::std::os::raw::c_schar;
pub type int_least16_t = ::std::os::raw::c_short;
pub type int_least32_t = ::std::os::raw::c_int;
pub type int_least64_t = ::std::os::raw::c_long;
pub type uint_least8_t = ::std::os::raw::c_uchar;
pub type uint_least16_t = ::std::os::raw::c_ushort;
pub type uint_least32_t = ::std::os::raw::c_uint;
pub type uint_least64_t = ::std::os::raw::c_ulong;
pub type int_fast8_t = ::std::os::raw::c_schar;
pub type int_fast16_t = ::std::os::raw::c_long;
pub type int_fast32_t = ::std::os::raw::c_long;
pub type int_fast64_t = ::std::os::raw::c_long;
pub type uint_fast8_t = ::std::os::raw::c_uchar;
pub type uint_fast16_t = ::std::os::raw::c_ulong;
pub type uint_fast32_t = ::std::os::raw::c_ulong;
pub type uint_fast64_t = ::std::os::raw::c_ulong;
pub type intmax_t = ::std::os::raw::c_long;
pub type uintmax_t = ::std::os::raw::c_ulong;
pub const TSDB_OPTION_TSDB_OPTION_LOCALE: TSDB_OPTION = 0;
pub const TSDB_OPTION_TSDB_OPTION_CHARSET: TSDB_OPTION = 1;
pub const TSDB_OPTION_TSDB_OPTION_TIMEZONE: TSDB_OPTION = 2;
pub const TSDB_OPTION_TSDB_OPTION_CONFIGDIR: TSDB_OPTION = 3;
pub const TSDB_OPTION_TSDB_OPTION_SHELL_ACTIVITY_TIMER: TSDB_OPTION = 4;
pub const TSDB_OPTION_TSDB_MAX_OPTIONS: TSDB_OPTION = 5;
pub type TSDB_OPTION = u32;
#[repr(C)]
#[derive(Copy, Clone)]
pub struct taosField {
pub name: [::std::os::raw::c_char; 64usize],
pub bytes: ::std::os::raw::c_short,
pub type_: ::std::os::raw::c_char,
}
#[test]
fn bindgen_test_layout_taosField() {
assert_eq!(
::std::mem::size_of::<taosField>(),
68usize,
concat!("Size of: ", stringify!(taosField))
);
assert_eq!(
::std::mem::align_of::<taosField>(),
2usize,
concat!("Alignment of ", stringify!(taosField))
);
assert_eq!(
unsafe { &(*(::std::ptr::null::<taosField>())).name as *const _ as usize },
0usize,
concat!(
"Offset of field: ",
stringify!(taosField),
"::",
stringify!(name)
)
);
assert_eq!(
unsafe { &(*(::std::ptr::null::<taosField>())).bytes as *const _ as usize },
64usize,
concat!(
"Offset of field: ",
stringify!(taosField),
"::",
stringify!(bytes)
)
);
assert_eq!(
unsafe { &(*(::std::ptr::null::<taosField>())).type_ as *const _ as usize },
66usize,
concat!(
"Offset of field: ",
stringify!(taosField),
"::",
stringify!(type_)
)
);
}
pub type TAOS_FIELD = taosField;
extern "C" {
pub fn taos_init();
}
extern "C" {
pub fn taos_options(
option: TSDB_OPTION,
arg: *const ::std::os::raw::c_void,
...
) -> ::std::os::raw::c_int;
}
extern "C" {
pub fn taos_connect(
ip: *mut ::std::os::raw::c_char,
user: *mut ::std::os::raw::c_char,
pass: *mut ::std::os::raw::c_char,
db: *mut ::std::os::raw::c_char,
port: ::std::os::raw::c_int,
) -> *mut ::std::os::raw::c_void;
}
extern "C" {
pub fn taos_close(taos: *mut ::std::os::raw::c_void);
}
extern "C" {
pub fn taos_query(
taos: *mut ::std::os::raw::c_void,
sqlstr: *mut ::std::os::raw::c_char,
) -> ::std::os::raw::c_int;
}
extern "C" {
pub fn taos_use_result(taos: *mut ::std::os::raw::c_void) -> *mut ::std::os::raw::c_void;
}
extern "C" {
pub fn taos_fetch_row(res: *mut ::std::os::raw::c_void) -> *mut *mut ::std::os::raw::c_void;
}
extern "C" {
pub fn taos_result_precision(res: *mut ::std::os::raw::c_void) -> ::std::os::raw::c_int;
}
extern "C" {
pub fn taos_free_result(res: *mut ::std::os::raw::c_void);
}
extern "C" {
pub fn taos_field_count(taos: *mut ::std::os::raw::c_void) -> ::std::os::raw::c_int;
}
extern "C" {
pub fn taos_num_fields(res: *mut ::std::os::raw::c_void) -> ::std::os::raw::c_int;
}
extern "C" {
pub fn taos_affected_rows(taos: *mut ::std::os::raw::c_void) -> ::std::os::raw::c_int;
}
extern "C" {
pub fn taos_fetch_fields(res: *mut ::std::os::raw::c_void) -> *mut TAOS_FIELD;
}
extern "C" {
pub fn taos_select_db(
taos: *mut ::std::os::raw::c_void,
db: *mut ::std::os::raw::c_char,
) -> ::std::os::raw::c_int;
}
extern "C" {
pub fn taos_print_row(
str: *mut ::std::os::raw::c_char,
row: *mut *mut ::std::os::raw::c_void,
fields: *mut TAOS_FIELD,
num_fields: ::std::os::raw::c_int,
) -> ::std::os::raw::c_int;
}
extern "C" {
pub fn taos_stop_query(res: *mut ::std::os::raw::c_void);
}
extern "C" {
pub fn taos_fetch_block(
res: *mut ::std::os::raw::c_void,
rows: *mut *mut *mut ::std::os::raw::c_void,
) -> ::std::os::raw::c_int;
}
extern "C" {
pub fn taos_validate_sql(
taos: *mut ::std::os::raw::c_void,
sql: *mut ::std::os::raw::c_char,
) -> ::std::os::raw::c_int;
}
extern "C" {
pub fn taos_get_server_info(taos: *mut ::std::os::raw::c_void) -> *mut ::std::os::raw::c_char;
}
extern "C" {
pub fn taos_get_client_info() -> *mut ::std::os::raw::c_char;
}
extern "C" {
pub fn taos_errstr(taos: *mut ::std::os::raw::c_void) -> *mut ::std::os::raw::c_char;
}
extern "C" {
pub fn taos_errno(taos: *mut ::std::os::raw::c_void) -> ::std::os::raw::c_int;
}
extern "C" {
pub fn taos_query_a(
taos: *mut ::std::os::raw::c_void,
sqlstr: *mut ::std::os::raw::c_char,
fp: ::std::option::Option<
unsafe extern "C" fn(
param: *mut ::std::os::raw::c_void,
arg1: *mut ::std::os::raw::c_void,
code: ::std::os::raw::c_int,
),
>,
param: *mut ::std::os::raw::c_void,
);
}
extern "C" {
pub fn taos_fetch_rows_a(
res: *mut ::std::os::raw::c_void,
fp: ::std::option::Option<
unsafe extern "C" fn(
param: *mut ::std::os::raw::c_void,
arg1: *mut ::std::os::raw::c_void,
numOfRows: ::std::os::raw::c_int,
),
>,
param: *mut ::std::os::raw::c_void,
);
}
extern "C" {
pub fn taos_fetch_row_a(
res: *mut ::std::os::raw::c_void,
fp: ::std::option::Option<
unsafe extern "C" fn(
param: *mut ::std::os::raw::c_void,
arg1: *mut ::std::os::raw::c_void,
row: *mut *mut ::std::os::raw::c_void,
),
>,
param: *mut ::std::os::raw::c_void,
);
}
extern "C" {
pub fn taos_subscribe(
host: *mut ::std::os::raw::c_char,
user: *mut ::std::os::raw::c_char,
pass: *mut ::std::os::raw::c_char,
db: *mut ::std::os::raw::c_char,
table: *mut ::std::os::raw::c_char,
time: i64,
mseconds: ::std::os::raw::c_int,
) -> *mut ::std::os::raw::c_void;
}
extern "C" {
pub fn taos_consume(tsub: *mut ::std::os::raw::c_void) -> *mut *mut ::std::os::raw::c_void;
}
extern "C" {
pub fn taos_unsubscribe(tsub: *mut ::std::os::raw::c_void);
}
extern "C" {
pub fn taos_open_stream(
taos: *mut ::std::os::raw::c_void,
sqlstr: *mut ::std::os::raw::c_char,
fp: ::std::option::Option<
unsafe extern "C" fn(
param: *mut ::std::os::raw::c_void,
arg1: *mut ::std::os::raw::c_void,
row: *mut *mut ::std::os::raw::c_void,
),
>,
stime: i64,
param: *mut ::std::os::raw::c_void,
callback: ::std::option::Option<unsafe extern "C" fn(arg1: *mut ::std::os::raw::c_void)>,
) -> *mut ::std::os::raw::c_void;
}
extern "C" {
pub fn taos_close_stream(tstr: *mut ::std::os::raw::c_void);
}
extern "C" {
pub static mut configDir: [::std::os::raw::c_char; 0usize];
}
#![allow(unused)]
#![allow(non_camel_case_types)]
pub mod subscriber;
pub use subscriber::*;
pub mod tdengine;
pub use tdengine::*;
pub mod utils;
\ No newline at end of file
#![allow(non_camel_case_types)]
#![allow(non_snake_case)]
#[path = "utils.rs"]
mod utils;
use utils::*;
use utils::bindings::*;
use std::os::raw::{c_void, c_char, c_int, c_long};
pub struct Subscriber {
tsub: *mut c_void,
fields: *mut taosField,
fcount: c_int,
}
impl Subscriber {
pub fn new(host: &str,
username: &str,
passwd: &str,
db: &str,
table:&str,
time: i64,
mseconds: i32
) -> Result<Subscriber, &'static str> {
unsafe {
let mut tsub = taos_subscribe(str_into_raw(host),
str_into_raw(username),
str_into_raw(passwd),
str_into_raw(db),
str_into_raw(table),
time as c_long,
mseconds as c_int);
if tsub.is_null() {
return Err("subscribe error")
}
println!("subscribed to {} user:{}, db:{}, tb:{}, time:{}, mseconds:{}",
host, username, db, table, time, mseconds);
let mut fields = taos_fetch_fields(tsub);
if fields.is_null() {
taos_unsubscribe(tsub);
return Err("fetch fields error")
}
let fcount = taos_field_count(tsub);
if fcount == 0 {
taos_unsubscribe(tsub);
return Err("fields count is 0")
}
Ok(Subscriber{tsub, fields, fcount})
}
}
pub fn consume(self: &Subscriber) -> Result<Row, &'static str> {
unsafe {
let taosRow = taos_consume(self.tsub);
if taosRow.is_null() {
return Err("consume error")
}
let taosRow= std::slice::from_raw_parts(taosRow, self.fcount as usize);
let row = raw_into_row(self.fields, self.fcount, &taosRow);
Ok(row)
}
}
pub fn print_row(self: &Subscriber, row: &Row) {
println!("{}", format_row(row));
}
}
impl Drop for Subscriber {
fn drop(&mut self) {
unsafe {taos_unsubscribe(self.tsub);}
}
}
#[path = "bindings.rs"]
mod bindings;
use bindings::*;
#[path = "utils.rs"]
mod utils;
use utils::*;
use std::os::raw::c_void;
use std::os::raw::c_char;
use std::os::raw::c_int;
use std::os::raw::c_long;
pub struct Tdengine {
conn: *mut c_void,
}
/// - **TODO**: doc
impl Tdengine {
//! - **TODO**: implement default param.
//!
//! > refer to https://stackoverflow.com/questions/24047686/default-function-arguments-in-rust
pub fn new(ip: &str, username: &str, passwd: &str, db: &str, port: i32) -> Result<Tdengine, &'static str> {
unsafe {
taos_init();
let mut conn = taos_connect(str_into_raw(ip),
str_into_raw(username),
str_into_raw(passwd),
str_into_raw(db),
port as c_int);
if conn.is_null() {
Err("connect error")
} else {
println!("connected to {}:{} user:{}, db:{}", ip, port, username, db);
Ok(Tdengine {conn})
}
}
}
// - **TODO**: check error code
pub fn query(self: &Tdengine, s: &str) {
unsafe {
if taos_query(self.conn, str_into_raw(s)) == 0 {
println!("query '{}' ok", s);
} else {
println!("query '{}' error: {}", s, raw_into_str(taos_errstr(self.conn)));
}
}
}
}
impl Drop for Tdengine {
fn drop(&mut self) {
unsafe {taos_close(self.conn);}
}
}
#[cfg(test)]
mod tests {
#[test]
fn it_works() {
assert_eq!(2 + 2, 4);
}
}
\ No newline at end of file
#[path = "bindings.rs"]
pub mod bindings;
use bindings::*;
use std::fmt;
use std::fmt::Display;
use std::os::raw::{c_void, c_char, c_int};
use std::ffi::{CString, CStr};
// #[derive(Debug)]
pub enum Field {
tinyInt(i8),
smallInt(i16),
normalInt(i32),
bigInt(i64),
float(f32),
double(f64),
binary(String),
timeStamp(i64),
boolType(bool),
}
impl fmt::Display for Field {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match &*self {
Field::tinyInt(v) => write!(f, "{}", v),
Field::smallInt(v) => write!(f, "{}", v),
Field::normalInt(v) => write!(f, "{}", v),
Field::bigInt(v) => write!(f, "{}", v),
Field::float(v) => write!(f, "{}", v),
Field::double(v) => write!(f, "{}", v),
Field::binary(v) => write!(f, "{}", v),
Field::tinyInt(v) => write!(f, "{}", v),
Field::timeStamp(v) => write!(f, "{}", v),
Field::boolType(v) => write!(f, "{}", v),
}
}
}
// pub type Fields = Vec<Field>;
pub type Row = Vec<Field>;
pub fn format_row(row: &Row) -> String {
let mut s = String::new();
for field in row {
s.push_str(format!("{} ", field).as_str());
// println!("{}", field);
}
s
}
pub fn str_into_raw(s: &str) -> *mut c_char {
if s.is_empty() {
0 as *mut c_char
} else {
CString::new(s).unwrap().into_raw()
}
}
pub fn raw_into_str<'a>(raw: *mut c_char) -> &'static str {
unsafe {CStr::from_ptr(raw).to_str().unwrap()}
}
pub fn raw_into_field(raw: *mut TAOS_FIELD, fcount: c_int) -> Vec<taosField> {
let mut fields: Vec<taosField> = Vec::new();
for i in 0..fcount as isize {
fields.push(
taosField {
name: unsafe {(*raw.offset(i as isize))}.name,
bytes: unsafe {(*raw.offset(i as isize))}.bytes,
type_: unsafe {(*raw.offset(i as isize))}.type_,
}
);
}
/// TODO: error[E0382]: use of moved value: `fields`
// for field in &fields {
// println!("type: {}, bytes: {}", field.type_, field.bytes);
// }
fields
}
pub fn raw_into_row(fields: *mut TAOS_FIELD, fcount: c_int, raw_row: &[*mut c_void]) -> Row {
let mut row: Row= Vec::new();
let fields = raw_into_field(fields, fcount);
for (i, field) in fields.iter().enumerate() {
// println!("index: {}, type: {}, bytes: {}", i, field.type_, field.bytes);
unsafe {
match field.type_ as u32 {
TSDB_DATA_TYPE_TINYINT => {
row.push(Field::tinyInt(*(raw_row[i] as *mut i8)));
}
TSDB_DATA_TYPE_SMALLINT => {
row.push(Field::smallInt(*(raw_row[i] as *mut i16)));
}
TSDB_DATA_TYPE_INT => {
row.push(Field::normalInt(*(raw_row[i] as *mut i32)));
}
TSDB_DATA_TYPE_BIGINT => {
row.push(Field::bigInt(*(raw_row[i] as *mut i64)));
}
TSDB_DATA_TYPE_FLOAT => {
row.push(Field::float(*(raw_row[i] as *mut f32)));
}
TSDB_DATA_TYPE_DOUBLE => {
row.push(Field::double(*(raw_row[i] as *mut f64)));
}
TSDB_DATA_TYPE_BINARY | TSDB_DATA_TYPE_NCHAR => {
// row.push(Field::binary(*(raw_row[i] as *mut f64)));
}
TSDB_DATA_TYPE_TIMESTAMP => {
row.push(Field::timeStamp(*(raw_row[i] as *mut i64)));
}
TSDB_DATA_TYPE_BOOL => {
// row.push(Field::boolType(*(raw_row[i] as *mut i8) as bool));
}
_ => println!(""),
}
}
}
row
}
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册