未验证 提交 33f35846 编写于 作者: L Linhe Huo 提交者: GitHub

[TD-12250]<docs>: fix confused titles and syntax error of example code in python connector (#9252)

上级 47d563e1
...@@ -41,6 +41,7 @@ cursor.execute("show databases") ...@@ -41,6 +41,7 @@ cursor.execute("show databases")
results = cursor.fetchall() results = cursor.fetchall()
for row in results: for row in results:
print(row) print(row)
cursor.close() cursor.close()
conn.close() conn.close()
``` ```
...@@ -57,8 +58,10 @@ result = conn.query("show databases") ...@@ -57,8 +58,10 @@ result = conn.query("show databases")
num_of_fields = result.field_count num_of_fields = result.field_count
for field in result.fields: for field in result.fields:
print(field) print(field)
for row in result: for row in result:
print(row) print(row)
result.close() result.close()
conn.execute("drop database pytest") conn.execute("drop database pytest")
conn.close() conn.close()
...@@ -81,6 +84,7 @@ def fetch_callback(p_param, p_result, num_of_rows): ...@@ -81,6 +84,7 @@ def fetch_callback(p_param, p_result, num_of_rows):
p.contents.done = True p.contents.done = True
result.close() result.close()
return return
if num_of_rows < 0: if num_of_rows < 0:
p.contents.done = True p.contents.done = True
result.check_error(num_of_rows) result.check_error(num_of_rows)
...@@ -90,6 +94,7 @@ def fetch_callback(p_param, p_result, num_of_rows): ...@@ -90,6 +94,7 @@ def fetch_callback(p_param, p_result, num_of_rows):
for row in result.rows_iter(num_of_rows): for row in result.rows_iter(num_of_rows):
# print(row) # print(row)
None None
p.contents.count += result.row_count p.contents.count += result.row_count
result.fetch_rows_a(fetch_callback, p_param) result.fetch_rows_a(fetch_callback, p_param)
...@@ -97,11 +102,13 @@ def fetch_callback(p_param, p_result, num_of_rows): ...@@ -97,11 +102,13 @@ def fetch_callback(p_param, p_result, num_of_rows):
def query_callback(p_param, p_result, code): def query_callback(p_param, p_result, code):
# type: (c_void_p, c_void_p, c_int) -> None # type: (c_void_p, c_void_p, c_int) -> None
if p_result == None: if p_result is None:
return return
result = TaosResult(p_result) result = TaosResult(p_result)
if code == 0: if code == 0:
result.fetch_rows_a(fetch_callback, p_param) result.fetch_rows_a(fetch_callback, p_param)
result.check_error(code) result.check_error(code)
...@@ -120,6 +127,7 @@ def test_query(conn): ...@@ -120,6 +127,7 @@ def test_query(conn):
while not counter.done: while not counter.done:
print("wait query callback") print("wait query callback")
time.sleep(1) time.sleep(1)
print(counter) print(counter)
conn.close() conn.close()
...@@ -182,6 +190,7 @@ result = conn.query("select * from log") ...@@ -182,6 +190,7 @@ result = conn.query("select * from log")
for row in result: for row in result:
print(row) print(row)
result.close() result.close()
stmt.close() stmt.close()
conn.close() conn.close()
...@@ -237,18 +246,20 @@ result.close() ...@@ -237,18 +246,20 @@ result.close()
result = conn.query("select * from log") result = conn.query("select * from log")
for row in result: for row in result:
print(row) print(row)
result.close() result.close()
stmt.close() stmt.close()
conn.close() conn.close()
``` ```
### Statement API - Subscribe ### Subscription
```python ```python
import taos import taos
import random
conn = taos.connect() conn = taos.connect()
dbname = "pytest_taos_subscribe_callback" dbname = "pytest_taos_subscribe"
conn.execute("drop database if exists %s" % dbname) conn.execute("drop database if exists %s" % dbname)
conn.execute("create database if not exists %s" % dbname) conn.execute("create database if not exists %s" % dbname)
conn.select_db(dbname) conn.select_db(dbname)
...@@ -256,7 +267,7 @@ conn.execute("create table if not exists log(ts timestamp, n int)") ...@@ -256,7 +267,7 @@ conn.execute("create table if not exists log(ts timestamp, n int)")
for i in range(10): for i in range(10):
conn.execute("insert into log values(now, %d)" % i) conn.execute("insert into log values(now, %d)" % i)
sub = conn.subscribe(True, "test", "select * from log", 1000) sub = conn.subscribe(False, "test", "select * from log", 1000)
print("# consume from begin") print("# consume from begin")
for ts, n in sub.consume(): for ts, n in sub.consume():
print(ts, n) print(ts, n)
...@@ -268,9 +279,18 @@ for i in range(5): ...@@ -268,9 +279,18 @@ for i in range(5):
for ts, n in result: for ts, n in result:
print(ts, n) print(ts, n)
sub.close(True)
print("# keep progress consume")
sub = conn.subscribe(False, "test", "select * from log", 1000)
result = sub.consume()
rows = result.fetch_all()
# consume from latest subscription needs root privilege(for /var/lib/taos).
assert result.row_count == 0
print("## consumed ", len(rows), "rows")
print("# consume with a stop condition") print("# consume with a stop condition")
for i in range(10): for i in range(10):
conn.execute("insert into log values(now, %d)" % int(random() * 10)) conn.execute("insert into log values(now, %d)" % random.randint(0, 10))
result = sub.consume() result = sub.consume()
try: try:
ts, n = next(result) ts, n = next(result)
...@@ -283,12 +303,13 @@ for i in range(10): ...@@ -283,12 +303,13 @@ for i in range(10):
continue continue
sub.close() sub.close()
# sub.close()
conn.execute("drop database if exists %s" % dbname) conn.execute("drop database if exists %s" % dbname)
conn.close() # conn.close()
``` ```
### Statement API - Subscribe asynchronously with callback ### Subscription asynchronously with callback
```python ```python
from taos import * from taos import *
...@@ -300,7 +321,7 @@ import time ...@@ -300,7 +321,7 @@ import time
def subscribe_callback(p_sub, p_result, p_param, errno): def subscribe_callback(p_sub, p_result, p_param, errno):
# type: (c_void_p, c_void_p, c_void_p, c_int) -> None # type: (c_void_p, c_void_p, c_void_p, c_int) -> None
print("# fetch in callback") print("# fetch in callback")
result = TaosResult(p_result) result = TaosResult(c_void_p(p_result))
result.check_error(errno) result.check_error(errno)
for row in result.rows_iter(): for row in result.rows_iter():
ts, n = row() ts, n = row()
...@@ -311,42 +332,45 @@ def test_subscribe_callback(conn): ...@@ -311,42 +332,45 @@ def test_subscribe_callback(conn):
# type: (TaosConnection) -> None # type: (TaosConnection) -> None
dbname = "pytest_taos_subscribe_callback" dbname = "pytest_taos_subscribe_callback"
try: try:
print("drop if exists")
conn.execute("drop database if exists %s" % dbname) conn.execute("drop database if exists %s" % dbname)
print("create database")
conn.execute("create database if not exists %s" % dbname) conn.execute("create database if not exists %s" % dbname)
conn.select_db(dbname) print("create table")
conn.execute("create table if not exists log(ts timestamp, n int)") # conn.execute("use %s" % dbname)
conn.execute("create table if not exists %s.log(ts timestamp, n int)" % dbname)
print("# subscribe with callback") print("# subscribe with callback")
sub = conn.subscribe(False, "test", "select * from log", 1000, subscribe_callback) sub = conn.subscribe(False, "test", "select * from %s.log" % dbname, 1000, subscribe_callback)
for i in range(10): for i in range(10):
conn.execute("insert into log values(now, %d)" % i) conn.execute("insert into %s.log values(now, %d)" % (dbname, i))
time.sleep(0.7) time.sleep(0.7)
sub.close() sub.close()
conn.execute("drop database if exists %s" % dbname) conn.execute("drop database if exists %s" % dbname)
conn.close() # conn.close()
except Exception as err: except Exception as err:
conn.execute("drop database if exists %s" % dbname) conn.execute("drop database if exists %s" % dbname)
conn.close() # conn.close()
raise err raise err
if __name__ == "__main__": if __name__ == "__main__":
test_subscribe_callback(connect()) test_subscribe_callback(connect())
``` ```
### Statement API - Stream ### Stream
```python ```python
from taos import * from taos import *
from ctypes import * from ctypes import *
import time
def stream_callback(p_param, p_result, p_row): def stream_callback(p_param, p_result, p_row):
# type: (c_void_p, c_void_p, c_void_p) -> None # type: (c_void_p, c_void_p, c_void_p) -> None
if p_result is None or p_row is None:
if p_result == None or p_row == None:
return return
result = TaosResult(p_result) result = TaosResult(p_result)
row = TaosRow(result, p_row) row = TaosRow(result, p_row)
...@@ -355,13 +379,12 @@ def stream_callback(p_param, p_result, p_row): ...@@ -355,13 +379,12 @@ def stream_callback(p_param, p_result, p_row):
p = cast(p_param, POINTER(Counter)) p = cast(p_param, POINTER(Counter))
p.contents.count += count p.contents.count += count
print("[%s] inserted %d in 5s, total count: %d" % (ts.strftime("%Y-%m-%d %H:%M:%S"), count, p.contents.count)) print("[%s] inserted %d in 5s, total count: %d" % (ts.strftime("%Y-%m-%d %H:%M:%S"), count, p.contents.count))
except Exception as err: except Exception as err:
print(err) print(err)
raise err raise err
class Counter(ctypes.Structure): class Counter(Structure):
_fields_ = [ _fields_ = [
("count", c_int), ("count", c_int),
] ]
...@@ -388,6 +411,7 @@ def test_stream(conn): ...@@ -388,6 +411,7 @@ def test_stream(conn):
for _ in range(0, 20): for _ in range(0, 20):
conn.execute("insert into log values(now,0)(now+1s, 1)(now + 2s, 2)") conn.execute("insert into log values(now,0)(now+1s, 1)(now + 2s, 2)")
time.sleep(2) time.sleep(2)
stream.close() stream.close()
conn.execute("drop database if exists %s" % dbname) conn.execute("drop database if exists %s" % dbname)
conn.close() conn.close()
...@@ -399,12 +423,14 @@ def test_stream(conn): ...@@ -399,12 +423,14 @@ def test_stream(conn):
if __name__ == "__main__": if __name__ == "__main__":
test_stream(connect()) test_stream(connect())
``` ```
### Insert with line protocol ### Insert with line protocol
```python ```python
import taos import taos
from taos import SmlProtocol, SmlPrecision
conn = taos.connect() conn = taos.connect()
dbname = "pytest_line" dbname = "pytest_line"
...@@ -413,29 +439,22 @@ conn.execute("create database if not exists %s precision 'us'" % dbname) ...@@ -413,29 +439,22 @@ conn.execute("create database if not exists %s precision 'us'" % dbname)
conn.select_db(dbname) conn.select_db(dbname)
lines = [ lines = [
'st,t1=3i64,t2=4f64,t3="t3" c1=3i64,c3=L"pass",c2=false,c4=4f64 1626006833639000000ns', 'st,t1=3i64,t2=4f64,t3="t3" c1=3i64,c3=L"pass",c2=false,c4=4f64 1626006833639000000',
'st,t1=4i64,t3="t4",t2=5f64,t4=5f64 c1=3i64,c3=L"pass it again",c2=true,c4=5f64,c5=5f64,c6=7u64 1626006933640000000',
'stf,t1=4i64,t3="t4",t2=5f64,t4=5f64 c1=3i64,c3=L"pass it again_stf",c2=false,c5=5f64,c6=7u64 1626006933641000000',
] ]
conn.schemaless_insert(lines, 0, "ns") conn.schemaless_insert(lines, taos.SmlProtocol.LINE_PROTOCOL, taos.SmlPrecision.NOT_CONFIGURED)
print("inserted") print("inserted")
lines = [ conn.schemaless_insert(lines, taos.SmlProtocol.LINE_PROTOCOL, taos.SmlPrecision.NOT_CONFIGURED)
'stf,t1=5i64,t3="t4",t2=5f64,t4=5f64 c1=3i64,c3=L"pass it again_stf",c2=false,c5=5f64,c6=7u64 1626006933641000000',
]
conn.schemaless_insert(lines, 0, "ns")
result = conn.query("show tables") result = conn.query("show tables")
for row in result: for row in result:
print(row) print(row)
result.close()
conn.execute("drop database if exists %s" % dbname) conn.execute("drop database if exists %s" % dbname)
conn.close()
``` ```
## License - AGPL-3.0 ## License
Keep same with [TDengine](https://github.com/taosdata/TDengine). We use MIT license for Python connector.
...@@ -29,7 +29,7 @@ def fetch_callback(p_param, p_result, num_of_rows): ...@@ -29,7 +29,7 @@ def fetch_callback(p_param, p_result, num_of_rows):
def query_callback(p_param, p_result, code): def query_callback(p_param, p_result, code):
# type: (c_void_p, c_void_p, c_int) -> None # type: (c_void_p, c_void_p, c_int) -> None
if p_result == None: if p_result is None:
return return
result = TaosResult(p_result) result = TaosResult(p_result)
if code == 0: if code == 0:
......
from taos import *
from ctypes import *
import time
def stream_callback(p_param, p_result, p_row):
# type: (c_void_p, c_void_p, c_void_p) -> None
if p_result is None or p_row is None:
return
result = TaosResult(p_result)
row = TaosRow(result, p_row)
try:
ts, count = row()
p = cast(p_param, POINTER(Counter))
p.contents.count += count
print("[%s] inserted %d in 5s, total count: %d" % (ts.strftime("%Y-%m-%d %H:%M:%S"), count, p.contents.count))
except Exception as err:
print(err)
raise err
class Counter(Structure):
_fields_ = [
("count", c_int),
]
def __str__(self):
return "%d" % self.count
def test_stream(conn):
# type: (TaosConnection) -> None
dbname = "pytest_taos_stream"
try:
conn.execute("drop database if exists %s" % dbname)
conn.execute("create database if not exists %s" % dbname)
conn.select_db(dbname)
conn.execute("create table if not exists log(ts timestamp, n int)")
result = conn.query("select count(*) from log interval(5s)")
assert result.field_count == 2
counter = Counter()
counter.count = 0
stream = conn.stream("select count(*) from log interval(5s)", stream_callback, param=byref(counter))
for _ in range(0, 20):
conn.execute("insert into log values(now,0)(now+1s, 1)(now + 2s, 2)")
time.sleep(2)
stream.close()
conn.execute("drop database if exists %s" % dbname)
conn.close()
except Exception as err:
conn.execute("drop database if exists %s" % dbname)
conn.close()
raise err
if __name__ == "__main__":
test_stream(connect())
...@@ -86,7 +86,7 @@ def fetch_callback(p_param, p_result, num_of_rows): ...@@ -86,7 +86,7 @@ def fetch_callback(p_param, p_result, num_of_rows):
def query_callback(p_param, p_result, code): def query_callback(p_param, p_result, code):
# type: (c_void_p, c_void_p, c_int) -> None # type: (c_void_p, c_void_p, c_int) -> None
if p_result == None: if p_result is None:
return return
result = TaosResult(p_result) result = TaosResult(p_result)
if code == 0: if code == 0:
...@@ -335,7 +335,7 @@ from ctypes import * ...@@ -335,7 +335,7 @@ from ctypes import *
def stream_callback(p_param, p_result, p_row): def stream_callback(p_param, p_result, p_row):
# type: (c_void_p, c_void_p, c_void_p) -> None # type: (c_void_p, c_void_p, c_void_p) -> None
if p_result == None or p_row == None: if p_result is None or p_row is None:
return return
result = TaosResult(p_result) result = TaosResult(p_result)
row = TaosRow(result, p_row) row = TaosRow(result, p_row)
......
...@@ -317,7 +317,7 @@ class TaosMultiBind(ctypes.Structure): ...@@ -317,7 +317,7 @@ class TaosMultiBind(ctypes.Structure):
def _str_to_buffer(self, values): def _str_to_buffer(self, values):
self.num = len(values) self.num = len(values)
is_null = [1 if v == None else 0 for v in values] is_null = [1 if v is None else 0 for v in values]
self.is_null = cast((c_byte * self.num)(*is_null), c_char_p) self.is_null = cast((c_byte * self.num)(*is_null), c_char_p)
if sum(is_null) == self.num: if sum(is_null) == self.num:
......
...@@ -373,9 +373,9 @@ def taos_fetch_block(result, fields=None, field_count=None): ...@@ -373,9 +373,9 @@ def taos_fetch_block(result, fields=None, field_count=None):
if num_of_rows == 0: if num_of_rows == 0:
return None, 0 return None, 0
precision = taos_result_precision(result) precision = taos_result_precision(result)
if fields == None: if fields is None:
fields = taos_fetch_fields(result) fields = taos_fetch_fields(result)
if field_count == None: if field_count is None:
field_count = taos_field_count(result) field_count = taos_field_count(result)
blocks = [None] * field_count blocks = [None] * field_count
fieldLen = taos_fetch_lengths(result, field_count) fieldLen = taos_fetch_lengths(result, field_count)
...@@ -466,7 +466,7 @@ def taos_fetch_lengths(result, field_count=None): ...@@ -466,7 +466,7 @@ def taos_fetch_lengths(result, field_count=None):
# type: (c_void_p, int) -> Array[int] # type: (c_void_p, int) -> Array[int]
"""Make sure to call taos_fetch_row or taos_fetch_block before fetch_lengths""" """Make sure to call taos_fetch_row or taos_fetch_block before fetch_lengths"""
lens = _libtaos.taos_fetch_lengths(result) lens = _libtaos.taos_fetch_lengths(result)
if field_count == None: if field_count is None:
field_count = taos_field_count(result) field_count = taos_field_count(result)
if not lens: if not lens:
raise OperationalError("field length empty, use taos_fetch_row/block before it") raise OperationalError("field length empty, use taos_fetch_row/block before it")
...@@ -823,7 +823,7 @@ def taos_stmt_use_result(stmt): ...@@ -823,7 +823,7 @@ def taos_stmt_use_result(stmt):
@stmt: TAOS_STMT* @stmt: TAOS_STMT*
""" """
result = c_void_p(_libtaos.taos_stmt_use_result(stmt)) result = c_void_p(_libtaos.taos_stmt_use_result(stmt))
if result == None: if result is None:
raise StatementError(taos_stmt_errstr(stmt)) raise StatementError(taos_stmt_errstr(stmt))
return result return result
......
...@@ -41,7 +41,7 @@ class TaosResult(object): ...@@ -41,7 +41,7 @@ class TaosResult(object):
if self._result is None or self.fields is None: if self._result is None or self.fields is None:
raise OperationalError("Invalid use of fetch iterator") raise OperationalError("Invalid use of fetch iterator")
if self._block == None or self._block_iter >= self._block_length: if self._block is None or self._block_iter >= self._block_length:
self._block, self._block_length = self.fetch_block() self._block, self._block_length = self.fetch_block()
self._block_iter = 0 self._block_iter = 0
# self._row_count += self._block_length # self._row_count += self._block_length
...@@ -55,7 +55,7 @@ class TaosResult(object): ...@@ -55,7 +55,7 @@ class TaosResult(object):
"""fields definitions of the current result""" """fields definitions of the current result"""
if self._result is None: if self._result is None:
raise ResultError("no result object setted") raise ResultError("no result object setted")
if self._fields == None: if self._fields is None:
self._fields = taos_fetch_fields(self._result) self._fields = taos_fetch_fields(self._result)
return self._fields return self._fields
...@@ -72,7 +72,7 @@ class TaosResult(object): ...@@ -72,7 +72,7 @@ class TaosResult(object):
@property @property
def precision(self): def precision(self):
if self._precision == None: if self._precision is None:
self._precision = taos_result_precision(self._result) self._precision = taos_result_precision(self._result)
return self._precision return self._precision
...@@ -114,7 +114,7 @@ class TaosResult(object): ...@@ -114,7 +114,7 @@ class TaosResult(object):
if self._result is None: if self._result is None:
raise OperationalError("Invalid use of fetchall") raise OperationalError("Invalid use of fetchall")
if self._fields == None: if self._fields is None:
self._fields = taos_fetch_fields(self._result) self._fields = taos_fetch_fields(self._result)
buffer = [[] for i in range(len(self._fields))] buffer = [[] for i in range(len(self._fields))]
self._row_count = 0 self._row_count = 0
...@@ -150,7 +150,7 @@ class TaosResult(object): ...@@ -150,7 +150,7 @@ class TaosResult(object):
return taos_errstr(self._result) return taos_errstr(self._result)
def check_error(self, errno=None, close=True): def check_error(self, errno=None, close=True):
if errno == None: if errno is None:
errno = self.errno() errno = self.errno()
if errno != 0: if errno != 0:
msg = self.errstr() msg = self.errstr()
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册