提交 8a21d7f3 编写于 作者: wmmhello's avatar wmmhello

Merge branch '3.0' of https://github.com/taosdata/TDengine into fix/TD-21899

......@@ -2,7 +2,7 @@
# taosadapter
ExternalProject_Add(taosadapter
GIT_REPOSITORY https://github.com/taosdata/taosadapter.git
GIT_TAG 213f8b3
GIT_TAG 3e08996
SOURCE_DIR "${TD_SOURCE_DIR}/tools/taosadapter"
BINARY_DIR ""
#BUILD_IN_SOURCE TRUE
......
......@@ -2,7 +2,7 @@
# taos-tools
ExternalProject_Add(taos-tools
GIT_REPOSITORY https://github.com/taosdata/taos-tools.git
GIT_TAG 0cd564a
GIT_TAG 181bcac
SOURCE_DIR "${TD_SOURCE_DIR}/tools/taos-tools"
BINARY_DIR ""
#BUILD_IN_SOURCE TRUE
......
......@@ -58,7 +58,7 @@ database_option: {
- WAL_FSYNC_PERIOD: specifies the interval (in milliseconds) at which data is written from the WAL to disk. This parameter takes effect only when the WAL parameter is set to 2. The default value is 3000. Enter a value between 0 and 180000. The value 0 indicates that incoming data is immediately written to disk.
- MAXROWS: specifies the maximum number of rows recorded in a block. The default value is 4096.
- MINROWS: specifies the minimum number of rows recorded in a block. The default value is 100.
- KEEP: specifies the time for which data is retained. Enter a value between 1 and 365000. The default value is 3650. The value of the KEEP parameter must be greater than or equal to the value of the DURATION parameter. TDengine automatically deletes data that is older than the value of the KEEP parameter. You can use m (minutes), h (hours), and d (days) as the unit, for example KEEP 100h or KEEP 10d. If you do not include a unit, d is used by default.
- KEEP: specifies the time for which data is retained. Enter a value between 1 and 365000. The default value is 3650. The value of the KEEP parameter must be greater than or equal to the value of the DURATION parameter. TDengine automatically deletes data that is older than the value of the KEEP parameter. You can use m (minutes), h (hours), and d (days) as the unit, for example KEEP 100h or KEEP 10d. If you do not include a unit, d is used by default. The Enterprise Edition supports [Tiered Storage](https://docs.tdengine.com/tdinternal/arch/#tiered-storage) function, thus multiple KEEP values (comma separated and up to 3 values supported, and meet keep 0 <= keep 1 <= keep 2, e.g. KEEP 100h,100d,3650d) are supported; the Community Edition does not support Tiered Storage function (although multiple keep values are configured, they do not take effect, only the maximum keep value is used as KEEP).
- PAGES: specifies the number of pages in the metadata storage engine cache on each vnode. Enter a value greater than or equal to 64. The default value is 256. The space occupied by metadata storage on each vnode is equal to the product of the values of the PAGESIZE and PAGES parameters. The space occupied by default is 1 MB.
- PAGESIZE: specifies the size (in KB) of each page in the metadata storage engine cache on each vnode. The default value is 4. Enter a value between 1 and 16384.
- PRECISION: specifies the precision at which a database records timestamps. Enter ms for milliseconds, us for microseconds, or ns for nanoseconds. The default value is ms.
......
......@@ -363,7 +363,7 @@ Shows information about all vgroups in the system or about the vgroups for a spe
## SHOW VNODES
```sql
SHOW VNODES [dnode_name];
SHOW VNODES {dnode_id | dnode_endpoint};
```
Shows information about all vnodes in the system or about the vnodes for a specified dnode.
......@@ -323,6 +323,7 @@ The charset that takes effect is UTF-8.
| Applicable | Server Only |
| Meaning | All data files are stored in this directory |
| Default Value | /var/lib/taos |
| Note | The [Tiered Storage](https://docs.tdengine.com/tdinternal/arch/#tiered-storage) function needs to be used in conjunction with the [KEEP](https://docs.tdengine.com/taos-sql/database/#parameters) parameter |
### tempDir
......
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo=
github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/taosdata/driver-go/v3 v3.1.0/go.mod h1:H2vo/At+rOPY1aMzUV9P49SVX7NlXb3LAbKw+MCLrmU=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
import pandas
from sqlalchemy import create_engine
from sqlalchemy import create_engine, text
engine = create_engine("taos://root:taosdata@localhost:6030/power")
df = pandas.read_sql("SELECT * FROM meters", engine)
conn = engine.connect()
df = pandas.read_sql(text("SELECT * FROM power.meters"), conn)
conn.close()
# print index
print(df.index)
......
import pandas
from sqlalchemy import create_engine
from sqlalchemy import create_engine, text
engine = create_engine("taosrest://root:taosdata@localhost:6041")
df: pandas.DataFrame = pandas.read_sql("SELECT * FROM power.meters", engine)
conn = engine.connect()
df: pandas.DataFrame = pandas.read_sql(text("SELECT * FROM power.meters"), conn)
conn.close()
# print index
print(df.index)
......
# ANCHOR: connect
from taosrest import connect, TaosRestConnection, TaosRestCursor
conn: TaosRestConnection = connect(url="http://localhost:6041",
user="root",
password="taosdata",
timeout=30)
conn = connect(url="http://localhost:6041",
user="root",
password="taosdata",
timeout=30)
# ANCHOR_END: connect
# ANCHOR: basic
# create STable
cursor: TaosRestCursor = conn.cursor()
cursor = conn.cursor()
cursor.execute("DROP DATABASE IF EXISTS power")
cursor.execute("CREATE DATABASE power")
cursor.execute("CREATE STABLE power.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (location BINARY(64), groupId INT)")
cursor.execute(
"CREATE STABLE power.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (location BINARY(64), groupId INT)")
# insert data
cursor.execute("""INSERT INTO power.d1001 USING power.meters TAGS('California.SanFrancisco', 2) VALUES ('2018-10-03 14:38:05.000', 10.30000, 219, 0.31000) ('2018-10-03 14:38:15.000', 12.60000, 218, 0.33000) ('2018-10-03 14:38:16.800', 12.30000, 221, 0.31000)
......@@ -28,7 +29,7 @@ print("queried row count:", cursor.rowcount)
# get column names from cursor
column_names = [meta[0] for meta in cursor.description]
# get rows
data: list[tuple] = cursor.fetchall()
data = cursor.fetchall()
print(column_names)
for row in data:
print(row)
......
......@@ -8,7 +8,7 @@ conn.execute("CREATE DATABASE test")
# change database. same as execute "USE db"
conn.select_db("test")
conn.execute("CREATE STABLE weather(ts TIMESTAMP, temperature FLOAT) TAGS (location INT)")
affected_row: int = conn.execute("INSERT INTO t1 USING weather TAGS(1) VALUES (now, 23.5) (now+1m, 23.5) (now+2m, 24.4)")
affected_row = conn.execute("INSERT INTO t1 USING weather TAGS(1) VALUES (now, 23.5) (now+1m, 23.5) (now+2m, 24.4)")
print("affected_row", affected_row)
# output:
# affected_row 3
......@@ -16,10 +16,10 @@ print("affected_row", affected_row)
# ANCHOR: query
# Execute a sql and get its result set. It's useful for SELECT statement
result: taos.TaosResult = conn.query("SELECT * from weather")
result = conn.query("SELECT * from weather")
# Get fields from result
fields: taos.field.TaosFields = result.fields
fields = result.fields
for field in fields:
print(field) # {name: ts, type: 9, bytes: 8}
......
# install dependencies:
# recommend python >= 3.8
# pip3 install faster-fifo
#
import logging
import math
import multiprocessing
import sys
import time
import os
from multiprocessing import Process
from faster_fifo import Queue
from multiprocessing import Process, Queue
from mockdatasource import MockDataSource
from queue import Empty
from typing import List
......@@ -22,8 +21,7 @@ TABLE_COUNT = 1000
QUEUE_SIZE = 1000000
MAX_BATCH_SIZE = 3000
read_processes = []
write_processes = []
_DONE_MESSAGE = '__DONE__'
def get_connection():
......@@ -44,41 +42,64 @@ def get_connection():
# ANCHOR: read
def run_read_task(task_id: int, task_queues: List[Queue]):
def run_read_task(task_id: int, task_queues: List[Queue], infinity):
table_count_per_task = TABLE_COUNT // READ_TASK_COUNT
data_source = MockDataSource(f"tb{task_id}", table_count_per_task)
data_source = MockDataSource(f"tb{task_id}", table_count_per_task, infinity)
try:
for batch in data_source:
if isinstance(batch, tuple):
batch = [batch]
for table_id, rows in batch:
# hash data to different queue
i = table_id % len(task_queues)
# block putting forever when the queue is full
task_queues[i].put_many(rows, block=True, timeout=-1)
for row in rows:
task_queues[i].put(row)
if not infinity:
for queue in task_queues:
queue.put(_DONE_MESSAGE)
except KeyboardInterrupt:
pass
finally:
logging.info('read task over')
# ANCHOR_END: read
# ANCHOR: write
def run_write_task(task_id: int, queue: Queue):
def run_write_task(task_id: int, queue: Queue, done_queue: Queue):
from sql_writer import SQLWriter
log = logging.getLogger(f"WriteTask-{task_id}")
writer = SQLWriter(get_connection)
lines = None
try:
while True:
try:
# get as many as possible
lines = queue.get_many(block=False, max_messages_to_get=MAX_BATCH_SIZE)
over = False
lines = []
for _ in range(MAX_BATCH_SIZE):
try:
line = queue.get_nowait()
if line == _DONE_MESSAGE:
over = True
break
if line:
lines.append(line)
except Empty:
time.sleep(0.1)
if len(lines) > 0:
writer.process_lines(lines)
except Empty:
time.sleep(0.01)
if over:
done_queue.put(_DONE_MESSAGE)
break
except KeyboardInterrupt:
pass
except BaseException as e:
log.debug(f"lines={lines}")
raise e
finally:
writer.close()
log.debug('write task over')
# ANCHOR_END: write
......@@ -103,47 +124,64 @@ def set_global_config():
# ANCHOR: monitor
def run_monitor_process():
def run_monitor_process(done_queue: Queue):
log = logging.getLogger("DataBaseMonitor")
conn = get_connection()
conn.execute("DROP DATABASE IF EXISTS test")
conn.execute("CREATE DATABASE test")
conn.execute("CREATE STABLE test.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) "
"TAGS (location BINARY(64), groupId INT)")
conn = None
try:
conn = get_connection()
def get_count():
res = conn.query("SELECT count(*) FROM test.meters")
rows = res.fetch_all()
return rows[0][0] if rows else 0
def get_count():
res = conn.query("SELECT count(*) FROM test.meters")
rows = res.fetch_all()
return rows[0][0] if rows else 0
last_count = 0
while True:
time.sleep(10)
count = get_count()
log.info(f"count={count} speed={(count - last_count) / 10}")
last_count = count
last_count = 0
while True:
try:
done = done_queue.get_nowait()
if done == _DONE_MESSAGE:
break
except Empty:
pass
time.sleep(10)
count = get_count()
log.info(f"count={count} speed={(count - last_count) / 10}")
last_count = count
finally:
conn.close()
# ANCHOR_END: monitor
# ANCHOR: main
def main():
def main(infinity):
set_global_config()
logging.info(f"READ_TASK_COUNT={READ_TASK_COUNT}, WRITE_TASK_COUNT={WRITE_TASK_COUNT}, "
f"TABLE_COUNT={TABLE_COUNT}, QUEUE_SIZE={QUEUE_SIZE}, MAX_BATCH_SIZE={MAX_BATCH_SIZE}")
monitor_process = Process(target=run_monitor_process)
conn = get_connection()
conn.execute("DROP DATABASE IF EXISTS test")
conn.execute("CREATE DATABASE IF NOT EXISTS test")
conn.execute("CREATE STABLE IF NOT EXISTS test.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) "
"TAGS (location BINARY(64), groupId INT)")
conn.close()
done_queue = Queue()
monitor_process = Process(target=run_monitor_process, args=(done_queue,))
monitor_process.start()
time.sleep(3) # waiting for database ready.
logging.debug(f"monitor task started with pid {monitor_process.pid}")
task_queues: List[Queue] = []
write_processes = []
read_processes = []
# create task queues
for i in range(WRITE_TASK_COUNT):
queue = Queue(max_size_bytes=QUEUE_SIZE)
queue = Queue()
task_queues.append(queue)
# create write processes
for i in range(WRITE_TASK_COUNT):
p = Process(target=run_write_task, args=(i, task_queues[i]))
p = Process(target=run_write_task, args=(i, task_queues[i], done_queue))
p.start()
logging.debug(f"WriteTask-{i} started with pid {p.pid}")
write_processes.append(p)
......@@ -151,13 +189,19 @@ def main():
# create read processes
for i in range(READ_TASK_COUNT):
queues = assign_queues(i, task_queues)
p = Process(target=run_read_task, args=(i, queues))
p = Process(target=run_read_task, args=(i, queues, infinity))
p.start()
logging.debug(f"ReadTask-{i} started with pid {p.pid}")
read_processes.append(p)
try:
monitor_process.join()
for p in read_processes:
p.join()
for p in write_processes:
p.join()
time.sleep(1)
return
except KeyboardInterrupt:
monitor_process.terminate()
[p.terminate() for p in read_processes]
......@@ -176,5 +220,6 @@ def assign_queues(read_task_id, task_queues):
if __name__ == '__main__':
main()
multiprocessing.set_start_method('spawn')
main(False)
# ANCHOR_END: main
......@@ -26,7 +26,8 @@ class Consumer(object):
'bath_consume': True,
'batch_size': 1000,
'async_model': True,
'workers': 10
'workers': 10,
'testing': False
}
LOCATIONS = ['California.SanFrancisco', 'California.LosAngles', 'California.SanDiego', 'California.SanJose',
......@@ -46,11 +47,12 @@ class Consumer(object):
def __init__(self, **configs):
self.config: dict = self.DEFAULT_CONFIGS
self.config.update(configs)
self.consumer = KafkaConsumer(
self.config.get('kafka_topic'), # topic
bootstrap_servers=self.config.get('kafka_brokers'),
group_id=self.config.get('kafka_group_id'),
)
if not self.config.get('testing'):
self.consumer = KafkaConsumer(
self.config.get('kafka_topic'), # topic
bootstrap_servers=self.config.get('kafka_brokers'),
group_id=self.config.get('kafka_group_id'),
)
self.taos = taos.connect(
host=self.config.get('taos_host'),
user=self.config.get('taos_user'),
......@@ -60,7 +62,7 @@ class Consumer(object):
)
if self.config.get('async_model'):
self.pool = ThreadPoolExecutor(max_workers=self.config.get('workers'))
self.tasks: list[Future] = []
self.tasks = []
# tags and table mapping # key: {location}_{groupId} value:
self.tag_table_mapping = {}
i = 0
......@@ -115,14 +117,14 @@ class Consumer(object):
if self.taos is not None:
self.taos.close()
def _run(self, f: Callable[[ConsumerRecord], bool]):
def _run(self, f):
for message in self.consumer:
if self.config.get('async_model'):
self.pool.submit(f(message))
else:
f(message)
def _run_batch(self, f: Callable[[list[list[ConsumerRecord]]], None]):
def _run_batch(self, f):
while True:
messages = self.consumer.poll(timeout_ms=500, max_records=self.config.get('batch_size'))
if messages:
......@@ -140,7 +142,7 @@ class Consumer(object):
logging.info('## insert sql %s', sql)
return self.taos.execute(sql=sql) == 1
def _to_taos_batch(self, messages: list[list[ConsumerRecord]]):
def _to_taos_batch(self, messages):
sql = self._build_sql_batch(messages=messages)
if len(sql) == 0: # decode error, skip
return
......@@ -162,7 +164,7 @@ class Consumer(object):
table_name = self._get_table_name(location=location, group_id=group_id)
return self.INSERT_PART_SQL.format(table_name, ts, current, voltage, phase)
def _build_sql_batch(self, messages: list[list[ConsumerRecord]]) -> str:
def _build_sql_batch(self, messages) -> str:
sql_list = []
for partition_messages in messages:
for message in partition_messages:
......@@ -186,7 +188,54 @@ def _get_location_and_group(key: str) -> (str, int):
return fields[0], fields[1]
def test_to_taos(consumer: Consumer):
msg = {
'location': 'California.SanFrancisco',
'groupId': 1,
'ts': '2022-12-06 15:13:38.643',
'current': 3.41,
'voltage': 105,
'phase': 0.02027,
}
record = ConsumerRecord(checksum=None, headers=None, offset=1, key=None, value=json.dumps(msg), partition=1,
topic='test', serialized_key_size=None, serialized_header_size=None,
serialized_value_size=None, timestamp=time.time(), timestamp_type=None)
assert consumer._to_taos(message=record)
def test_to_taos_batch(consumer: Consumer):
records = [
[
ConsumerRecord(checksum=None, headers=None, offset=1, key=None,
value=json.dumps({'location': 'California.SanFrancisco',
'groupId': 1,
'ts': '2022-12-06 15:13:38.643',
'current': 3.41,
'voltage': 105,
'phase': 0.02027, }),
partition=1, topic='test', serialized_key_size=None, serialized_header_size=None,
serialized_value_size=None, timestamp=time.time(), timestamp_type=None),
ConsumerRecord(checksum=None, headers=None, offset=1, key=None,
value=json.dumps({'location': 'California.LosAngles',
'groupId': 2,
'ts': '2022-12-06 15:13:39.643',
'current': 3.41,
'voltage': 102,
'phase': 0.02027, }),
partition=1, topic='test', serialized_key_size=None, serialized_header_size=None,
serialized_value_size=None, timestamp=time.time(), timestamp_type=None),
]
]
consumer._to_taos_batch(messages=records)
if __name__ == '__main__':
consumer = Consumer(async_model=True)
consumer = Consumer(async_model=True, testing=True)
# init env
consumer.init_env()
consumer.consume()
\ No newline at end of file
# consumer.consume()
# test build sql
# test build sql batch
test_to_taos(consumer)
test_to_taos_batch(consumer)
......@@ -10,13 +10,14 @@ class MockDataSource:
"9.4,118,0.141,California.SanFrancisco,4"
]
def __init__(self, tb_name_prefix, table_count):
def __init__(self, tb_name_prefix, table_count, infinity=True):
self.table_name_prefix = tb_name_prefix + "_"
self.table_count = table_count
self.max_rows = 10000000
self.current_ts = round(time.time() * 1000) - self.max_rows * 100
# [(tableId, tableName, values),]
self.data = self._init_data()
self.infinity = infinity
def _init_data(self):
lines = self.samples * (self.table_count // 5 + 1)
......@@ -28,14 +29,19 @@ class MockDataSource:
def __iter__(self):
self.row = 0
return self
if not self.infinity:
return iter(self._iter_data())
else:
return self
def __next__(self):
"""
next 1000 rows for each table.
return: {tableId:[row,...]}
"""
# generate 1000 timestamps
return self._iter_data()
def _iter_data(self):
ts = []
for _ in range(1000):
self.current_ts += 100
......@@ -47,3 +53,9 @@ class MockDataSource:
rows = [table_name + ',' + t + ',' + values for t in ts]
result.append((table_id, rows))
return result
if __name__ == '__main__':
datasource = MockDataSource('t', 10, False)
for data in datasource:
print(data)
......@@ -10,6 +10,7 @@ class SQLWriter:
self._tb_tags = {}
self._conn = get_connection_func()
self._max_sql_length = self.get_max_sql_length()
self._conn.execute("create database if not exists test")
self._conn.execute("USE test")
def get_max_sql_length(self):
......@@ -20,7 +21,7 @@ class SQLWriter:
return int(r[1])
return 1024 * 1024
def process_lines(self, lines: str):
def process_lines(self, lines: [str]):
"""
:param lines: [[tbName,ts,current,voltage,phase,location,groupId]]
"""
......@@ -60,6 +61,7 @@ class SQLWriter:
buf.append(q)
sql_len += len(q)
sql += " ".join(buf)
self.create_tables()
self.execute_sql(sql)
self._tb_values.clear()
......@@ -88,3 +90,22 @@ class SQLWriter:
except BaseException as e:
self.log.error("Execute SQL: %s", sql)
raise e
def close(self):
if self._conn:
self._conn.close()
if __name__ == '__main__':
def get_connection_func():
conn = taos.connect()
return conn
writer = SQLWriter(get_connection_func=get_connection_func)
writer.execute_sql(
"create stable if not exists meters (ts timestamp, current float, voltage int, phase float) "
"tags (location binary(64), groupId int)")
writer.execute_sql(
"INSERT INTO d21001 USING meters TAGS ('California.SanFrancisco', 2) "
"VALUES ('2021-07-13 14:06:32.272', 10.2, 219, 0.32)")
......@@ -19,8 +19,14 @@ def init_tmq_env(db, topic):
conn.execute("insert into tb3 values (now, 3, 3.0, 'tmq test')")
def cleanup(db, topic):
conn = taos.connect()
conn.execute("drop topic if exists {}".format(topic))
conn.execute("drop database if exists {}".format(db))
if __name__ == '__main__':
init_tmq_env("tmq_test", "tmq_test_topic") # init env
init_tmq_env("tmq_test", "tmq_test_topic") # init env
consumer = Consumer(
{
"group.id": "tg2",
......@@ -33,9 +39,9 @@ if __name__ == '__main__':
try:
while True:
res = consumer.poll(100)
res = consumer.poll(1)
if not res:
continue
break
err = res.error()
if err is not None:
raise err
......@@ -46,3 +52,4 @@ if __name__ == '__main__':
finally:
consumer.unsubscribe()
consumer.close()
cleanup("tmq_test", "tmq_test_topic")
......@@ -58,7 +58,7 @@ database_option: {
- WAL_FSYNC_PERIOD:当 WAL 参数设置为 2 时,落盘的周期。默认为 3000,单位毫秒。最小为 0,表示每次写入立即落盘;最大为 180000,即三分钟。
- MAXROWS:文件块中记录的最大条数,默认为 4096 条。
- MINROWS:文件块中记录的最小条数,默认为 100 条。
- KEEP:表示数据文件保存的天数,缺省值为 3650,取值范围 [1, 365000],且必须大于或等于 DURATION 参数值。数据库会自动删除保存时间超过 KEEP 值的数据。KEEP 可以使用加单位的表示形式,如 KEEP 100h、KEEP 10d 等,支持 m(分钟)、h(小时)和 d(天)三个单位。也可以不写单位,如 KEEP 50,此时默认单位为天。
- KEEP:表示数据文件保存的天数,缺省值为 3650,取值范围 [1, 365000],且必须大于或等于 DURATION 参数值。数据库会自动删除保存时间超过 KEEP 值的数据。KEEP 可以使用加单位的表示形式,如 KEEP 100h、KEEP 10d 等,支持 m(分钟)、h(小时)和 d(天)三个单位。也可以不写单位,如 KEEP 50,此时默认单位为天。企业版支持[多级存储](https://docs.taosdata.com/tdinternal/arch/#%E5%A4%9A%E7%BA%A7%E5%AD%98%E5%82%A8)功能, 因此, 可以设置多个保存时间(多个以英文逗号分隔,最多 3 个,满足 keep 0 <= keep 1 <= keep 2,如 KEEP 100h,100d,3650d); 社区版不支持多级存储功能(即使配置了多个保存时间, 也不会生效, KEEP 会取最大的保存时间)。
- PAGES:一个 VNODE 中元数据存储引擎的缓存页个数,默认为 256,最小 64。一个 VNODE 元数据存储占用 PAGESIZE \* PAGES,默认情况下为 1MB 内存。
- PAGESIZE:一个 VNODE 中元数据存储引擎的页大小,单位为 KB,默认为 4 KB。范围为 1 到 16384,即 1 KB 到 16 MB。
- PRECISION:数据库的时间戳精度。ms 表示毫秒,us 表示微秒,ns 表示纳秒,默认 ms 毫秒。
......
......@@ -306,7 +306,7 @@ SHOW [db_name.]VGROUPS;
## SHOW VNODES
```sql
SHOW VNODES [dnode_name];
SHOW VNODES {dnode_id | dnode_endpoint};
```
显示当前系统中所有 VNODE 或某个 DNODE 的 VNODE 的信息。
......@@ -323,6 +323,7 @@ charset 的有效值是 UTF-8。
| 适用范围 | 仅服务端适用 |
| 含义 | 数据文件目录,所有的数据文件都将写入该目录 |
| 缺省值 | /var/lib/taos |
| 补充说明 | [多级存储](https://docs.taosdata.com/tdinternal/arch/#%E5%A4%9A%E7%BA%A7%E5%AD%98%E5%82%A8) 功能需要与 [KEEP](https://docs.taosdata.com/taos-sql/database/#%E5%8F%82%E6%95%B0%E8%AF%B4%E6%98%8E) 参数配合使用 |
### tempDir
......
......@@ -146,9 +146,9 @@ extern void (*tColDataCalcSMA[])(SColData *pColData, int64_t *sum, int64_t *max,
int32_t tColDataAddValueByBind(SColData *pColData, TAOS_MULTI_BIND *pBind);
void tColDataSortMerge(SArray *colDataArr);
//for raw block
int32_t tColDataAddValueByDataBlock(SColData *pColData, int8_t type, int32_t bytes,
int32_t nRows, char* lengthOrbitmap, char *data);
// for raw block
int32_t tColDataAddValueByDataBlock(SColData *pColData, int8_t type, int32_t bytes, int32_t nRows, char *lengthOrbitmap,
char *data);
// for encode/decode
int32_t tPutColData(uint8_t *pBuf, SColData *pColData);
int32_t tGetColData(uint8_t *pBuf, SColData *pColData);
......@@ -261,7 +261,13 @@ struct STag {
// STSchema ================================
STSchema *tBuildTSchema(SSchema *aSchema, int32_t numOfCols, int32_t version);
void tDestroyTSchema(STSchema *pTSchema);
#define tDestroyTSchema(pTSchema) \
do { \
if (pTSchema) { \
taosMemoryFree(pTSchema); \
pTSchema = NULL; \
} \
} while (0)
#endif
......
......@@ -914,6 +914,7 @@ typedef struct {
int32_t numOfRetensions;
SArray* pRetensions;
int8_t schemaless;
int16_t sstTrigger;
} SDbCfgRsp;
int32_t tSerializeSDbCfgRsp(void* buf, int32_t bufLen, const SDbCfgRsp* pRsp);
......
......@@ -370,7 +370,8 @@ int32_t tDecodeSStreamTask(SDecoder* pDecoder, SStreamTask* pTask);
void tFreeSStreamTask(SStreamTask* pTask);
static FORCE_INLINE int32_t streamTaskInput(SStreamTask* pTask, SStreamQueueItem* pItem) {
if (pItem->type == STREAM_INPUT__DATA_SUBMIT) {
int8_t type = pItem->type;
if (type == STREAM_INPUT__DATA_SUBMIT) {
SStreamDataSubmit2* pSubmitClone = streamSubmitRefClone((SStreamDataSubmit2*)pItem);
if (pSubmitClone == NULL) {
qDebug("task %d %p submit enqueue failed since out of memory", pTask->taskId, pTask);
......@@ -382,19 +383,19 @@ static FORCE_INLINE int32_t streamTaskInput(SStreamTask* pTask, SStreamQueueItem
pSubmitClone->submit.msgStr, pSubmitClone->submit.msgLen, pSubmitClone->submit.ver);
taosWriteQitem(pTask->inputQueue->queue, pSubmitClone);
// qStreamInput(pTask->exec.executor, pSubmitClone);
} else if (pItem->type == STREAM_INPUT__DATA_BLOCK || pItem->type == STREAM_INPUT__DATA_RETRIEVE ||
pItem->type == STREAM_INPUT__REF_DATA_BLOCK) {
} else if (type == STREAM_INPUT__DATA_BLOCK || type == STREAM_INPUT__DATA_RETRIEVE ||
type == STREAM_INPUT__REF_DATA_BLOCK) {
taosWriteQitem(pTask->inputQueue->queue, pItem);
// qStreamInput(pTask->exec.executor, pItem);
} else if (pItem->type == STREAM_INPUT__CHECKPOINT) {
} else if (type == STREAM_INPUT__CHECKPOINT) {
taosWriteQitem(pTask->inputQueue->queue, pItem);
// qStreamInput(pTask->exec.executor, pItem);
} else if (pItem->type == STREAM_INPUT__GET_RES) {
} else if (type == STREAM_INPUT__GET_RES) {
taosWriteQitem(pTask->inputQueue->queue, pItem);
// qStreamInput(pTask->exec.executor, pItem);
}
if (pItem->type != STREAM_INPUT__GET_RES && pItem->type != STREAM_INPUT__CHECKPOINT && pTask->triggerParam != 0) {
if (type != STREAM_INPUT__GET_RES && type != STREAM_INPUT__CHECKPOINT && pTask->triggerParam != 0) {
atomic_val_compare_exchange_8(&pTask->triggerStatus, TASK_TRIGGER_STATUS__INACTIVE, TASK_TRIGGER_STATUS__ACTIVE);
}
......
......@@ -193,7 +193,7 @@ typedef struct SSyncLogStore {
SyncIndex (*syncLogLastIndex)(struct SSyncLogStore* pLogStore);
SyncTerm (*syncLogLastTerm)(struct SSyncLogStore* pLogStore);
int32_t (*syncLogAppendEntry)(struct SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry);
int32_t (*syncLogAppendEntry)(struct SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry, bool forcSync);
int32_t (*syncLogGetEntry)(struct SSyncLogStore* pLogStore, SyncIndex index, SSyncRaftEntry** ppEntry);
int32_t (*syncLogTruncate)(struct SSyncLogStore* pLogStore, SyncIndex fromIndex);
......
......@@ -201,6 +201,7 @@ int32_t walFetchHead(SWalReader *pRead, int64_t ver, SWalCkHead *pHead);
int32_t walFetchBody(SWalReader *pRead, SWalCkHead **ppHead);
int32_t walSkipFetchBody(SWalReader *pRead, const SWalCkHead *pHead);
SWalRef *walRefFirstVer(SWal *, SWalRef *);
SWalRef *walRefCommittedVer(SWal *);
SWalRef *walOpenRef(SWal *);
......
......@@ -746,7 +746,7 @@ function is_version_compatible() {
deb_erase() {
confirm=""
while [ "" == "${confirm}" ]; do
echo -e -n "${RED}Exist tdengine deb detected, do you want to remove it? [yes|no] ${NC}:"
echo -e -n "${RED}Existing TDengine deb is detected, do you want to remove it? [yes|no] ${NC}:"
read confirm
if [ "yes" == "$confirm" ]; then
${csudo}dpkg --remove tdengine ||:
......@@ -760,7 +760,7 @@ deb_erase() {
rpm_erase() {
confirm=""
while [ "" == "${confirm}" ]; do
echo -e -n "${RED}Exist tdengine rpm detected, do you want to remove it? [yes|no] ${NC}:"
echo -e -n "${RED}Existing TDengine rpm is detected, do you want to remove it? [yes|no] ${NC}:"
read confirm
if [ "yes" == "$confirm" ]; then
${csudo}rpm -e tdengine ||:
......
......@@ -400,45 +400,6 @@ void destroyRequest(SRequestObj *pRequest) {
removeRequest(pRequest->self);
}
void taosClientCrash(int signum, void *sigInfo, void *context) {
taosIgnSignal(SIGTERM);
taosIgnSignal(SIGHUP);
taosIgnSignal(SIGINT);
taosIgnSignal(SIGBREAK);
#if !defined(WINDOWS)
taosIgnSignal(SIGBUS);
#endif
taosIgnSignal(SIGABRT);
taosIgnSignal(SIGFPE);
taosIgnSignal(SIGSEGV);
char *pMsg = NULL;
const char *flags = "UTL FATAL ";
ELogLevel level = DEBUG_FATAL;
int32_t dflag = 255;
int64_t msgLen= -1;
if (tsEnableCrashReport) {
if (taosGenCrashJsonMsg(signum, &pMsg, lastClusterId, appInfo.startTime)) {
taosPrintLog(flags, level, dflag, "failed to generate crash json msg");
goto _return;
} else {
msgLen = strlen(pMsg);
}
}
_return:
taosLogCrashInfo("taos", pMsg, msgLen, signum, sigInfo);
#ifdef _TD_DARWIN_64
exit(signum);
#elif defined(WINDOWS)
exit(signum);
#endif
}
void crashReportThreadFuncUnexpectedStopped(void) { atomic_store_32(&clientStop, -1); }
static void *tscCrashReportThreadFp(void *param) {
......@@ -535,15 +496,26 @@ void tscStopCrashReport() {
}
}
static void tscSetSignalHandle() {
#if !defined(WINDOWS)
taosSetSignal(SIGBUS, taosClientCrash);
#endif
taosSetSignal(SIGABRT, taosClientCrash);
taosSetSignal(SIGFPE, taosClientCrash);
taosSetSignal(SIGSEGV, taosClientCrash);
void tscWriteCrashInfo(int signum, void *sigInfo, void *context) {
char *pMsg = NULL;
const char *flags = "UTL FATAL ";
ELogLevel level = DEBUG_FATAL;
int32_t dflag = 255;
int64_t msgLen= -1;
if (tsEnableCrashReport) {
if (taosGenCrashJsonMsg(signum, &pMsg, lastClusterId, appInfo.startTime)) {
taosPrintLog(flags, level, dflag, "failed to generate crash json msg");
} else {
msgLen = strlen(pMsg);
}
}
taosLogCrashInfo("taos", pMsg, msgLen, signum, sigInfo);
}
void taos_init_imp(void) {
// In the APIs of other program language, taos_cleanup is not available yet.
// So, to make sure taos_cleanup will be invoked to clean up the allocated resource to suppress the valgrind warning.
......@@ -567,8 +539,6 @@ void taos_init_imp(void) {
return;
}
tscSetSignalHandle();
initQueryModuleMsgHandle();
if (taosConvInit() != 0) {
......
......@@ -1239,7 +1239,7 @@ STscObj* taosConnectImpl(const char* user, const char* auth, const char* db, __t
int64_t transporterId = 0;
asyncSendMsgToServer(pTscObj->pAppInfo->pTransporter, &pTscObj->pAppInfo->mgmtEp.epSet, &transporterId, body);
tsem_wait(&pRequest->body.rspSem);
if (pRequest->code != TSDB_CODE_SUCCESS) {
const char* errorMsg =
......
......@@ -528,9 +528,8 @@ void taos_stop_query(TAOS_RES *res) {
SRequestObj *pRequest = (SRequestObj *)res;
pRequest->killed = true;
int32_t numOfFields = taos_num_fields(pRequest);
// It is not a query, no need to stop.
if (numOfFields == 0) {
if (NULL == pRequest->pQuery || QUERY_EXEC_MODE_SCHEDULE != pRequest->pQuery->execMode) {
tscDebug("request 0x%" PRIx64 " no need to be killed since not query", pRequest->requestId);
return;
}
......
......@@ -179,7 +179,7 @@ static char* buildAlterSTableJson(void* alterData, int32_t alterDataLen) {
}
string = cJSON_PrintUnformatted(json);
end:
end:
cJSON_Delete(json);
tFreeSMAltertbReq(&req);
return string;
......@@ -200,7 +200,7 @@ static char* processCreateStb(SMqMetaRsp* metaRsp) {
}
string = buildCreateTableJson(&req.schemaRow, &req.schemaTag, req.name, req.suid, TSDB_SUPER_TABLE);
_err:
_err:
tDecoderClear(&coder);
return string;
}
......@@ -220,7 +220,7 @@ static char* processAlterStb(SMqMetaRsp* metaRsp) {
}
string = buildAlterSTableJson(req.alterOriData, req.alterOriDataLen);
_err:
_err:
tDecoderClear(&coder);
return string;
}
......@@ -302,7 +302,7 @@ static void buildChildElement(cJSON* json, SVCreateTbReq* pCreateReq) {
cJSON_AddItemToArray(tags, tag);
}
end:
end:
cJSON_AddItemToObject(json, "tags", tags);
taosArrayDestroy(pTagVals);
}
......@@ -360,7 +360,7 @@ static char* processCreateTable(SMqMetaRsp* metaRsp) {
}
}
_exit:
_exit:
for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
pCreateReq = req.pReqs + iReq;
taosMemoryFreeClear(pCreateReq->comment);
......@@ -373,7 +373,7 @@ static char* processCreateTable(SMqMetaRsp* metaRsp) {
}
static char* processAutoCreateTable(STaosxRsp* rsp) {
if(rsp->createTableNum <= 0){
if (rsp->createTableNum <= 0) {
uError("WriteRaw:processAutoCreateTable rsp->createTableNum <= 0");
goto _exit;
}
......@@ -392,14 +392,14 @@ static char* processAutoCreateTable(STaosxRsp* rsp) {
goto _exit;
}
if(pCreateReq[iReq].type != TSDB_CHILD_TABLE){
if (pCreateReq[iReq].type != TSDB_CHILD_TABLE) {
uError("WriteRaw:processAutoCreateTable pCreateReq[iReq].type != TSDB_CHILD_TABLE");
goto _exit;
}
}
string = buildCreateCTableJson(pCreateReq, rsp->createTableNum);
_exit:
_exit:
for (int i = 0; i < rsp->createTableNum; i++) {
tDecoderClear(&decoder[i]);
taosMemoryFreeClear(pCreateReq[i].comment);
......@@ -500,7 +500,7 @@ static char* processAlterTable(SMqMetaRsp* metaRsp) {
char* buf = NULL;
if (vAlterTbReq.tagType == TSDB_DATA_TYPE_JSON) {
if(!tTagIsJson(vAlterTbReq.pTagVal)){
if (!tTagIsJson(vAlterTbReq.pTagVal)) {
uError("processAlterTable isJson false");
goto _exit;
}
......@@ -524,7 +524,7 @@ static char* processAlterTable(SMqMetaRsp* metaRsp) {
}
string = cJSON_PrintUnformatted(json);
_exit:
_exit:
cJSON_Delete(json);
tDecoderClear(&decoder);
return string;
......@@ -557,12 +557,12 @@ static char* processDropSTable(SMqMetaRsp* metaRsp) {
string = cJSON_PrintUnformatted(json);
_exit:
_exit:
cJSON_Delete(json);
tDecoderClear(&decoder);
return string;
}
static char* processDeleteTable(SMqMetaRsp* metaRsp){
static char* processDeleteTable(SMqMetaRsp* metaRsp) {
SDeleteRes req = {0};
SDecoder coder = {0};
int32_t code = TSDB_CODE_SUCCESS;
......@@ -596,7 +596,7 @@ static char* processDeleteTable(SMqMetaRsp* metaRsp){
string = cJSON_PrintUnformatted(json);
_exit:
_exit:
cJSON_Delete(json);
tDecoderClear(&coder);
return string;
......@@ -638,7 +638,7 @@ static char* processDropTable(SMqMetaRsp* metaRsp) {
string = cJSON_PrintUnformatted(json);
_exit:
_exit:
cJSON_Delete(json);
tDecoderClear(&decoder);
return string;
......@@ -726,7 +726,7 @@ static int32_t taosCreateStb(TAOS* taos, void* meta, int32_t metaLen) {
code = pRequest->code;
taosMemoryFree(pCmdMsg.pMsg);
end:
end:
destroyRequest(pRequest);
tFreeSMCreateStbReq(&pReq);
tDecoderClear(&coder);
......@@ -796,7 +796,7 @@ static int32_t taosDropStb(TAOS* taos, void* meta, int32_t metaLen) {
code = pRequest->code;
taosMemoryFree(pCmdMsg.pMsg);
end:
end:
destroyRequest(pRequest);
tDecoderClear(&coder);
return code;
......@@ -857,9 +857,9 @@ static int32_t taosCreateTable(TAOS* taos, void* meta, int32_t metaLen) {
taosHashSetFreeFp(pVgroupHashmap, destroyCreateTbReqBatch);
SRequestConnInfo conn = {.pTrans = pTscObj->pAppInfo->pTransporter,
.requestId = pRequest->requestId,
.requestObjRefId = pRequest->self,
.mgmtEps = getEpSet_s(&pTscObj->pAppInfo->mgmtEp)};
.requestId = pRequest->requestId,
.requestObjRefId = pRequest->self,
.mgmtEps = getEpSet_s(&pTscObj->pAppInfo->mgmtEp)};
pRequest->tableList = taosArrayInit(req.nReqs, sizeof(SName));
// loop to create table
......@@ -939,7 +939,7 @@ static int32_t taosCreateTable(TAOS* taos, void* meta, int32_t metaLen) {
code = pRequest->code;
end:
end:
for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
pCreateReq = req.pReqs + iReq;
taosMemoryFreeClear(pCreateReq->comment);
......@@ -1009,9 +1009,9 @@ static int32_t taosDropTable(TAOS* taos, void* meta, int32_t metaLen) {
taosHashSetFreeFp(pVgroupHashmap, destroyDropTbReqBatch);
SRequestConnInfo conn = {.pTrans = pTscObj->pAppInfo->pTransporter,
.requestId = pRequest->requestId,
.requestObjRefId = pRequest->self,
.mgmtEps = getEpSet_s(&pTscObj->pAppInfo->mgmtEp)};
.requestId = pRequest->requestId,
.requestObjRefId = pRequest->self,
.mgmtEps = getEpSet_s(&pTscObj->pAppInfo->mgmtEp)};
pRequest->tableList = taosArrayInit(req.nReqs, sizeof(SName));
// loop to create table
for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
......@@ -1063,7 +1063,7 @@ static int32_t taosDropTable(TAOS* taos, void* meta, int32_t metaLen) {
}
code = pRequest->code;
end:
end:
taosHashCleanup(pVgroupHashmap);
destroyRequest(pRequest);
tDecoderClear(&coder);
......@@ -1131,7 +1131,7 @@ static int32_t taosDeleteData(TAOS* taos, void* meta, int32_t metaLen) {
}
taos_free_result(res);
end:
end:
tDecoderClear(&coder);
return code;
}
......@@ -1178,9 +1178,9 @@ static int32_t taosAlterTable(TAOS* taos, void* meta, int32_t metaLen) {
}
SRequestConnInfo conn = {.pTrans = pTscObj->pAppInfo->pTransporter,
.requestId = pRequest->requestId,
.requestObjRefId = pRequest->self,
.mgmtEps = getEpSet_s(&pTscObj->pAppInfo->mgmtEp)};
.requestId = pRequest->requestId,
.requestObjRefId = pRequest->self,
.mgmtEps = getEpSet_s(&pTscObj->pAppInfo->mgmtEp)};
SVgroupInfo pInfo = {0};
SName pName = {0};
......@@ -1239,7 +1239,7 @@ static int32_t taosAlterTable(TAOS* taos, void* meta, int32_t metaLen) {
code = handleAlterTbExecRes(pRes->res, pCatalog);
}
}
end:
end:
taosArrayDestroy(pArray);
if (pVgData) taosMemoryFreeClear(pVgData->pData);
taosMemoryFreeClear(pVgData);
......@@ -1402,7 +1402,7 @@ int taos_write_raw_block(TAOS* taos, int rows, char* pData, const char* tbname)
launchQueryImpl(pRequest, pQuery, true, NULL);
code = pRequest->code;
end:
end:
taosMemoryFreeClear(pTableMeta);
qDestroyQuery(pQuery);
destroyRequest(pRequest);
......@@ -1532,7 +1532,7 @@ static int32_t tmqWriteRawDataImpl(TAOS* taos, void* data, int32_t dataLen) {
launchQueryImpl(pRequest, pQuery, true, NULL);
code = pRequest->code;
end:
end:
tDeleteSMqDataRsp(&rspObj.rsp);
tDecoderClear(&decoder);
qDestroyQuery(pQuery);
......@@ -1631,7 +1631,7 @@ static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen)
goto end;
}
if(pCreateReq.type != TSDB_CHILD_TABLE){
if (pCreateReq.type != TSDB_CHILD_TABLE) {
uError("WriteRaw:pCreateReq.type != TSDB_CHILD_TABLE. table name: %s", tbName);
code = TSDB_CODE_TSC_INVALID_VALUE;
goto end;
......
......@@ -1532,10 +1532,6 @@ STSchema *tBuildTSchema(SSchema *aSchema, int32_t numOfCols, int32_t version) {
return pTSchema;
}
void tDestroyTSchema(STSchema *pTSchema) {
if (pTSchema) taosMemoryFree(pTSchema);
}
// SColData ========================================
void tColDataDestroy(void *ph) {
SColData *pColData = (SColData *)ph;
......
......@@ -2821,8 +2821,8 @@ int32_t tSerializeSDbCfgRsp(void *buf, int32_t bufLen, const SDbCfgRsp *pRsp) {
if (tEncodeI8(&encoder, pRetension->keepUnit) < 0) return -1;
}
if (tEncodeI8(&encoder, pRsp->schemaless) < 0) return -1;
if (tEncodeI16(&encoder, pRsp->sstTrigger) < 0) return -1;
tEndEncode(&encoder);
int32_t tlen = encoder.pos;
tEncoderClear(&encoder);
return tlen;
......@@ -2873,6 +2873,7 @@ int32_t tDeserializeSDbCfgRsp(void *buf, int32_t bufLen, SDbCfgRsp *pRsp) {
}
}
if (tDecodeI8(&decoder, &pRsp->schemaless) < 0) return -1;
if (tDecodeI16(&decoder, &pRsp->sstTrigger) < 0) return -1;
tEndDecode(&decoder);
tDecoderClear(&decoder);
......
......@@ -889,7 +889,7 @@ static int32_t mndProcessGetDbCfgReq(SRpcMsg *pReq) {
cfgRsp.numOfRetensions = pDb->cfg.numOfRetensions;
cfgRsp.pRetensions = pDb->cfg.pRetensions;
cfgRsp.schemaless = pDb->cfg.schemaless;
cfgRsp.sstTrigger = pDb->cfg.sstTrigger;
int32_t contLen = tSerializeSDbCfgRsp(NULL, 0, &cfgRsp);
void *pRsp = rpcMallocCont(contLen);
if (pRsp == NULL) {
......
......@@ -153,6 +153,8 @@ typedef struct SMTbCursor SMTbCursor;
SMTbCursor *metaOpenTbCursor(SMeta *pMeta);
void metaCloseTbCursor(SMTbCursor *pTbCur);
int32_t metaTbCursorNext(SMTbCursor *pTbCur, ETableType jumpTableType);
int32_t metaTbCursorPrev(SMTbCursor *pTbCur);
#endif
// tsdb
......
......@@ -206,6 +206,7 @@ int32_t tsdbCmprColData(SColData *pColData, int8_t cmprAlg, SBlockCol *pBlockCol
uint8_t **ppBuf);
int32_t tsdbDecmprColData(uint8_t *pIn, SBlockCol *pBlockCol, int8_t cmprAlg, int32_t nVal, SColData *pColData,
uint8_t **ppBuf);
int32_t tRowInfoCmprFn(const void *p1, const void *p2);
// tsdbMemTable ==============================================================================================
// SMemTable
int32_t tsdbMemTableCreate(STsdb *pTsdb, SMemTable **ppMemTable);
......
......@@ -252,7 +252,7 @@ int32_t tsdbSnapReaderClose(STsdbSnapReader** ppReader);
int32_t tsdbSnapRead(STsdbSnapReader* pReader, uint8_t** ppData);
// STsdbSnapWriter ========================================
int32_t tsdbSnapWriterOpen(STsdb* pTsdb, int64_t sver, int64_t ever, STsdbSnapWriter** ppWriter);
int32_t tsdbSnapWrite(STsdbSnapWriter* pWriter, uint8_t* pData, uint32_t nData);
int32_t tsdbSnapWrite(STsdbSnapWriter* pWriter, SSnapDataHdr* pHdr);
int32_t tsdbSnapWriterPrepareClose(STsdbSnapWriter* pWriter);
int32_t tsdbSnapWriterClose(STsdbSnapWriter** ppWriter, int8_t rollback);
// STqSnapshotReader ==
......
......@@ -310,7 +310,7 @@ void metaCloseTbCursor(SMTbCursor *pTbCur) {
}
}
int metaTbCursorNext(SMTbCursor *pTbCur, ETableType jumpTableType) {
int32_t metaTbCursorNext(SMTbCursor *pTbCur, ETableType jumpTableType) {
int ret;
void *pBuf;
STbCfg tbCfg;
......@@ -334,6 +334,30 @@ int metaTbCursorNext(SMTbCursor *pTbCur, ETableType jumpTableType) {
return 0;
}
int32_t metaTbCursorPrev(SMTbCursor *pTbCur) {
int ret;
void *pBuf;
STbCfg tbCfg;
for (;;) {
ret = tdbTbcPrev(pTbCur->pDbc, &pTbCur->pKey, &pTbCur->kLen, &pTbCur->pVal, &pTbCur->vLen);
if (ret < 0) {
return -1;
}
tDecoderClear(&pTbCur->mr.coder);
metaGetTableEntryByVersion(&pTbCur->mr, ((SUidIdxVal *)pTbCur->pVal)[0].version, *(tb_uid_t *)pTbCur->pKey);
if (pTbCur->mr.me.type == TSDB_SUPER_TABLE) {
continue;
}
break;
}
return 0;
}
SSchemaWrapper *metaGetTableSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver, int lock) {
void *pData = NULL;
int nData = 0;
......@@ -682,9 +706,8 @@ int32_t metaGetTbTSchemaEx(SMeta *pMeta, tb_uid_t suid, tb_uid_t uid, int32_t sv
}
}
if (sver <= 0) {
metaError("meta/query: incorrect sver: %" PRId32 ".", sver);
code = TSDB_CODE_FAILED;
if (ASSERTS(sver > 0, __FILE__, __LINE__, "failed to get table schema version: %d", sver)) {
code = TSDB_CODE_NOT_FOUND;
goto _exit;
}
......
......@@ -446,10 +446,10 @@ int32_t rsmaSnapWrite(SRSmaSnapWriter* pWriter, uint8_t* pData, uint32_t nData)
// rsma1/rsma2
if (pHdr->type == SNAP_DATA_RSMA1) {
pHdr->type = SNAP_DATA_TSDB;
code = tsdbSnapWrite(pWriter->pDataWriter[0], pData, nData);
code = tsdbSnapWrite(pWriter->pDataWriter[0], pHdr);
} else if (pHdr->type == SNAP_DATA_RSMA2) {
pHdr->type = SNAP_DATA_TSDB;
code = tsdbSnapWrite(pWriter->pDataWriter[1], pData, nData);
code = tsdbSnapWrite(pWriter->pDataWriter[1], pHdr);
} else if (pHdr->type == SNAP_DATA_QTASK) {
code = rsmaSnapWriteQTaskInfo(pWriter, pData, nData);
} else {
......
......@@ -520,7 +520,12 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
tqOffsetResetToData(&fetchOffsetNew, 0, 0);
}
} else {
tqOffsetResetToLog(&fetchOffsetNew, walGetFirstVer(pTq->pVnode->pWal));
pHandle->pRef = walRefFirstVer(pTq->pVnode->pWal, pHandle->pRef);
if (pHandle->pRef == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
tqOffsetResetToLog(&fetchOffsetNew, pHandle->pRef->refVer - 1);
}
} else if (reqOffset.type == TMQ_OFFSET__RESET_LATEST) {
if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
......
......@@ -56,7 +56,7 @@ int32_t tqOffsetSnapRead(STqOffsetReader* pReader, uint8_t** ppData) {
TdFilePtr pFile = taosOpenFile(fname, TD_FILE_READ);
if (pFile == NULL) {
taosMemoryFree(fname);
return -1;
return 0;
}
int64_t sz = 0;
......
......@@ -268,7 +268,10 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32
}
taosThreadMutexLock(&pr->readerMutex);
tsdbTakeReadSnap((STsdbReader*)pr, tsdbCacheQueryReseek, &pr->pReadSnap);
code = tsdbTakeReadSnap((STsdbReader*)pr, tsdbCacheQueryReseek, &pr->pReadSnap);
if (code != TSDB_CODE_SUCCESS) {
goto _end;
}
pr->pDataFReader = NULL;
pr->pDataFReaderLast = NULL;
......@@ -279,7 +282,7 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32
code = doExtractCacheRow(pr, lruCache, pKeyInfo->uid, &pRow, &h);
if (code != TSDB_CODE_SUCCESS) {
return code;
goto _end;
}
if (h == NULL) {
......@@ -352,7 +355,7 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32
STableKeyInfo* pKeyInfo = &pr->pTableList[i];
code = doExtractCacheRow(pr, lruCache, pKeyInfo->uid, &pRow, &h);
if (code != TSDB_CODE_SUCCESS) {
return code;
goto _end;
}
if (h == NULL) {
......
......@@ -458,9 +458,8 @@ static int32_t tsdbMergeFileSet(STsdb *pTsdb, SDFileSet *pSetOld, SDFileSet *pSe
taosMemoryFree(pHeadF);
}
} else {
nRef = pHeadF->nRef;
*pHeadF = *pSetNew->pHeadF;
pHeadF->nRef = nRef;
ASSERT(pHeadF->offset == pSetNew->pHeadF->offset);
ASSERT(pHeadF->size == pSetNew->pHeadF->size);
}
// data
......@@ -481,9 +480,7 @@ static int32_t tsdbMergeFileSet(STsdb *pTsdb, SDFileSet *pSetOld, SDFileSet *pSe
taosMemoryFree(pDataF);
}
} else {
nRef = pDataF->nRef;
*pDataF = *pSetNew->pDataF;
pDataF->nRef = nRef;
pDataF->size = pSetNew->pDataF->size;
}
// sma
......@@ -504,9 +501,7 @@ static int32_t tsdbMergeFileSet(STsdb *pTsdb, SDFileSet *pSetOld, SDFileSet *pSe
taosMemoryFree(pSmaF);
}
} else {
nRef = pSmaF->nRef;
*pSmaF = *pSetNew->pSmaF;
pSmaF->nRef = nRef;
pSmaF->size = pSetNew->pSmaF->size;
}
// stt
......
......@@ -573,6 +573,68 @@ static SSDataBlock* createResBlock(SQueryTableDataCond* pCond, int32_t capacity)
return pResBlock;
}
static int32_t tsdbInitReaderLock(STsdbReader* pReader) {
int32_t code = -1;
qTrace("tsdb/read: %p, pre-init read mutex: %p, code: %d", pReader, &pReader->readerMutex, code);
code = taosThreadMutexInit(&pReader->readerMutex, NULL);
qTrace("tsdb/read: %p, post-init read mutex: %p, code: %d", pReader, &pReader->readerMutex, code);
return code;
}
static int32_t tsdbUninitReaderLock(STsdbReader* pReader) {
int32_t code = -1;
qTrace("tsdb/read: %p, pre-uninit read mutex: %p, code: %d", pReader, &pReader->readerMutex, code);
code = taosThreadMutexDestroy(&pReader->readerMutex);
qTrace("tsdb/read: %p, post-uninit read mutex: %p, code: %d", pReader, &pReader->readerMutex, code);
return code;
}
static int32_t tsdbAcquireReader(STsdbReader* pReader) {
int32_t code = -1;
qTrace("tsdb/read: %p, pre-take read mutex: %p, code: %d", pReader, &pReader->readerMutex, code);
code = taosThreadMutexLock(&pReader->readerMutex);
qTrace("tsdb/read: %p, post-take read mutex: %p, code: %d", pReader, &pReader->readerMutex, code);
return code;
}
static int32_t tsdbTryAcquireReader(STsdbReader* pReader) {
int32_t code = -1;
qTrace("tsdb/read: %p, pre-trytake read mutex: %p, code: %d", pReader, &pReader->readerMutex, code);
code = taosThreadMutexTryLock(&pReader->readerMutex);
qTrace("tsdb/read: %p, post-trytake read mutex: %p, code: %d", pReader, &pReader->readerMutex, code);
return code;
}
static int32_t tsdbReleaseReader(STsdbReader* pReader) {
int32_t code = -1;
qTrace("tsdb/read: %p, pre-untake read mutex: %p, code: %d", pReader, &pReader->readerMutex, code);
code = taosThreadMutexUnlock(&pReader->readerMutex);
qTrace("tsdb/read: %p, post-untake read mutex: %p, code: %d", pReader, &pReader->readerMutex, code);
return code;
}
void tsdbReleaseDataBlock(STsdbReader* pReader) {
SReaderStatus* pStatus = &pReader->status;
if (!pStatus->composedDataBlock) {
tsdbReleaseReader(pReader);
}
}
static int32_t tsdbReaderCreate(SVnode* pVnode, SQueryTableDataCond* pCond, STsdbReader** ppReader, int32_t capacity,
SSDataBlock* pResBlock, const char* idstr) {
int32_t code = 0;
......@@ -636,7 +698,7 @@ static int32_t tsdbReaderCreate(SVnode* pVnode, SQueryTableDataCond* pCond, STsd
setColumnIdSlotList(&pReader->suppInfo, pCond->colList, pCond->pSlotList, pCond->numOfCols);
taosThreadMutexInit(&pReader->readerMutex, NULL);
tsdbInitReaderLock(pReader);
*ppReader = pReader;
return code;
......@@ -1776,12 +1838,15 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo*
}
if (minKey == k.ts) {
STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid);
if (pSchema == NULL) {
return terrno;
}
if (init) {
tsdbRowMerge(&merge, pRow);
tsdbRowMergerAdd(&merge, pRow, pSchema);
} else {
init = true;
STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid);
int32_t code = tsdbRowMergerInit(&merge, pRow, pSchema);
int32_t code = tsdbRowMergerInit(&merge, pRow, pSchema);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
......@@ -2882,7 +2947,7 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) {
if (pResBlock->info.rows > 0) {
tsdbDebug("%p uid:%" PRIu64 ", composed data block created, brange:%" PRIu64 "-%" PRIu64
" rows:%d, elapsed time:%.2f ms %s",
" rows:%d, elapsed time:%.2f ms %s",
pReader, pResBlock->info.id.uid, pResBlock->info.window.skey, pResBlock->info.window.ekey,
pResBlock->info.rows, el, pReader->idStr);
}
......@@ -2932,7 +2997,7 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) {
if (pResBlock->info.rows > 0) {
tsdbDebug("%p uid:%" PRIu64 ", composed data block created, brange:%" PRIu64 "-%" PRIu64
" rows:%d, elapsed time:%.2f ms %s",
" rows:%d, elapsed time:%.2f ms %s",
pReader, pResBlock->info.id.uid, pResBlock->info.window.skey, pResBlock->info.window.ekey,
pResBlock->info.rows, el, pReader->idStr);
}
......@@ -4013,8 +4078,9 @@ void tsdbReaderClose(STsdbReader* pReader) {
qTrace("tsdb/reader: %p, untake snapshot", pReader);
tsdbUntakeReadSnap(pReader, pReader->pReadSnap, true);
pReader->pReadSnap = NULL;
taosThreadMutexDestroy(&pReader->readerMutex);
tsdbUninitReaderLock(pReader);
taosMemoryFree(pReader->status.uidCheckInfo.tableUidList);
SIOCostSummary* pCost = &pReader->cost;
......@@ -4096,6 +4162,28 @@ int32_t tsdbReaderSuspend(STsdbReader* pReader) {
// pInfo->lastKey = ts;
}
} else {
// resetDataBlockScanInfo excluding lastKey
STableBlockScanInfo** p = NULL;
while ((p = taosHashIterate(pStatus->pTableMap, p)) != NULL) {
STableBlockScanInfo* pInfo = *(STableBlockScanInfo**)p;
pInfo->iterInit = false;
pInfo->iter.hasVal = false;
pInfo->iiter.hasVal = false;
if (pInfo->iter.iter != NULL) {
pInfo->iter.iter = tsdbTbDataIterDestroy(pInfo->iter.iter);
}
if (pInfo->iiter.iter != NULL) {
pInfo->iiter.iter = tsdbTbDataIterDestroy(pInfo->iiter.iter);
}
pInfo->delSkyline = taosArrayDestroy(pInfo->delSkyline);
// pInfo->lastKey = ts;
}
pBlockScanInfo = pStatus->pTableIter == NULL ? NULL : *pStatus->pTableIter;
if (pBlockScanInfo) {
// save lastKey to restore memory iterator
......@@ -4104,7 +4192,8 @@ int32_t tsdbReaderSuspend(STsdbReader* pReader) {
// reset current current table's data block scan info,
pBlockScanInfo->iterInit = false;
// pBlockScanInfo->iiter.hasVal = false;
pBlockScanInfo->iter.hasVal = false;
pBlockScanInfo->iiter.hasVal = false;
if (pBlockScanInfo->iter.iter != NULL) {
pBlockScanInfo->iter.iter = tsdbTbDataIterDestroy(pBlockScanInfo->iter.iter);
}
......@@ -4138,16 +4227,16 @@ static int32_t tsdbSetQueryReseek(void* pQHandle) {
int32_t code = 0;
STsdbReader* pReader = pQHandle;
code = taosThreadMutexTryLock(&pReader->readerMutex);
code = tsdbTryAcquireReader(pReader);
if (code == 0) {
if (pReader->suspended) {
taosThreadMutexUnlock(&pReader->readerMutex);
tsdbReleaseReader(pReader);
return code;
}
tsdbReaderSuspend(pReader);
taosThreadMutexUnlock(&pReader->readerMutex);
tsdbReleaseReader(pReader);
return code;
} else if (code == EBUSY) {
......@@ -4248,8 +4337,9 @@ bool tsdbNextDataBlock(STsdbReader* pReader) {
SReaderStatus* pStatus = &pReader->status;
qTrace("tsdb/read: %p, take read mutex", pReader);
taosThreadMutexLock(&pReader->readerMutex);
int32_t code = tsdbAcquireReader(pReader);
qTrace("tsdb/read: %p, take read mutex, code: %d", pReader, code);
if (pReader->suspended) {
tsdbReaderResume(pReader);
}
......@@ -4261,7 +4351,7 @@ bool tsdbNextDataBlock(STsdbReader* pReader) {
pStatus = &pReader->innerReader[0]->status;
if (pStatus->composedDataBlock) {
qTrace("tsdb/read: %p, unlock read mutex", pReader);
taosThreadMutexUnlock(&pReader->readerMutex);
tsdbReleaseReader(pReader);
}
return ret;
......@@ -4284,7 +4374,7 @@ bool tsdbNextDataBlock(STsdbReader* pReader) {
if (ret) {
if (pStatus->composedDataBlock) {
qTrace("tsdb/read: %p, unlock read mutex", pReader);
taosThreadMutexUnlock(&pReader->readerMutex);
tsdbReleaseReader(pReader);
}
return ret;
......@@ -4304,7 +4394,7 @@ bool tsdbNextDataBlock(STsdbReader* pReader) {
pStatus = &pReader->innerReader[1]->status;
if (pStatus->composedDataBlock) {
qTrace("tsdb/read: %p, unlock read mutex", pReader);
taosThreadMutexUnlock(&pReader->readerMutex);
tsdbReleaseReader(pReader);
}
return ret1;
......@@ -4312,7 +4402,7 @@ bool tsdbNextDataBlock(STsdbReader* pReader) {
}
qTrace("tsdb/read: %p, unlock read mutex", pReader);
taosThreadMutexUnlock(&pReader->readerMutex);
tsdbReleaseReader(pReader);
return false;
}
......@@ -4471,13 +4561,6 @@ static SSDataBlock* doRetrieveDataBlock(STsdbReader* pReader) {
return pReader->pResBlock;
}
void tsdbReleaseDataBlock(STsdbReader* pReader) {
// SReaderStatus* pStatus = &pReader->status;
// if (!pStatus->composedDataBlock) {
taosThreadMutexUnlock(&pReader->readerMutex);
//}
}
SSDataBlock* tsdbRetrieveDataBlock(STsdbReader* pReader, SArray* pIdList) {
STsdbReader* pTReader = pReader;
if (pReader->type == TIMEWINDOW_RANGE_EXTERNAL) {
......@@ -4496,7 +4579,7 @@ SSDataBlock* tsdbRetrieveDataBlock(STsdbReader* pReader, SArray* pIdList) {
SSDataBlock* ret = doRetrieveDataBlock(pTReader);
qTrace("tsdb/read-retrieve: %p, unlock read mutex", pReader);
taosThreadMutexUnlock(&pReader->readerMutex);
tsdbReleaseReader(pReader);
return ret;
}
......@@ -4505,7 +4588,7 @@ int32_t tsdbReaderReset(STsdbReader* pReader, SQueryTableDataCond* pCond) {
SReaderStatus* pStatus = &pReader->status;
qTrace("tsdb/reader-reset: %p, take read mutex", pReader);
taosThreadMutexLock(&pReader->readerMutex);
tsdbAcquireReader(pReader);
if (pReader->suspended) {
tsdbReaderResume(pReader);
......@@ -4514,7 +4597,7 @@ int32_t tsdbReaderReset(STsdbReader* pReader, SQueryTableDataCond* pCond) {
if (isEmptyQueryTimeWindow(&pReader->window) || pReader->pReadSnap == NULL) {
tsdbDebug("tsdb reader reset return %p", pReader->pReadSnap);
taosThreadMutexUnlock(&pReader->readerMutex);
tsdbReleaseReader(pReader);
return TSDB_CODE_SUCCESS;
}
......@@ -4552,7 +4635,7 @@ int32_t tsdbReaderReset(STsdbReader* pReader, SQueryTableDataCond* pCond) {
tsdbError("%p reset reader failed, numOfTables:%d, query range:%" PRId64 " - %" PRId64 " in query %s", pReader,
numOfTables, pReader->window.skey, pReader->window.ekey, pReader->idStr);
taosThreadMutexUnlock(&pReader->readerMutex);
tsdbReleaseReader(pReader);
return code;
}
......@@ -4563,7 +4646,7 @@ int32_t tsdbReaderReset(STsdbReader* pReader, SQueryTableDataCond* pCond) {
pReader, pReader->suid, numOfTables, pCond->twindows.skey, pReader->window.skey, pReader->window.ekey,
pReader->idStr);
taosThreadMutexUnlock(&pReader->readerMutex);
tsdbReleaseReader(pReader);
return code;
}
......@@ -4648,7 +4731,7 @@ int64_t tsdbGetNumOfRowsInMemTable(STsdbReader* pReader) {
int64_t rows = 0;
SReaderStatus* pStatus = &pReader->status;
taosThreadMutexLock(&pReader->readerMutex);
tsdbAcquireReader(pReader);
if (pReader->suspended) {
tsdbReaderResume(pReader);
}
......@@ -4678,7 +4761,7 @@ int64_t tsdbGetNumOfRowsInMemTable(STsdbReader* pReader) {
pStatus->pTableIter = taosHashIterate(pStatus->pTableMap, pStatus->pTableIter);
}
taosThreadMutexUnlock(&pReader->readerMutex);
tsdbReleaseReader(pReader);
return rows;
}
......
......@@ -732,6 +732,7 @@ int32_t tsdbRowMergerAdd(SRowMerger *pMerger, TSDBROW *pRow, STSchema *pTSchema)
tsdbRowGetColVal(pRow, pTSchema, jCol++, pColVal);
if (key.version > pMerger->version) {
#if 0
if (!COL_VAL_IS_NONE(pColVal)) {
if ((!COL_VAL_IS_NULL(pColVal)) && IS_VAR_DATA_TYPE(pColVal->type)) {
SColVal *tColVal = taosArrayGet(pMerger->pArray, iCol);
......@@ -747,6 +748,28 @@ int32_t tsdbRowMergerAdd(SRowMerger *pMerger, TSDBROW *pRow, STSchema *pTSchema)
taosArraySet(pMerger->pArray, iCol, pColVal);
}
}
#endif
if (!COL_VAL_IS_NONE(pColVal)) {
if (IS_VAR_DATA_TYPE(pColVal->type)) {
SColVal *pTColVal = taosArrayGet(pMerger->pArray, iCol);
if (!COL_VAL_IS_NULL(pColVal)) {
code = tRealloc(&pTColVal->value.pData, pColVal->value.nData);
if (code) return code;
pTColVal->value.nData = pColVal->value.nData;
if (pTColVal->value.nData) {
memcpy(pTColVal->value.pData, pColVal->value.pData, pTColVal->value.nData);
}
pTColVal->flag = 0;
} else {
tFree(pTColVal->value.pData);
pTColVal->value.pData = NULL;
taosArraySet(pMerger->pArray, iCol, pColVal);
}
} else {
taosArraySet(pMerger->pArray, iCol, pColVal);
}
}
} else if (key.version < pMerger->version) {
SColVal *tColVal = (SColVal *)taosArrayGet(pMerger->pArray, iCol);
if (COL_VAL_IS_NONE(tColVal) && !COL_VAL_IS_NONE(pColVal)) {
......@@ -1110,6 +1133,7 @@ _exit:
void tBlockDataReset(SBlockData *pBlockData) {
pBlockData->suid = 0;
pBlockData->uid = 0;
pBlockData->nRow = 0;
}
void tBlockDataClear(SBlockData *pBlockData) {
......
......@@ -455,7 +455,7 @@ int32_t vnodeSnapWrite(SVSnapWriter *pWriter, uint8_t *pData, uint32_t nData) {
if (code) goto _err;
}
code = tsdbSnapWrite(pWriter->pTsdbSnapWriter, pData, nData);
code = tsdbSnapWrite(pWriter->pTsdbSnapWriter, pHdr);
if (code) goto _err;
} break;
case SNAP_DATA_TQ_HANDLE: {
......
......@@ -805,6 +805,7 @@ int32_t ctgMakeVgArray(SDBVgInfo* dbInfo);
int32_t ctgAcquireVgMetaFromCache(SCatalog *pCtg, const char *dbFName, const char *tbName, SCtgDBCache **pDb, SCtgTbCache **pTb);
int32_t ctgCopyTbMeta(SCatalog *pCtg, SCtgTbMetaCtx *ctx, SCtgDBCache **pDb, SCtgTbCache **pTb, STableMeta **pTableMeta, char* dbFName);
void ctgReleaseVgMetaToCache(SCatalog *pCtg, SCtgDBCache *dbCache, SCtgTbCache *pCache);
void ctgReleaseTbMetaToCache(SCatalog *pCtg, SCtgDBCache *dbCache, SCtgTbCache *pCache);
extern SCatalogMgmt gCtgMgmt;
extern SCtgDebug gCTGDebug;
......
......@@ -598,10 +598,16 @@ int32_t ctgGetCachedTbVgMeta(SCatalog* pCtg, const SName* pTableName, SVgroupInf
CTG_ERR_JRET(ctgGetVgInfoFromHashValue(pCtg, dbCache->vgCache.vgInfo, pTableName, pVgroup));
ctgRUnlockVgInfo(dbCache);
SCtgTbMetaCtx ctx = {0};
ctx.pName = (SName*)pTableName;
ctx.flag = CTG_FLAG_UNKNOWN_STB;
CTG_ERR_JRET(ctgCopyTbMeta(pCtg, &ctx, &dbCache, &tbCache, pTableMeta, db));
code = ctgCopyTbMeta(pCtg, &ctx, &dbCache, &tbCache, pTableMeta, db);
ctgReleaseTbMetaToCache(pCtg, dbCache, tbCache);
CTG_RET(code);
_return:
......
......@@ -999,6 +999,7 @@ int32_t ctgHandleGetTbMetaRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBuf
CTG_ERR_JRET(ctgGetTbMetaFromVnode(pCtg, pConn, pName, &vgInfo, NULL, tReq));
ctgReleaseVgInfoToCache(pCtg, dbCache);
dbCache = NULL;
} else {
SBuildUseDBInput input = {0};
......@@ -1168,6 +1169,7 @@ int32_t ctgHandleGetTbMetasRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBu
CTG_ERR_JRET(ctgGetTbMetaFromVnode(pCtg, pConn, pName, &vgInfo, NULL, tReq));
ctgReleaseVgInfoToCache(pCtg, dbCache);
dbCache = NULL;
} else {
SBuildUseDBInput input = {0};
......
......@@ -2118,7 +2118,7 @@ int32_t ctgOpUpdateEpset(SCtgCacheOperation *operation) {
_return:
if (dbCache) {
if (code == TSDB_CODE_SUCCESS && dbCache) {
ctgWUnlockVgInfo(dbCache);
}
......
......@@ -281,10 +281,10 @@ static void setCreateDBResultIntoDataBlock(SSDataBlock* pBlock, char* dbFName, S
len += sprintf(
buf2 + VARSTR_HEADER_SIZE,
"CREATE DATABASE `%s` BUFFER %d CACHESIZE %d CACHEMODEL '%s' COMP %d DURATION %dm "
"WAL_FSYNC_PERIOD %d MAXROWS %d MINROWS %d KEEP %dm,%dm,%dm PAGES %d PAGESIZE %d PRECISION '%s' REPLICA %d "
"WAL_FSYNC_PERIOD %d MAXROWS %d MINROWS %d STT_TRIGGER %d KEEP %dm,%dm,%dm PAGES %d PAGESIZE %d PRECISION '%s' REPLICA %d "
"WAL_LEVEL %d VGROUPS %d SINGLE_STABLE %d",
dbFName, pCfg->buffer, pCfg->cacheSize, cacheModelStr(pCfg->cacheLast), pCfg->compression, pCfg->daysPerFile,
pCfg->walFsyncPeriod, pCfg->maxRows, pCfg->minRows, pCfg->daysToKeep0, pCfg->daysToKeep1, pCfg->daysToKeep2,
pCfg->walFsyncPeriod, pCfg->maxRows, pCfg->minRows, pCfg->sstTrigger, pCfg->daysToKeep0, pCfg->daysToKeep1, pCfg->daysToKeep2,
pCfg->pages, pCfg->pageSize, prec, pCfg->replications, pCfg->walLevel, pCfg->numOfVgroups,
1 == pCfg->numOfStables);
......
......@@ -24,12 +24,16 @@
static TdThreadOnce initPoolOnce = PTHREAD_ONCE_INIT;
int32_t exchangeObjRefPool = -1;
static void initRefPool() { exchangeObjRefPool = taosOpenRef(1024, doDestroyExchangeOperatorInfo); }
static void cleanupRefPool() {
int32_t ref = atomic_val_compare_exchange_32(&exchangeObjRefPool, exchangeObjRefPool, 0);
taosCloseRef(ref);
}
static void initRefPool() {
exchangeObjRefPool = taosOpenRef(1024, doDestroyExchangeOperatorInfo);
atexit(cleanupRefPool);
}
static int32_t doSetSMABlock(SOperatorInfo* pOperator, void* input, size_t numOfBlocks, int32_t type, char* id) {
ASSERT(pOperator != NULL);
if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
......@@ -448,7 +452,6 @@ int32_t qCreateExecTask(SReadHandle* readHandle, int32_t vgId, uint64_t taskId,
SExecTaskInfo** pTask = (SExecTaskInfo**)pTaskInfo;
taosThreadOnce(&initPoolOnce, initRefPool);
atexit(cleanupRefPool);
qDebug("start to create subplan task, TID:0x%" PRIx64 " QID:0x%" PRIx64, taskId, pSubplan->id.queryId);
......
......@@ -593,8 +593,11 @@ void* getCurrentDataGroupInfo(const SPartitionOperatorInfo* pInfo, SDataGroupInf
int32_t pageId = 0;
pPage = getNewBufPage(pInfo->pBuf, &pageId);
taosArrayPush(p->pPageList, &pageId);
if (pPage == NULL) {
return pPage;
}
taosArrayPush(p->pPageList, &pageId);
*(int32_t*)pPage = 0;
} else {
int32_t* curId = taosArrayGetLast(p->pPageList);
......@@ -612,6 +615,11 @@ void* getCurrentDataGroupInfo(const SPartitionOperatorInfo* pInfo, SDataGroupInf
// add a new page for current group
int32_t pageId = 0;
pPage = getNewBufPage(pInfo->pBuf, &pageId);
if (pPage == NULL) {
qError("failed to get new buffer, code:%s", tstrerror(terrno));
return NULL;
}
taosArrayPush(p->pPageList, &pageId);
memset(pPage, 0, getBufPageSize(pInfo->pBuf));
}
......
......@@ -66,7 +66,7 @@ typedef struct SSysTableScanInfo {
int64_t numOfBlocks; // extract basic running information.
SLoadRemoteDataInfo loadInfo;
int32_t tbnameSlotId;
int32_t tbnameSlotId;
} SSysTableScanInfo;
typedef struct {
......@@ -81,10 +81,10 @@ typedef struct MergeIndex {
} MergeIndex;
typedef struct SBlockDistInfo {
SSDataBlock* pResBlock;
STsdbReader* pHandle;
SReadHandle readHandle;
uint64_t uid; // table uid
SSDataBlock* pResBlock;
STsdbReader* pHandle;
SReadHandle readHandle;
uint64_t uid; // table uid
} SBlockDistInfo;
static int32_t sysChkFilter__Comm(SNode* pNode);
......@@ -129,20 +129,20 @@ static char* SYSTABLE_IDX_COLUMN[] = {"table_name", "db_name", "create_time"
static char* SYSTABLE_SPECIAL_COL[] = {"db_name", "vgroup_id"};
static int32_t buildSysDbTableInfo(const SSysTableScanInfo* pInfo, int32_t capacity);
static SSDataBlock* buildInfoSchemaTableMetaBlock(char* tableName);
static void destroySysScanOperator(void* param);
static int32_t loadSysTableCallback(void* param, SDataBuf* pMsg, int32_t code);
static SSDataBlock* doFilterResult(SSDataBlock* pDataBlock, SFilterInfo* pFilterInfo);
static int32_t buildSysDbTableInfo(const SSysTableScanInfo* pInfo, int32_t capacity);
static SSDataBlock* buildInfoSchemaTableMetaBlock(char* tableName);
static void destroySysScanOperator(void* param);
static int32_t loadSysTableCallback(void* param, SDataBuf* pMsg, int32_t code);
static SSDataBlock* doFilterResult(SSDataBlock* pDataBlock, SFilterInfo* pFilterInfo);
static __optSysFilter optSysGetFilterFunc(int32_t ctype, bool* reverse);
static int32_t sysTableUserTagsFillOneTableTags(const SSysTableScanInfo* pInfo, SMetaReader* smrSuperTable,
SMetaReader* smrChildTable, const char* dbname, const char* tableName,
int32_t* pNumOfRows, const SSDataBlock* dataBlock);
static int32_t sysTableUserColsFillOneTableCols(const SSysTableScanInfo* pInfo, const char* dbname,
int32_t* pNumOfRows, const SSDataBlock* dataBlock,
char* tName, SSchemaWrapper* schemaRow, char* tableType);
static int32_t sysTableUserColsFillOneTableCols(const SSysTableScanInfo* pInfo, const char* dbname, int32_t* pNumOfRows,
const SSDataBlock* dataBlock, char* tName, SSchemaWrapper* schemaRow,
char* tableType);
static void relocateAndFilterSysTagsScanResult(SSysTableScanInfo* pInfo, int32_t numOfRows, SSDataBlock* dataBlock,
SFilterInfo* pFilterInfo);
......@@ -204,11 +204,11 @@ int32_t sysFilte__TableName(void* arg, SNode* pNode, SArray* result) {
if (func == NULL) return -1;
SMetaFltParam param = {.suid = 0,
.cid = 0,
.type = TSDB_DATA_TYPE_VARCHAR,
.val = pVal->datum.p,
.reverse = reverse,
.filterFunc = func};
.cid = 0,
.type = TSDB_DATA_TYPE_VARCHAR,
.val = pVal->datum.p,
.reverse = reverse,
.filterFunc = func};
return -1;
}
......@@ -223,11 +223,11 @@ int32_t sysFilte__CreateTime(void* arg, SNode* pNode, SArray* result) {
if (func == NULL) return -1;
SMetaFltParam param = {.suid = 0,
.cid = 0,
.type = TSDB_DATA_TYPE_BIGINT,
.val = &pVal->datum.i,
.reverse = reverse,
.filterFunc = func};
.cid = 0,
.type = TSDB_DATA_TYPE_BIGINT,
.val = &pVal->datum.i,
.reverse = reverse,
.filterFunc = func};
int32_t ret = metaFilterCreateTime(pMeta, &param, result);
return ret;
......@@ -355,9 +355,9 @@ static int32_t optSysMergeRslt(SArray* mRslt, SArray* rslt);
static SSDataBlock* sysTableScanFromMNode(SOperatorInfo* pOperator, SSysTableScanInfo* pInfo, const char* name,
SExecTaskInfo* pTaskInfo);
void extractTbnameSlotId(SSysTableScanInfo* pInfo, const SScanPhysiNode* pScanNode);
static SSDataBlock* sysTableScanFillTbName(SOperatorInfo* pOperator, const SSysTableScanInfo* pInfo,
const char* name, SSDataBlock* pBlock);
__optSysFilter optSysGetFilterFunc(int32_t ctype, bool* reverse) {
static SSDataBlock* sysTableScanFillTbName(SOperatorInfo* pOperator, const SSysTableScanInfo* pInfo, const char* name,
SSDataBlock* pBlock);
__optSysFilter optSysGetFilterFunc(int32_t ctype, bool* reverse) {
if (ctype == OP_TYPE_LOWER_EQUAL || ctype == OP_TYPE_LOWER_THAN) {
*reverse = true;
}
......@@ -479,13 +479,13 @@ static SSDataBlock* sysTableScanUserCols(SOperatorInfo* pOperator) {
}
}
char typeName[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
SSchemaWrapper *schemaRow = NULL;
if(smrTable.me.type == TSDB_SUPER_TABLE){
schemaRow = &smrTable.me.stbEntry.schemaRow;
char typeName[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
SSchemaWrapper* schemaRow = NULL;
if (smrTable.me.type == TSDB_SUPER_TABLE) {
schemaRow = &smrTable.me.stbEntry.schemaRow;
STR_TO_VARSTR(typeName, "CHILD_TABLE");
}else if(smrTable.me.type == TSDB_NORMAL_TABLE){
schemaRow = &smrTable.me.ntbEntry.schemaRow;
} else if (smrTable.me.type == TSDB_NORMAL_TABLE) {
schemaRow = &smrTable.me.ntbEntry.schemaRow;
STR_TO_VARSTR(typeName, "NORMAL_TABLE");
}
......@@ -507,50 +507,50 @@ static SSDataBlock* sysTableScanUserCols(SOperatorInfo* pOperator) {
pInfo->pCur = metaOpenTbCursor(pInfo->readHandle.meta);
}
SHashObj *stableSchema = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
SHashObj* stableSchema = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
taosHashSetFreeFp(stableSchema, tDeleteSSchemaWrapperForHash);
while ((ret = metaTbCursorNext(pInfo->pCur, TSDB_TABLE_MAX)) == 0) {
char typeName[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
char tableName[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
SSchemaWrapper *schemaRow = NULL;
SSchemaWrapper* schemaRow = NULL;
if(pInfo->pCur->mr.me.type == TSDB_SUPER_TABLE){
if (pInfo->pCur->mr.me.type == TSDB_SUPER_TABLE) {
qDebug("sysTableScanUserCols cursor get super table");
void *schema = taosHashGet(stableSchema, &pInfo->pCur->mr.me.uid, sizeof(int64_t));
if(schema == NULL){
SSchemaWrapper *schemaWrapper = tCloneSSchemaWrapper(&pInfo->pCur->mr.me.stbEntry.schemaRow);
void* schema = taosHashGet(stableSchema, &pInfo->pCur->mr.me.uid, sizeof(int64_t));
if (schema == NULL) {
SSchemaWrapper* schemaWrapper = tCloneSSchemaWrapper(&pInfo->pCur->mr.me.stbEntry.schemaRow);
taosHashPut(stableSchema, &pInfo->pCur->mr.me.uid, sizeof(int64_t), &schemaWrapper, POINTER_BYTES);
}
continue;
}else if (pInfo->pCur->mr.me.type == TSDB_CHILD_TABLE) {
} else if (pInfo->pCur->mr.me.type == TSDB_CHILD_TABLE) {
qDebug("sysTableScanUserCols cursor get child table");
STR_TO_VARSTR(typeName, "CHILD_TABLE");
STR_TO_VARSTR(tableName, pInfo->pCur->mr.me.name);
int64_t suid = pInfo->pCur->mr.me.ctbEntry.suid;
void *schema = taosHashGet(stableSchema, &pInfo->pCur->mr.me.ctbEntry.suid, sizeof(int64_t));
if(schema != NULL){
schemaRow = *(SSchemaWrapper **)schema;
}else{
void* schema = taosHashGet(stableSchema, &pInfo->pCur->mr.me.ctbEntry.suid, sizeof(int64_t));
if (schema != NULL) {
schemaRow = *(SSchemaWrapper**)schema;
} else {
tDecoderClear(&pInfo->pCur->mr.coder);
int code = metaGetTableEntryByUid(&pInfo->pCur->mr, suid);
if (code != TSDB_CODE_SUCCESS) {
// terrno has been set by metaGetTableEntryByName, therefore, return directly
qError("sysTableScanUserCols get meta by suid:%"PRId64 " error, code:%d", suid, code);
qError("sysTableScanUserCols get meta by suid:%" PRId64 " error, code:%d", suid, code);
blockDataDestroy(dataBlock);
pInfo->loadInfo.totalRows = 0;
taosHashCleanup(stableSchema);
return NULL;
}
schemaRow = &pInfo->pCur->mr.me.stbEntry.schemaRow;
schemaRow = &pInfo->pCur->mr.me.stbEntry.schemaRow;
}
}else if(pInfo->pCur->mr.me.type == TSDB_NORMAL_TABLE){
} else if (pInfo->pCur->mr.me.type == TSDB_NORMAL_TABLE) {
qDebug("sysTableScanUserCols cursor get normal table");
schemaRow = &pInfo->pCur->mr.me.ntbEntry.schemaRow;
schemaRow = &pInfo->pCur->mr.me.ntbEntry.schemaRow;
STR_TO_VARSTR(typeName, "NORMAL_TABLE");
STR_TO_VARSTR(tableName, pInfo->pCur->mr.me.name);
}else{
} else {
qDebug("sysTableScanUserCols cursor get invalid table");
continue;
}
......@@ -665,6 +665,7 @@ static SSDataBlock* sysTableScanUserTags(SOperatorInfo* pOperator) {
pInfo->pCur = metaOpenTbCursor(pInfo->readHandle.meta);
}
bool blockFull = false;
while ((ret = metaTbCursorNext(pInfo->pCur, TSDB_SUPER_TABLE)) == 0) {
if (pInfo->pCur->mr.me.type != TSDB_CHILD_TABLE) {
continue;
......@@ -686,17 +687,25 @@ static SSDataBlock* sysTableScanUserTags(SOperatorInfo* pOperator) {
T_LONG_JMP(pTaskInfo->env, terrno);
}
sysTableUserTagsFillOneTableTags(pInfo, &smrSuperTable, &pInfo->pCur->mr, dbname, tableName, &numOfRows, dataBlock);
if ((smrSuperTable.me.stbEntry.schemaTag.nCols + numOfRows) > pOperator->resultInfo.capacity) {
metaTbCursorPrev(pInfo->pCur);
blockFull = true;
} else {
sysTableUserTagsFillOneTableTags(pInfo, &smrSuperTable, &pInfo->pCur->mr, dbname, tableName, &numOfRows,
dataBlock);
}
metaReaderClear(&smrSuperTable);
if (numOfRows >= pOperator->resultInfo.capacity) {
if (blockFull || numOfRows >= pOperator->resultInfo.capacity) {
relocateAndFilterSysTagsScanResult(pInfo, numOfRows, dataBlock, pOperator->exprSupp.pFilterInfo);
numOfRows = 0;
if (pInfo->pRes->info.rows > 0) {
break;
}
blockFull = false;
}
}
......@@ -902,10 +911,10 @@ static int32_t sysTableUserTagsFillOneTableTags(const SSysTableScanInfo* pInfo,
return TSDB_CODE_SUCCESS;
}
static int32_t sysTableUserColsFillOneTableCols(const SSysTableScanInfo* pInfo, const char* dbname,
int32_t* pNumOfRows, const SSDataBlock* dataBlock, char* tName,
SSchemaWrapper* schemaRow, char* tableType) {
if(schemaRow == NULL){
static int32_t sysTableUserColsFillOneTableCols(const SSysTableScanInfo* pInfo, const char* dbname, int32_t* pNumOfRows,
const SSDataBlock* dataBlock, char* tName, SSchemaWrapper* schemaRow,
char* tableType) {
if (schemaRow == NULL) {
qError("sysTableUserColsFillOneTableCols schemaRow is NULL");
return TSDB_CODE_SUCCESS;
}
......@@ -941,9 +950,8 @@ static int32_t sysTableUserColsFillOneTableCols(const SSysTableScanInfo* pInfo,
colTypeLen += sprintf(varDataVal(colTypeStr) + colTypeLen, "(%d)",
(int32_t)(schemaRow->pSchema[i].bytes - VARSTR_HEADER_SIZE));
} else if (colType == TSDB_DATA_TYPE_NCHAR) {
colTypeLen += sprintf(
varDataVal(colTypeStr) + colTypeLen, "(%d)",
(int32_t)((schemaRow->pSchema[i].bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE));
colTypeLen += sprintf(varDataVal(colTypeStr) + colTypeLen, "(%d)",
(int32_t)((schemaRow->pSchema[i].bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE));
}
varDataSetLen(colTypeStr, colTypeLen);
colDataAppend(pColInfoData, numOfRows, (char*)colTypeStr, false);
......@@ -1550,9 +1558,9 @@ static SSDataBlock* doSysTableScan(SOperatorInfo* pOperator) {
if (pInfo->showRewrite) {
getDBNameFromCondition(pInfo->pCondition, dbName);
sprintf(pInfo->req.db, "%d.%s", pInfo->accountId, dbName);
}else if(strncasecmp(name, TSDB_INS_TABLE_COLS, TSDB_TABLE_FNAME_LEN) == 0){
} else if (strncasecmp(name, TSDB_INS_TABLE_COLS, TSDB_TABLE_FNAME_LEN) == 0) {
getDBNameFromCondition(pInfo->pCondition, dbName);
if(dbName[0]) sprintf(pInfo->req.db, "%d.%s", pInfo->accountId, dbName);
if (dbName[0]) sprintf(pInfo->req.db, "%d.%s", pInfo->accountId, dbName);
sysTableIsCondOnOneTable(pInfo->pCondition, pInfo->req.filterTb);
}
......@@ -1573,12 +1581,12 @@ static SSDataBlock* doSysTableScan(SOperatorInfo* pOperator) {
return sysTableScanFillTbName(pOperator, pInfo, name, pBlock);
}
static SSDataBlock* sysTableScanFillTbName(SOperatorInfo* pOperator, const SSysTableScanInfo* pInfo,
const char* name, SSDataBlock* pBlock) {
static SSDataBlock* sysTableScanFillTbName(SOperatorInfo* pOperator, const SSysTableScanInfo* pInfo, const char* name,
SSDataBlock* pBlock) {
if (pBlock != NULL) {
if (pInfo->tbnameSlotId != -1) {
SColumnInfoData* pColumnInfoData = (SColumnInfoData*)taosArrayGet(pBlock->pDataBlock, pInfo->tbnameSlotId);
char varTbName[TSDB_TABLE_FNAME_LEN - 1 + VARSTR_HEADER_SIZE] = {0};
char varTbName[TSDB_TABLE_FNAME_LEN - 1 + VARSTR_HEADER_SIZE] = {0};
memcpy(varDataVal(varTbName), name, strlen(name));
varDataSetLen(varTbName, strlen(name));
for (int i = 0; i < pBlock->info.rows; ++i) {
......@@ -1669,7 +1677,7 @@ static SSDataBlock* sysTableScanFromMNode(SOperatorInfo* pOperator, SSysTableSca
SOperatorInfo* createSysTableScanOperatorInfo(void* readHandle, SSystemTableScanPhysiNode* pScanPhyNode,
const char* pUser, SExecTaskInfo* pTaskInfo) {
int32_t code = TDB_CODE_SUCCESS;
int32_t code = TDB_CODE_SUCCESS;
SSysTableScanInfo* pInfo = taosMemoryCalloc(1, sizeof(SSysTableScanInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
if (pInfo == NULL || pOperator == NULL) {
......@@ -1717,10 +1725,11 @@ SOperatorInfo* createSysTableScanOperatorInfo(void* readHandle, SSystemTableScan
setOperatorInfo(pOperator, "SysTableScanOperator", QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN, false, OP_NOT_OPENED,
pInfo, pTaskInfo);
pOperator->exprSupp.numOfExprs = taosArrayGetSize(pInfo->pRes->pDataBlock);
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doSysTableScan, NULL, destroySysScanOperator, optrDefaultBufFn, NULL);
pOperator->fpSet =
createOperatorFpSet(optrDummyOpenFn, doSysTableScan, NULL, destroySysScanOperator, optrDefaultBufFn, NULL);
return pOperator;
_error:
_error:
if (pInfo != NULL) {
destroySysScanOperator(pInfo);
}
......@@ -1757,7 +1766,7 @@ void destroySysScanOperator(void* param) {
const char* name = tNameGetTableName(&pInfo->name);
if (strncasecmp(name, TSDB_INS_TABLE_TABLES, TSDB_TABLE_FNAME_LEN) == 0 ||
strncasecmp(name, TSDB_INS_TABLE_TAGS, TSDB_TABLE_FNAME_LEN) == 0 ||
strncasecmp(name, TSDB_INS_TABLE_COLS, TSDB_TABLE_FNAME_LEN) == 0|| pInfo->pCur != NULL) {
strncasecmp(name, TSDB_INS_TABLE_COLS, TSDB_TABLE_FNAME_LEN) == 0 || pInfo->pCur != NULL) {
metaCloseTbCursor(pInfo->pCur);
pInfo->pCur = NULL;
}
......@@ -2165,7 +2174,7 @@ static SSDataBlock* doBlockInfoScan(SOperatorInfo* pOperator) {
// make the valgrind happy that all memory buffer has been initialized already.
if (slotId != 0) {
SColumnInfoData* p1 = taosArrayGet(pBlock->pDataBlock, 0);
int64_t v = 0;
int64_t v = 0;
colDataAppendInt64(p1, 0, &v);
}
......@@ -2175,10 +2184,10 @@ static SSDataBlock* doBlockInfoScan(SOperatorInfo* pOperator) {
}
static void destroyBlockDistScanOperatorInfo(void* param) {
SBlockDistInfo* pDistInfo = (SBlockDistInfo*)param;
blockDataDestroy(pDistInfo->pResBlock);
tsdbReaderClose(pDistInfo->pHandle);
taosMemoryFreeClear(param);
SBlockDistInfo* pDistInfo = (SBlockDistInfo*)param;
blockDataDestroy(pDistInfo->pResBlock);
tsdbReaderClose(pDistInfo->pHandle);
taosMemoryFreeClear(param);
}
static int32_t initTableblockDistQueryCond(uint64_t uid, SQueryTableDataCond* pCond) {
......@@ -2250,8 +2259,8 @@ SOperatorInfo* createDataBlockInfoScanOperator(SReadHandle* readHandle, SBlockDi
setOperatorInfo(pOperator, "DataBlockDistScanOperator", QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN, false,
OP_NOT_OPENED, pInfo, pTaskInfo);
pOperator->fpSet =
createOperatorFpSet(optrDummyOpenFn, doBlockInfoScan, NULL, destroyBlockDistScanOperatorInfo, optrDefaultBufFn, NULL);
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doBlockInfoScan, NULL, destroyBlockDistScanOperatorInfo,
optrDefaultBufFn, NULL);
return pOperator;
_error:
......
......@@ -3053,14 +3053,12 @@ static int32_t doSaveTupleData(SSerializeDataHandle* pHandle, const void* pBuf,
if (pHandle->currentPage == -1) {
pPage = getNewBufPage(pHandle->pBuf, &pHandle->currentPage);
if (pPage == NULL) {
terrno = TSDB_CODE_NO_AVAIL_DISK;
return terrno;
}
pPage->num = sizeof(SFilePage);
} else {
pPage = getBufPage(pHandle->pBuf, pHandle->currentPage);
if (pPage == NULL) {
terrno = TSDB_CODE_NO_AVAIL_DISK;
return terrno;
}
if (pPage->num + length > getBufPageSize(pHandle->pBuf)) {
......@@ -3068,7 +3066,6 @@ static int32_t doSaveTupleData(SSerializeDataHandle* pHandle, const void* pBuf,
releaseBufPage(pHandle->pBuf, pPage);
pPage = getNewBufPage(pHandle->pBuf, &pHandle->currentPage);
if (pPage == NULL) {
terrno = TSDB_CODE_NO_AVAIL_DISK;
return terrno;
}
pPage->num = sizeof(SFilePage);
......@@ -3115,7 +3112,6 @@ static int32_t doUpdateTupleData(SSerializeDataHandle* pHandle, const void* pBuf
if (pHandle->pBuf != NULL) {
SFilePage* pPage = getBufPage(pHandle->pBuf, pPos->pageId);
if (pPage == NULL) {
terrno = TSDB_CODE_NO_AVAIL_DISK;
return terrno;
}
memcpy(pPage->data + pPos->offset, pBuf, length);
......
......@@ -50,8 +50,8 @@ static SFilePage *loadDataFromFilePage(tMemBucket *pMemBucket, int32_t slotIdx)
if (pg == NULL) {
return NULL;
}
memcpy(buffer->data + offset, pg->data, (size_t)(pg->num * pMemBucket->bytes));
memcpy(buffer->data + offset, pg->data, (size_t)(pg->num * pMemBucket->bytes));
offset += (int32_t)(pg->num * pMemBucket->bytes);
}
......@@ -116,7 +116,7 @@ int32_t findOnlyResult(tMemBucket *pMemBucket, double *result) {
int32_t *pageId = taosArrayGet(list, 0);
SFilePage *pPage = getBufPage(pMemBucket->pBuffer, *pageId);
if (pPage == NULL) {
return TSDB_CODE_NO_AVAIL_DISK;
return terrno;
}
ASSERT(pPage->num == 1);
......@@ -283,7 +283,7 @@ tMemBucket *tMemBucketCreate(int16_t nElemSize, int16_t dataType, double minval,
return NULL;
}
int32_t ret = createDiskbasedBuf(&pBucket->pBuffer, pBucket->bufPageSize, pBucket->bufPageSize * 512, "1", tsTempDir);
int32_t ret = createDiskbasedBuf(&pBucket->pBuffer, pBucket->bufPageSize, pBucket->bufPageSize * 1024, "1", tsTempDir);
if (ret != 0) {
tMemBucketDestroy(pBucket);
return NULL;
......@@ -395,7 +395,7 @@ int32_t tMemBucketPut(tMemBucket *pBucket, const void *data, size_t size) {
pSlot->info.data = getNewBufPage(pBucket->pBuffer, &pageId);
if (pSlot->info.data == NULL) {
return TSDB_CODE_NO_AVAIL_DISK;
return terrno;
}
pSlot->info.pageId = pageId;
taosArrayPush(pPageIdList, &pageId);
......@@ -489,8 +489,9 @@ int32_t getPercentileImpl(tMemBucket *pMemBucket, int32_t count, double fraction
// data in buffer and file are merged together to be processed.
SFilePage *buffer = loadDataFromFilePage(pMemBucket, i);
if (buffer == NULL) {
return TSDB_CODE_NO_AVAIL_DISK;
return terrno;
}
int32_t currentIdx = count - num;
char *thisVal = buffer->data + pMemBucket->bytes * currentIdx;
......@@ -536,7 +537,7 @@ int32_t getPercentileImpl(tMemBucket *pMemBucket, int32_t count, double fraction
int32_t *pageId = taosArrayGet(list, f);
SFilePage *pg = getBufPage(pMemBucket->pBuffer, *pageId);
if (pg == NULL) {
return TSDB_CODE_NO_AVAIL_DISK;
return terrno;
}
int32_t code = tMemBucketPut(pMemBucket, pg->data, (int32_t)pg->num);
......
......@@ -228,9 +228,14 @@ typedef struct SQWorkerMgmt {
case QW_PHASE_POST_FETCH: \
ctx->inFetch = 0; \
break; \
default: \
case QW_PHASE_PRE_QUERY: \
case QW_PHASE_POST_QUERY: \
case QW_PHASE_PRE_CQUERY: \
case QW_PHASE_POST_CQUERY: \
atomic_store_8(&(ctx)->phase, _value); \
break; \
default: \
break; \
} \
} while (0)
......
......@@ -550,7 +550,9 @@ _return:
if (ctx) {
QW_UPDATE_RSP_CODE(ctx, code);
QW_SET_PHASE(ctx, phase);
if (QW_PHASE_POST_CQUERY != phase) {
QW_SET_PHASE(ctx, phase);
}
QW_UNLOCK(QW_WRITE, &ctx->lock);
qwReleaseTaskCtx(mgmt, ctx);
......@@ -757,7 +759,7 @@ int32_t qwProcessCQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
QW_LOCK(QW_WRITE, &ctx->lock);
if (qComplete || (queryStop && (0 == atomic_load_8((int8_t *)&ctx->queryContinue))) || code) {
// Note: query is not running anymore
QW_SET_PHASE(ctx, 0);
QW_SET_PHASE(ctx, QW_PHASE_POST_CQUERY);
QW_UNLOCK(QW_WRITE, &ctx->lock);
break;
}
......
......@@ -207,6 +207,7 @@ void streamMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId) {
if (ppTask) {
SStreamTask* pTask = *ppTask;
taosHashRemove(pMeta->pTasks, &taskId, sizeof(int32_t));
tdbTbDelete(pMeta->pTaskDb, &taskId, sizeof(int32_t), pMeta->txn);
/*if (pTask->timer) {
* taosTmrStop(pTask->timer);*/
/*pTask->timer = NULL;*/
......
......@@ -89,63 +89,3 @@ int32_t syncNodeOnAppendEntriesReply(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
}
return 0;
}
int32_t syncNodeOnAppendEntriesReplyOld(SSyncNode* ths, SyncAppendEntriesReply* pMsg) {
int32_t ret = 0;
// if already drop replica, do not process
if (!syncNodeInRaftGroup(ths, &(pMsg->srcId))) {
syncLogRecvAppendEntriesReply(ths, pMsg, "not in my config");
return 0;
}
// drop stale response
if (pMsg->term < ths->raftStore.currentTerm) {
syncLogRecvAppendEntriesReply(ths, pMsg, "drop stale response");
return 0;
}
if (ths->state == TAOS_SYNC_STATE_LEADER) {
if (pMsg->term > ths->raftStore.currentTerm) {
syncLogRecvAppendEntriesReply(ths, pMsg, "error term");
syncNodeStepDown(ths, pMsg->term);
return -1;
}
ASSERT(pMsg->term == ths->raftStore.currentTerm);
if (pMsg->success) {
SyncIndex oldMatchIndex = syncIndexMgrGetIndex(ths->pMatchIndex, &(pMsg->srcId));
if (pMsg->matchIndex > oldMatchIndex) {
syncIndexMgrSetIndex(ths->pMatchIndex, &(pMsg->srcId), pMsg->matchIndex);
syncMaybeAdvanceCommitIndex(ths);
// maybe update minMatchIndex
ths->minMatchIndex = syncMinMatchIndex(ths);
}
syncIndexMgrSetIndex(ths->pNextIndex, &(pMsg->srcId), pMsg->matchIndex + 1);
} else {
SyncIndex nextIndex = syncIndexMgrGetIndex(ths->pNextIndex, &(pMsg->srcId));
if (nextIndex > SYNC_INDEX_BEGIN) {
--nextIndex;
}
syncIndexMgrSetIndex(ths->pNextIndex, &(pMsg->srcId), nextIndex);
}
// send next append entries
SPeerState* pState = syncNodeGetPeerState(ths, &(pMsg->srcId));
ASSERT(pState != NULL);
if (pMsg->lastSendIndex == pState->lastSendIndex) {
int64_t timeNow = taosGetTimestampMs();
int64_t elapsed = timeNow - pState->lastSendTime;
sNTrace(ths, "sync-append-entries rtt elapsed:%" PRId64 ", index:%" PRId64, elapsed, pState->lastSendIndex);
syncNodeReplicateOne(ths, &(pMsg->srcId), true);
}
}
syncLogRecvAppendEntriesReply(ths, pMsg, "process");
return 0;
}
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
......@@ -41,7 +41,7 @@ target_link_libraries(
)
if(TD_WINDOWS)
target_link_libraries(
os PUBLIC ws2_32 iconv msvcregex wcwidth winmm crashdump dbghelp
os PUBLIC ws2_32 iconv msvcregex wcwidth winmm crashdump dbghelp version
)
elseif(TD_DARWIN_64)
find_library(CORE_FOUNDATION_FRAMEWORK CoreFoundation)
......
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册