提交 3deedfb5 编写于 作者: H Hongze Cheng

Merge branch '3.0' of https://github.com/taosdata/TDengine into fix/3.0_merge_main

......@@ -58,7 +58,7 @@ database_option: {
- WAL_FSYNC_PERIOD: specifies the interval (in milliseconds) at which data is written from the WAL to disk. This parameter takes effect only when the WAL parameter is set to 2. The default value is 3000. Enter a value between 0 and 180000. The value 0 indicates that incoming data is immediately written to disk.
- MAXROWS: specifies the maximum number of rows recorded in a block. The default value is 4096.
- MINROWS: specifies the minimum number of rows recorded in a block. The default value is 100.
- KEEP: specifies the time for which data is retained. Enter a value between 1 and 365000. The default value is 3650. The value of the KEEP parameter must be greater than or equal to the value of the DURATION parameter. TDengine automatically deletes data that is older than the value of the KEEP parameter. You can use m (minutes), h (hours), and d (days) as the unit, for example KEEP 100h or KEEP 10d. If you do not include a unit, d is used by default.
- KEEP: specifies the time for which data is retained. Enter a value between 1 and 365000. The default value is 3650. The value of the KEEP parameter must be greater than or equal to the value of the DURATION parameter. TDengine automatically deletes data that is older than the value of the KEEP parameter. You can use m (minutes), h (hours), and d (days) as the unit, for example KEEP 100h or KEEP 10d. If you do not include a unit, d is used by default. The Enterprise Edition supports [Tiered Storage](https://docs.tdengine.com/tdinternal/arch/#tiered-storage) function, thus multiple KEEP values (comma separated and up to 3 values supported, and meet keep 0 <= keep 1 <= keep 2, e.g. KEEP 100h,100d,3650d) are supported; the Community Edition does not support Tiered Storage function (although multiple keep values are configured, they do not take effect, only the maximum keep value is used as KEEP).
- PAGES: specifies the number of pages in the metadata storage engine cache on each vnode. Enter a value greater than or equal to 64. The default value is 256. The space occupied by metadata storage on each vnode is equal to the product of the values of the PAGESIZE and PAGES parameters. The space occupied by default is 1 MB.
- PAGESIZE: specifies the size (in KB) of each page in the metadata storage engine cache on each vnode. The default value is 4. Enter a value between 1 and 16384.
- PRECISION: specifies the precision at which a database records timestamps. Enter ms for milliseconds, us for microseconds, or ns for nanoseconds. The default value is ms.
......
......@@ -363,7 +363,7 @@ Shows information about all vgroups in the system or about the vgroups for a spe
## SHOW VNODES
```sql
SHOW VNODES [dnode_name];
SHOW VNODES {dnode_id | dnode_endpoint};
```
Shows information about all vnodes in the system or about the vnodes for a specified dnode.
......@@ -323,6 +323,7 @@ The charset that takes effect is UTF-8.
| Applicable | Server Only |
| Meaning | All data files are stored in this directory |
| Default Value | /var/lib/taos |
| Note | The [Tiered Storage](https://docs.tdengine.com/tdinternal/arch/#tiered-storage) function needs to be used in conjunction with the [KEEP](https://docs.tdengine.com/taos-sql/database/#parameters) parameter |
### tempDir
......
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo=
github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/taosdata/driver-go/v3 v3.1.0/go.mod h1:H2vo/At+rOPY1aMzUV9P49SVX7NlXb3LAbKw+MCLrmU=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
import pandas
from sqlalchemy import create_engine
from sqlalchemy import create_engine, text
engine = create_engine("taos://root:taosdata@localhost:6030/power")
df = pandas.read_sql("SELECT * FROM meters", engine)
conn = engine.connect()
df = pandas.read_sql(text("SELECT * FROM power.meters"), conn)
conn.close()
# print index
print(df.index)
......
import pandas
from sqlalchemy import create_engine
from sqlalchemy import create_engine, text
engine = create_engine("taosrest://root:taosdata@localhost:6041")
df: pandas.DataFrame = pandas.read_sql("SELECT * FROM power.meters", engine)
conn = engine.connect()
df: pandas.DataFrame = pandas.read_sql(text("SELECT * FROM power.meters"), conn)
conn.close()
# print index
print(df.index)
......
# ANCHOR: connect
from taosrest import connect, TaosRestConnection, TaosRestCursor
conn: TaosRestConnection = connect(url="http://localhost:6041",
user="root",
password="taosdata",
timeout=30)
conn = connect(url="http://localhost:6041",
user="root",
password="taosdata",
timeout=30)
# ANCHOR_END: connect
# ANCHOR: basic
# create STable
cursor: TaosRestCursor = conn.cursor()
cursor = conn.cursor()
cursor.execute("DROP DATABASE IF EXISTS power")
cursor.execute("CREATE DATABASE power")
cursor.execute("CREATE STABLE power.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (location BINARY(64), groupId INT)")
cursor.execute(
"CREATE STABLE power.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (location BINARY(64), groupId INT)")
# insert data
cursor.execute("""INSERT INTO power.d1001 USING power.meters TAGS('California.SanFrancisco', 2) VALUES ('2018-10-03 14:38:05.000', 10.30000, 219, 0.31000) ('2018-10-03 14:38:15.000', 12.60000, 218, 0.33000) ('2018-10-03 14:38:16.800', 12.30000, 221, 0.31000)
......@@ -28,7 +29,7 @@ print("queried row count:", cursor.rowcount)
# get column names from cursor
column_names = [meta[0] for meta in cursor.description]
# get rows
data: list[tuple] = cursor.fetchall()
data = cursor.fetchall()
print(column_names)
for row in data:
print(row)
......
......@@ -8,7 +8,7 @@ conn.execute("CREATE DATABASE test")
# change database. same as execute "USE db"
conn.select_db("test")
conn.execute("CREATE STABLE weather(ts TIMESTAMP, temperature FLOAT) TAGS (location INT)")
affected_row: int = conn.execute("INSERT INTO t1 USING weather TAGS(1) VALUES (now, 23.5) (now+1m, 23.5) (now+2m, 24.4)")
affected_row = conn.execute("INSERT INTO t1 USING weather TAGS(1) VALUES (now, 23.5) (now+1m, 23.5) (now+2m, 24.4)")
print("affected_row", affected_row)
# output:
# affected_row 3
......@@ -16,10 +16,10 @@ print("affected_row", affected_row)
# ANCHOR: query
# Execute a sql and get its result set. It's useful for SELECT statement
result: taos.TaosResult = conn.query("SELECT * from weather")
result = conn.query("SELECT * from weather")
# Get fields from result
fields: taos.field.TaosFields = result.fields
fields = result.fields
for field in fields:
print(field) # {name: ts, type: 9, bytes: 8}
......
# install dependencies:
# recommend python >= 3.8
# pip3 install faster-fifo
#
import logging
import math
import multiprocessing
import sys
import time
import os
from multiprocessing import Process
from faster_fifo import Queue
from multiprocessing import Process, Queue
from mockdatasource import MockDataSource
from queue import Empty
from typing import List
......@@ -22,8 +21,7 @@ TABLE_COUNT = 1000
QUEUE_SIZE = 1000000
MAX_BATCH_SIZE = 3000
read_processes = []
write_processes = []
_DONE_MESSAGE = '__DONE__'
def get_connection():
......@@ -44,41 +42,64 @@ def get_connection():
# ANCHOR: read
def run_read_task(task_id: int, task_queues: List[Queue]):
def run_read_task(task_id: int, task_queues: List[Queue], infinity):
table_count_per_task = TABLE_COUNT // READ_TASK_COUNT
data_source = MockDataSource(f"tb{task_id}", table_count_per_task)
data_source = MockDataSource(f"tb{task_id}", table_count_per_task, infinity)
try:
for batch in data_source:
if isinstance(batch, tuple):
batch = [batch]
for table_id, rows in batch:
# hash data to different queue
i = table_id % len(task_queues)
# block putting forever when the queue is full
task_queues[i].put_many(rows, block=True, timeout=-1)
for row in rows:
task_queues[i].put(row)
if not infinity:
for queue in task_queues:
queue.put(_DONE_MESSAGE)
except KeyboardInterrupt:
pass
finally:
logging.info('read task over')
# ANCHOR_END: read
# ANCHOR: write
def run_write_task(task_id: int, queue: Queue):
def run_write_task(task_id: int, queue: Queue, done_queue: Queue):
from sql_writer import SQLWriter
log = logging.getLogger(f"WriteTask-{task_id}")
writer = SQLWriter(get_connection)
lines = None
try:
while True:
try:
# get as many as possible
lines = queue.get_many(block=False, max_messages_to_get=MAX_BATCH_SIZE)
over = False
lines = []
for _ in range(MAX_BATCH_SIZE):
try:
line = queue.get_nowait()
if line == _DONE_MESSAGE:
over = True
break
if line:
lines.append(line)
except Empty:
time.sleep(0.1)
if len(lines) > 0:
writer.process_lines(lines)
except Empty:
time.sleep(0.01)
if over:
done_queue.put(_DONE_MESSAGE)
break
except KeyboardInterrupt:
pass
except BaseException as e:
log.debug(f"lines={lines}")
raise e
finally:
writer.close()
log.debug('write task over')
# ANCHOR_END: write
......@@ -103,47 +124,64 @@ def set_global_config():
# ANCHOR: monitor
def run_monitor_process():
def run_monitor_process(done_queue: Queue):
log = logging.getLogger("DataBaseMonitor")
conn = get_connection()
conn.execute("DROP DATABASE IF EXISTS test")
conn.execute("CREATE DATABASE test")
conn.execute("CREATE STABLE test.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) "
"TAGS (location BINARY(64), groupId INT)")
conn = None
try:
conn = get_connection()
def get_count():
res = conn.query("SELECT count(*) FROM test.meters")
rows = res.fetch_all()
return rows[0][0] if rows else 0
def get_count():
res = conn.query("SELECT count(*) FROM test.meters")
rows = res.fetch_all()
return rows[0][0] if rows else 0
last_count = 0
while True:
time.sleep(10)
count = get_count()
log.info(f"count={count} speed={(count - last_count) / 10}")
last_count = count
last_count = 0
while True:
try:
done = done_queue.get_nowait()
if done == _DONE_MESSAGE:
break
except Empty:
pass
time.sleep(10)
count = get_count()
log.info(f"count={count} speed={(count - last_count) / 10}")
last_count = count
finally:
conn.close()
# ANCHOR_END: monitor
# ANCHOR: main
def main():
def main(infinity):
set_global_config()
logging.info(f"READ_TASK_COUNT={READ_TASK_COUNT}, WRITE_TASK_COUNT={WRITE_TASK_COUNT}, "
f"TABLE_COUNT={TABLE_COUNT}, QUEUE_SIZE={QUEUE_SIZE}, MAX_BATCH_SIZE={MAX_BATCH_SIZE}")
monitor_process = Process(target=run_monitor_process)
conn = get_connection()
conn.execute("DROP DATABASE IF EXISTS test")
conn.execute("CREATE DATABASE IF NOT EXISTS test")
conn.execute("CREATE STABLE IF NOT EXISTS test.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) "
"TAGS (location BINARY(64), groupId INT)")
conn.close()
done_queue = Queue()
monitor_process = Process(target=run_monitor_process, args=(done_queue,))
monitor_process.start()
time.sleep(3) # waiting for database ready.
logging.debug(f"monitor task started with pid {monitor_process.pid}")
task_queues: List[Queue] = []
write_processes = []
read_processes = []
# create task queues
for i in range(WRITE_TASK_COUNT):
queue = Queue(max_size_bytes=QUEUE_SIZE)
queue = Queue()
task_queues.append(queue)
# create write processes
for i in range(WRITE_TASK_COUNT):
p = Process(target=run_write_task, args=(i, task_queues[i]))
p = Process(target=run_write_task, args=(i, task_queues[i], done_queue))
p.start()
logging.debug(f"WriteTask-{i} started with pid {p.pid}")
write_processes.append(p)
......@@ -151,13 +189,19 @@ def main():
# create read processes
for i in range(READ_TASK_COUNT):
queues = assign_queues(i, task_queues)
p = Process(target=run_read_task, args=(i, queues))
p = Process(target=run_read_task, args=(i, queues, infinity))
p.start()
logging.debug(f"ReadTask-{i} started with pid {p.pid}")
read_processes.append(p)
try:
monitor_process.join()
for p in read_processes:
p.join()
for p in write_processes:
p.join()
time.sleep(1)
return
except KeyboardInterrupt:
monitor_process.terminate()
[p.terminate() for p in read_processes]
......@@ -176,5 +220,6 @@ def assign_queues(read_task_id, task_queues):
if __name__ == '__main__':
main()
multiprocessing.set_start_method('spawn')
main(False)
# ANCHOR_END: main
......@@ -26,7 +26,8 @@ class Consumer(object):
'bath_consume': True,
'batch_size': 1000,
'async_model': True,
'workers': 10
'workers': 10,
'testing': False
}
LOCATIONS = ['California.SanFrancisco', 'California.LosAngles', 'California.SanDiego', 'California.SanJose',
......@@ -46,11 +47,12 @@ class Consumer(object):
def __init__(self, **configs):
self.config: dict = self.DEFAULT_CONFIGS
self.config.update(configs)
self.consumer = KafkaConsumer(
self.config.get('kafka_topic'), # topic
bootstrap_servers=self.config.get('kafka_brokers'),
group_id=self.config.get('kafka_group_id'),
)
if not self.config.get('testing'):
self.consumer = KafkaConsumer(
self.config.get('kafka_topic'), # topic
bootstrap_servers=self.config.get('kafka_brokers'),
group_id=self.config.get('kafka_group_id'),
)
self.taos = taos.connect(
host=self.config.get('taos_host'),
user=self.config.get('taos_user'),
......@@ -60,7 +62,7 @@ class Consumer(object):
)
if self.config.get('async_model'):
self.pool = ThreadPoolExecutor(max_workers=self.config.get('workers'))
self.tasks: list[Future] = []
self.tasks = []
# tags and table mapping # key: {location}_{groupId} value:
self.tag_table_mapping = {}
i = 0
......@@ -115,14 +117,14 @@ class Consumer(object):
if self.taos is not None:
self.taos.close()
def _run(self, f: Callable[[ConsumerRecord], bool]):
def _run(self, f):
for message in self.consumer:
if self.config.get('async_model'):
self.pool.submit(f(message))
else:
f(message)
def _run_batch(self, f: Callable[[list[list[ConsumerRecord]]], None]):
def _run_batch(self, f):
while True:
messages = self.consumer.poll(timeout_ms=500, max_records=self.config.get('batch_size'))
if messages:
......@@ -140,7 +142,7 @@ class Consumer(object):
logging.info('## insert sql %s', sql)
return self.taos.execute(sql=sql) == 1
def _to_taos_batch(self, messages: list[list[ConsumerRecord]]):
def _to_taos_batch(self, messages):
sql = self._build_sql_batch(messages=messages)
if len(sql) == 0: # decode error, skip
return
......@@ -162,7 +164,7 @@ class Consumer(object):
table_name = self._get_table_name(location=location, group_id=group_id)
return self.INSERT_PART_SQL.format(table_name, ts, current, voltage, phase)
def _build_sql_batch(self, messages: list[list[ConsumerRecord]]) -> str:
def _build_sql_batch(self, messages) -> str:
sql_list = []
for partition_messages in messages:
for message in partition_messages:
......@@ -186,7 +188,54 @@ def _get_location_and_group(key: str) -> (str, int):
return fields[0], fields[1]
def test_to_taos(consumer: Consumer):
msg = {
'location': 'California.SanFrancisco',
'groupId': 1,
'ts': '2022-12-06 15:13:38.643',
'current': 3.41,
'voltage': 105,
'phase': 0.02027,
}
record = ConsumerRecord(checksum=None, headers=None, offset=1, key=None, value=json.dumps(msg), partition=1,
topic='test', serialized_key_size=None, serialized_header_size=None,
serialized_value_size=None, timestamp=time.time(), timestamp_type=None)
assert consumer._to_taos(message=record)
def test_to_taos_batch(consumer: Consumer):
records = [
[
ConsumerRecord(checksum=None, headers=None, offset=1, key=None,
value=json.dumps({'location': 'California.SanFrancisco',
'groupId': 1,
'ts': '2022-12-06 15:13:38.643',
'current': 3.41,
'voltage': 105,
'phase': 0.02027, }),
partition=1, topic='test', serialized_key_size=None, serialized_header_size=None,
serialized_value_size=None, timestamp=time.time(), timestamp_type=None),
ConsumerRecord(checksum=None, headers=None, offset=1, key=None,
value=json.dumps({'location': 'California.LosAngles',
'groupId': 2,
'ts': '2022-12-06 15:13:39.643',
'current': 3.41,
'voltage': 102,
'phase': 0.02027, }),
partition=1, topic='test', serialized_key_size=None, serialized_header_size=None,
serialized_value_size=None, timestamp=time.time(), timestamp_type=None),
]
]
consumer._to_taos_batch(messages=records)
if __name__ == '__main__':
consumer = Consumer(async_model=True)
consumer = Consumer(async_model=True, testing=True)
# init env
consumer.init_env()
consumer.consume()
\ No newline at end of file
# consumer.consume()
# test build sql
# test build sql batch
test_to_taos(consumer)
test_to_taos_batch(consumer)
......@@ -10,13 +10,14 @@ class MockDataSource:
"9.4,118,0.141,California.SanFrancisco,4"
]
def __init__(self, tb_name_prefix, table_count):
def __init__(self, tb_name_prefix, table_count, infinity=True):
self.table_name_prefix = tb_name_prefix + "_"
self.table_count = table_count
self.max_rows = 10000000
self.current_ts = round(time.time() * 1000) - self.max_rows * 100
# [(tableId, tableName, values),]
self.data = self._init_data()
self.infinity = infinity
def _init_data(self):
lines = self.samples * (self.table_count // 5 + 1)
......@@ -28,14 +29,19 @@ class MockDataSource:
def __iter__(self):
self.row = 0
return self
if not self.infinity:
return iter(self._iter_data())
else:
return self
def __next__(self):
"""
next 1000 rows for each table.
return: {tableId:[row,...]}
"""
# generate 1000 timestamps
return self._iter_data()
def _iter_data(self):
ts = []
for _ in range(1000):
self.current_ts += 100
......@@ -47,3 +53,9 @@ class MockDataSource:
rows = [table_name + ',' + t + ',' + values for t in ts]
result.append((table_id, rows))
return result
if __name__ == '__main__':
datasource = MockDataSource('t', 10, False)
for data in datasource:
print(data)
......@@ -10,6 +10,7 @@ class SQLWriter:
self._tb_tags = {}
self._conn = get_connection_func()
self._max_sql_length = self.get_max_sql_length()
self._conn.execute("create database if not exists test")
self._conn.execute("USE test")
def get_max_sql_length(self):
......@@ -20,7 +21,7 @@ class SQLWriter:
return int(r[1])
return 1024 * 1024
def process_lines(self, lines: str):
def process_lines(self, lines: [str]):
"""
:param lines: [[tbName,ts,current,voltage,phase,location,groupId]]
"""
......@@ -60,6 +61,7 @@ class SQLWriter:
buf.append(q)
sql_len += len(q)
sql += " ".join(buf)
self.create_tables()
self.execute_sql(sql)
self._tb_values.clear()
......@@ -88,3 +90,22 @@ class SQLWriter:
except BaseException as e:
self.log.error("Execute SQL: %s", sql)
raise e
def close(self):
if self._conn:
self._conn.close()
if __name__ == '__main__':
def get_connection_func():
conn = taos.connect()
return conn
writer = SQLWriter(get_connection_func=get_connection_func)
writer.execute_sql(
"create stable if not exists meters (ts timestamp, current float, voltage int, phase float) "
"tags (location binary(64), groupId int)")
writer.execute_sql(
"INSERT INTO d21001 USING meters TAGS ('California.SanFrancisco', 2) "
"VALUES ('2021-07-13 14:06:32.272', 10.2, 219, 0.32)")
......@@ -19,8 +19,14 @@ def init_tmq_env(db, topic):
conn.execute("insert into tb3 values (now, 3, 3.0, 'tmq test')")
def cleanup(db, topic):
conn = taos.connect()
conn.execute("drop topic if exists {}".format(topic))
conn.execute("drop database if exists {}".format(db))
if __name__ == '__main__':
init_tmq_env("tmq_test", "tmq_test_topic") # init env
init_tmq_env("tmq_test", "tmq_test_topic") # init env
consumer = Consumer(
{
"group.id": "tg2",
......@@ -33,9 +39,9 @@ if __name__ == '__main__':
try:
while True:
res = consumer.poll(100)
res = consumer.poll(1)
if not res:
continue
break
err = res.error()
if err is not None:
raise err
......@@ -46,3 +52,4 @@ if __name__ == '__main__':
finally:
consumer.unsubscribe()
consumer.close()
cleanup("tmq_test", "tmq_test_topic")
......@@ -58,7 +58,7 @@ database_option: {
- WAL_FSYNC_PERIOD:当 WAL 参数设置为 2 时,落盘的周期。默认为 3000,单位毫秒。最小为 0,表示每次写入立即落盘;最大为 180000,即三分钟。
- MAXROWS:文件块中记录的最大条数,默认为 4096 条。
- MINROWS:文件块中记录的最小条数,默认为 100 条。
- KEEP:表示数据文件保存的天数,缺省值为 3650,取值范围 [1, 365000],且必须大于或等于 DURATION 参数值。数据库会自动删除保存时间超过 KEEP 值的数据。KEEP 可以使用加单位的表示形式,如 KEEP 100h、KEEP 10d 等,支持 m(分钟)、h(小时)和 d(天)三个单位。也可以不写单位,如 KEEP 50,此时默认单位为天。
- KEEP:表示数据文件保存的天数,缺省值为 3650,取值范围 [1, 365000],且必须大于或等于 DURATION 参数值。数据库会自动删除保存时间超过 KEEP 值的数据。KEEP 可以使用加单位的表示形式,如 KEEP 100h、KEEP 10d 等,支持 m(分钟)、h(小时)和 d(天)三个单位。也可以不写单位,如 KEEP 50,此时默认单位为天。企业版支持[多级存储](https://docs.taosdata.com/tdinternal/arch/#%E5%A4%9A%E7%BA%A7%E5%AD%98%E5%82%A8)功能, 因此, 可以设置多个保存时间(多个以英文逗号分隔,最多 3 个,满足 keep 0 <= keep 1 <= keep 2,如 KEEP 100h,100d,3650d); 社区版不支持多级存储功能(即使配置了多个保存时间, 也不会生效, KEEP 会取最大的保存时间)。
- PAGES:一个 VNODE 中元数据存储引擎的缓存页个数,默认为 256,最小 64。一个 VNODE 元数据存储占用 PAGESIZE \* PAGES,默认情况下为 1MB 内存。
- PAGESIZE:一个 VNODE 中元数据存储引擎的页大小,单位为 KB,默认为 4 KB。范围为 1 到 16384,即 1 KB 到 16 MB。
- PRECISION:数据库的时间戳精度。ms 表示毫秒,us 表示微秒,ns 表示纳秒,默认 ms 毫秒。
......
......@@ -306,7 +306,7 @@ SHOW [db_name.]VGROUPS;
## SHOW VNODES
```sql
SHOW VNODES [dnode_name];
SHOW VNODES {dnode_id | dnode_endpoint};
```
显示当前系统中所有 VNODE 或某个 DNODE 的 VNODE 的信息。
......@@ -323,6 +323,7 @@ charset 的有效值是 UTF-8。
| 适用范围 | 仅服务端适用 |
| 含义 | 数据文件目录,所有的数据文件都将写入该目录 |
| 缺省值 | /var/lib/taos |
| 补充说明 | [多级存储](https://docs.taosdata.com/tdinternal/arch/#%E5%A4%9A%E7%BA%A7%E5%AD%98%E5%82%A8) 功能需要与 [KEEP](https://docs.taosdata.com/taos-sql/database/#%E5%8F%82%E6%95%B0%E8%AF%B4%E6%98%8E) 参数配合使用 |
### tempDir
......
......@@ -56,7 +56,7 @@ int32_t tqOffsetSnapRead(STqOffsetReader* pReader, uint8_t** ppData) {
TdFilePtr pFile = taosOpenFile(fname, TD_FILE_READ);
if (pFile == NULL) {
taosMemoryFree(fname);
return -1;
return 0;
}
int64_t sz = 0;
......
......@@ -23,7 +23,7 @@ python3 bind_param_example.py
# 4
taos -s "drop database power"
python3 multi_bind_example.py
python3 multi_bind_example.py
# 5
python3 query_example.py
......@@ -44,4 +44,43 @@ taos -s "drop database test"
python3 json_protocol_example.py
# 10
# python3 subscribe_demo.py
pip install SQLAlchemy
pip install pandas
taosBenchmark -y -d power -t 10 -n 10
python3 conn_native_pandas.py
python3 conn_rest_pandas.py
taos -s "drop database if exists power"
# 11
taos -s "create database if not exists test"
python3 connect_native_reference.py
# 12
python3 connect_rest_examples.py
# 13
python3 handle_exception.py
# 14
taosBenchmark -y -d power -t 2 -n 10
python3 rest_client_example.py
taos -s "drop database if exists power"
# 15
python3 result_set_examples.py
# 16
python3 tmq_example.py
# 17
python3 sql_writer.py
# 18
python3 mockdatasource.py
# 19
python3 fast_write_example.py
# 20
pip3 install kafka-python
python3 kafka_example.py
......@@ -79,6 +79,7 @@ sql insert into db.ctb6 values(now, 6, "6")
sql insert into db.ctb7 values(now, 7, "7")
sql insert into db.ctb8 values(now, 8, "8")
sql insert into db.ctb9 values(now, 9, "9")
sql flush database db;
print =============== step3: create dnodes
sql create dnode $hostname port 7300
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册