Commit 0d2d3349 authored by Xiaoyu Wang

Merge remote-tracking branch 'origin/main' into fix/main_bugfix_wxy

@@ -2,7 +2,7 @@
 # taosadapter
 ExternalProject_Add(taosadapter
     GIT_REPOSITORY https://github.com/taosdata/taosadapter.git
-    GIT_TAG 213f8b3
+    GIT_TAG 3e08996
     SOURCE_DIR "${TD_SOURCE_DIR}/tools/taosadapter"
     BINARY_DIR ""
     #BUILD_IN_SOURCE TRUE
......
@@ -2,7 +2,7 @@
 # taos-tools
 ExternalProject_Add(taos-tools
     GIT_REPOSITORY https://github.com/taosdata/taos-tools.git
-    GIT_TAG 7d24ed5
+    GIT_TAG 0cd564a
     SOURCE_DIR "${TD_SOURCE_DIR}/tools/taos-tools"
     BINARY_DIR ""
     #BUILD_IN_SOURCE TRUE
......
module goexample

go 1.17

require github.com/taosdata/driver-go/v3 v3.0.0
 import pandas
-from sqlalchemy import create_engine
+from sqlalchemy import create_engine, text
 engine = create_engine("taos://root:taosdata@localhost:6030/power")
-df = pandas.read_sql("SELECT * FROM meters", engine)
+conn = engine.connect()
+df = pandas.read_sql(text("SELECT * FROM power.meters"), conn)
+conn.close()
 # print index
 print(df.index)
......
 import pandas
-from sqlalchemy import create_engine
+from sqlalchemy import create_engine, text
 engine = create_engine("taosrest://root:taosdata@localhost:6041")
-df: pandas.DataFrame = pandas.read_sql("SELECT * FROM power.meters", engine)
+conn = engine.connect()
+df: pandas.DataFrame = pandas.read_sql(text("SELECT * FROM power.meters"), conn)
+conn.close()
 # print index
 print(df.index)
......
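Note: both pandas examples above changed because SQLAlchemy 2.0 removed implicit execution of plain SQL strings on an Engine; a query must now be wrapped in text() and run on an explicitly opened Connection. A minimal sketch of the same pattern with a context manager, assuming the taos/taosrest SQLAlchemy dialects shipped with taospy are installed and the power database already exists:

import pandas
from sqlalchemy import create_engine, text

engine = create_engine("taosrest://root:taosdata@localhost:6041")
# the connection is closed automatically when the block exits
with engine.connect() as conn:
    df = pandas.read_sql(text("SELECT * FROM power.meters"), conn)
print(df.head())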
 # ANCHOR: connect
 from taosrest import connect, TaosRestConnection, TaosRestCursor
-conn: TaosRestConnection = connect(url="http://localhost:6041",
+conn = connect(url="http://localhost:6041",
                user="root",
                password="taosdata",
                timeout=30)
 # ANCHOR_END: connect
 # ANCHOR: basic
 # create STable
-cursor: TaosRestCursor = conn.cursor()
+cursor = conn.cursor()
 cursor.execute("DROP DATABASE IF EXISTS power")
 cursor.execute("CREATE DATABASE power")
-cursor.execute("CREATE STABLE power.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (location BINARY(64), groupId INT)")
+cursor.execute(
+    "CREATE STABLE power.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (location BINARY(64), groupId INT)")
 # insert data
-cursor.execute("""INSERT INTO power.d1001 USING power.meters TAGS(California.SanFrancisco, 2) VALUES ('2018-10-03 14:38:05.000', 10.30000, 219, 0.31000) ('2018-10-03 14:38:15.000', 12.60000, 218, 0.33000) ('2018-10-03 14:38:16.800', 12.30000, 221, 0.31000)
-    power.d1002 USING power.meters TAGS(California.SanFrancisco, 3) VALUES ('2018-10-03 14:38:16.650', 10.30000, 218, 0.25000)
-    power.d1003 USING power.meters TAGS(California.LosAngeles, 2) VALUES ('2018-10-03 14:38:05.500', 11.80000, 221, 0.28000) ('2018-10-03 14:38:16.600', 13.40000, 223, 0.29000)
-    power.d1004 USING power.meters TAGS(California.LosAngeles, 3) VALUES ('2018-10-03 14:38:05.000', 10.80000, 223, 0.29000) ('2018-10-03 14:38:06.500', 11.50000, 221, 0.35000)""")
+cursor.execute("""INSERT INTO power.d1001 USING power.meters TAGS('California.SanFrancisco', 2) VALUES ('2018-10-03 14:38:05.000', 10.30000, 219, 0.31000) ('2018-10-03 14:38:15.000', 12.60000, 218, 0.33000) ('2018-10-03 14:38:16.800', 12.30000, 221, 0.31000)
+    power.d1002 USING power.meters TAGS('California.SanFrancisco', 3) VALUES ('2018-10-03 14:38:16.650', 10.30000, 218, 0.25000)
+    power.d1003 USING power.meters TAGS('California.LosAngeles', 2) VALUES ('2018-10-03 14:38:05.500', 11.80000, 221, 0.28000) ('2018-10-03 14:38:16.600', 13.40000, 223, 0.29000)
+    power.d1004 USING power.meters TAGS('California.LosAngeles', 3) VALUES ('2018-10-03 14:38:05.000', 10.80000, 223, 0.29000) ('2018-10-03 14:38:06.500', 11.50000, 221, 0.35000)""")
 print("inserted row count:", cursor.rowcount)
 # query data
@@ -28,7 +29,7 @@ print("queried row count:", cursor.rowcount)
 # get column names from cursor
 column_names = [meta[0] for meta in cursor.description]
 # get rows
-data: list[tuple] = cursor.fetchall()
+data = cursor.fetchall()
 print(column_names)
 for row in data:
     print(row)
......
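Note: the TAGS values gain quotes above because location is a BINARY tag, and string literals in SQL must be quoted; a bare California.SanFrancisco parses as an identifier and the statement fails. A small illustrative sketch of the quoting rule (the helper name below is hypothetical, not part of the docs):

# hypothetical helper: quote string tag values, leave numeric tags bare
def insert_using_tags(table, stable, location, group_id, values_clause):
    return (f"INSERT INTO {table} USING {stable} "
            f"TAGS('{location}', {group_id}) VALUES {values_clause}")

sql = insert_using_tags("power.d1001", "power.meters",
                        "California.SanFrancisco", 2,
                        "('2018-10-03 14:38:05.000', 10.3, 219, 0.31)")
print(sql)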
@@ -8,7 +8,7 @@ conn.execute("CREATE DATABASE test")
 # change database. same as execute "USE db"
 conn.select_db("test")
 conn.execute("CREATE STABLE weather(ts TIMESTAMP, temperature FLOAT) TAGS (location INT)")
-affected_row: int = conn.execute("INSERT INTO t1 USING weather TAGS(1) VALUES (now, 23.5) (now+1m, 23.5) (now+2m 24.4)")
+affected_row = conn.execute("INSERT INTO t1 USING weather TAGS(1) VALUES (now, 23.5) (now+1m, 23.5) (now+2m, 24.4)")
 print("affected_row", affected_row)
 # output:
 # affected_row 3
@@ -16,10 +16,10 @@ print("affected_row", affected_row)
 # ANCHOR: query
 # Execute a sql and get its result set. It's useful for SELECT statement
-result: taos.TaosResult = conn.query("SELECT * from weather")
+result = conn.query("SELECT * from weather")
 # Get fields from result
-fields: taos.field.TaosFields = result.fields
+fields = result.fields
 for field in fields:
     print(field)  # {name: ts, type: 9, bytes: 8}
@@ -42,4 +42,4 @@ print(data)
 # ANCHOR_END: query
 conn.close()
\ No newline at end of file
 # install dependencies:
 # recommend python >= 3.8
-# pip3 install faster-fifo
 #
 import logging
 import math
+import multiprocessing
 import sys
 import time
 import os
-from multiprocessing import Process
-from faster_fifo import Queue
+from multiprocessing import Process, Queue
 from mockdatasource import MockDataSource
 from queue import Empty
 from typing import List
@@ -22,8 +21,7 @@ TABLE_COUNT = 1000
 QUEUE_SIZE = 1000000
 MAX_BATCH_SIZE = 3000

-read_processes = []
-write_processes = []
+_DONE_MESSAGE = '__DONE__'

 def get_connection():
@@ -44,41 +42,64 @@ def get_connection():

 # ANCHOR: read
-def run_read_task(task_id: int, task_queues: List[Queue]):
+def run_read_task(task_id: int, task_queues: List[Queue], infinity):
     table_count_per_task = TABLE_COUNT // READ_TASK_COUNT
-    data_source = MockDataSource(f"tb{task_id}", table_count_per_task)
+    data_source = MockDataSource(f"tb{task_id}", table_count_per_task, infinity)
     try:
         for batch in data_source:
+            if isinstance(batch, tuple):
+                batch = [batch]
             for table_id, rows in batch:
                 # hash data to different queue
                 i = table_id % len(task_queues)
                 # block putting forever when the queue is full
-                task_queues[i].put_many(rows, block=True, timeout=-1)
+                for row in rows:
+                    task_queues[i].put(row)
+        if not infinity:
+            for queue in task_queues:
+                queue.put(_DONE_MESSAGE)
     except KeyboardInterrupt:
         pass
+    finally:
+        logging.info('read task over')

 # ANCHOR_END: read

 # ANCHOR: write
-def run_write_task(task_id: int, queue: Queue):
+def run_write_task(task_id: int, queue: Queue, done_queue: Queue):
     from sql_writer import SQLWriter
     log = logging.getLogger(f"WriteTask-{task_id}")
     writer = SQLWriter(get_connection)
     lines = None
     try:
         while True:
-            try:
-                # get as many as possible
-                lines = queue.get_many(block=False, max_messages_to_get=MAX_BATCH_SIZE)
-                writer.process_lines(lines)
-            except Empty:
-                time.sleep(0.01)
+            over = False
+            lines = []
+            for _ in range(MAX_BATCH_SIZE):
+                try:
+                    line = queue.get_nowait()
+                    if line == _DONE_MESSAGE:
+                        over = True
+                        break
+                    if line:
+                        lines.append(line)
+                except Empty:
+                    time.sleep(0.1)
+            if len(lines) > 0:
+                writer.process_lines(lines)
+            if over:
+                done_queue.put(_DONE_MESSAGE)
+                break
     except KeyboardInterrupt:
         pass
     except BaseException as e:
         log.debug(f"lines={lines}")
         raise e
+    finally:
+        writer.close()
+        log.debug('write task over')

 # ANCHOR_END: write
@@ -103,47 +124,64 @@ def set_global_config():

 # ANCHOR: monitor
-def run_monitor_process():
+def run_monitor_process(done_queue: Queue):
     log = logging.getLogger("DataBaseMonitor")
-    conn = get_connection()
-    conn.execute("DROP DATABASE IF EXISTS test")
-    conn.execute("CREATE DATABASE test")
-    conn.execute("CREATE STABLE test.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) "
-                 "TAGS (location BINARY(64), groupId INT)")
-
-    def get_count():
-        res = conn.query("SELECT count(*) FROM test.meters")
-        rows = res.fetch_all()
-        return rows[0][0] if rows else 0
-
-    last_count = 0
-    while True:
-        time.sleep(10)
-        count = get_count()
-        log.info(f"count={count} speed={(count - last_count) / 10}")
-        last_count = count
+    conn = None
+    try:
+        conn = get_connection()
+
+        def get_count():
+            res = conn.query("SELECT count(*) FROM test.meters")
+            rows = res.fetch_all()
+            return rows[0][0] if rows else 0
+
+        last_count = 0
+        while True:
+            try:
+                done = done_queue.get_nowait()
+                if done == _DONE_MESSAGE:
+                    break
+            except Empty:
+                pass
+            time.sleep(10)
+            count = get_count()
+            log.info(f"count={count} speed={(count - last_count) / 10}")
+            last_count = count
+    finally:
+        conn.close()

 # ANCHOR_END: monitor

 # ANCHOR: main
-def main():
+def main(infinity):
     set_global_config()
     logging.info(f"READ_TASK_COUNT={READ_TASK_COUNT}, WRITE_TASK_COUNT={WRITE_TASK_COUNT}, "
                  f"TABLE_COUNT={TABLE_COUNT}, QUEUE_SIZE={QUEUE_SIZE}, MAX_BATCH_SIZE={MAX_BATCH_SIZE}")

-    monitor_process = Process(target=run_monitor_process)
+    conn = get_connection()
+    conn.execute("DROP DATABASE IF EXISTS test")
+    conn.execute("CREATE DATABASE IF NOT EXISTS test")
+    conn.execute("CREATE STABLE IF NOT EXISTS test.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) "
+                 "TAGS (location BINARY(64), groupId INT)")
+    conn.close()
+
+    done_queue = Queue()
+    monitor_process = Process(target=run_monitor_process, args=(done_queue,))
     monitor_process.start()
-    time.sleep(3)  # waiting for database ready.
+    logging.debug(f"monitor task started with pid {monitor_process.pid}")

     task_queues: List[Queue] = []
+    write_processes = []
+    read_processes = []

     # create task queues
     for i in range(WRITE_TASK_COUNT):
-        queue = Queue(max_size_bytes=QUEUE_SIZE)
+        queue = Queue()
         task_queues.append(queue)

     # create write processes
     for i in range(WRITE_TASK_COUNT):
-        p = Process(target=run_write_task, args=(i, task_queues[i]))
+        p = Process(target=run_write_task, args=(i, task_queues[i], done_queue))
         p.start()
         logging.debug(f"WriteTask-{i} started with pid {p.pid}")
         write_processes.append(p)
@@ -151,13 +189,19 @@ def main():
     # create read processes
     for i in range(READ_TASK_COUNT):
         queues = assign_queues(i, task_queues)
-        p = Process(target=run_read_task, args=(i, queues))
+        p = Process(target=run_read_task, args=(i, queues, infinity))
         p.start()
         logging.debug(f"ReadTask-{i} started with pid {p.pid}")
         read_processes.append(p)

     try:
         monitor_process.join()
+        for p in read_processes:
+            p.join()
+        for p in write_processes:
+            p.join()
+        time.sleep(1)
+        return
     except KeyboardInterrupt:
         monitor_process.terminate()
         [p.terminate() for p in read_processes]
@@ -176,5 +220,6 @@ def assign_queues(read_task_id, task_queues):

 if __name__ == '__main__':
-    main()
+    multiprocessing.set_start_method('spawn')
+    main(False)
 # ANCHOR_END: main
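Note: the rewrite drops faster-fifo's batched get_many/put_many in favor of the standard multiprocessing.Queue and coordinates shutdown with a _DONE_MESSAGE sentinel: each reader pushes the sentinel into every queue when its finite data source is exhausted, each writer flushes its last batch and forwards the sentinel to done_queue, and the monitor exits when it sees it. A minimal self-contained sketch of that sentinel pattern (the names below are illustrative, not part of the example):

from multiprocessing import Process, Queue
from queue import Empty
import time

DONE = '__DONE__'

def producer(q):
    for i in range(5):
        q.put(i)
    q.put(DONE)  # signal that no more items will arrive

def consumer(q):
    while True:
        try:
            item = q.get_nowait()
        except Empty:
            time.sleep(0.1)  # queue momentarily empty, try again
            continue
        if item == DONE:
            break  # producer is finished, stop consuming
        print('got', item)

if __name__ == '__main__':
    q = Queue()
    ps = [Process(target=producer, args=(q,)), Process(target=consumer, args=(q,))]
    [p.start() for p in ps]
    [p.join() for p in ps]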
@@ -26,7 +26,8 @@ class Consumer(object):
         'bath_consume': True,
         'batch_size': 1000,
         'async_model': True,
-        'workers': 10
+        'workers': 10,
+        'testing': False
     }
     LOCATIONS = ['California.SanFrancisco', 'California.LosAngles', 'California.SanDiego', 'California.SanJose',
@@ -46,11 +47,12 @@ class Consumer(object):
     def __init__(self, **configs):
         self.config: dict = self.DEFAULT_CONFIGS
         self.config.update(configs)
-        self.consumer = KafkaConsumer(
-            self.config.get('kafka_topic'),  # topic
-            bootstrap_servers=self.config.get('kafka_brokers'),
-            group_id=self.config.get('kafka_group_id'),
-        )
+        if not self.config.get('testing'):
+            self.consumer = KafkaConsumer(
+                self.config.get('kafka_topic'),  # topic
+                bootstrap_servers=self.config.get('kafka_brokers'),
+                group_id=self.config.get('kafka_group_id'),
+            )
         self.taos = taos.connect(
             host=self.config.get('taos_host'),
             user=self.config.get('taos_user'),
@@ -60,7 +62,7 @@ class Consumer(object):
         )
         if self.config.get('async_model'):
             self.pool = ThreadPoolExecutor(max_workers=self.config.get('workers'))
-            self.tasks: list[Future] = []
+            self.tasks = []
         # tags and table mapping # key: {location}_{groupId} value:
         self.tag_table_mapping = {}
         i = 0
@@ -104,8 +106,8 @@ class Consumer(object):
             for task in self.tasks:
                 while not task.done():
                     pass
         if self.pool is not None:
             self.pool.shutdown()
         # clean data
         if self.config.get('clean_after_testing'):
@@ -115,14 +117,14 @@ class Consumer(object):
         if self.taos is not None:
             self.taos.close()
-    def _run(self, f: Callable[[ConsumerRecord], bool]):
+    def _run(self, f):
         for message in self.consumer:
             if self.config.get('async_model'):
                 self.pool.submit(f(message))
             else:
                 f(message)
-    def _run_batch(self, f: Callable[[list[list[ConsumerRecord]]], None]):
+    def _run_batch(self, f):
         while True:
             messages = self.consumer.poll(timeout_ms=500, max_records=self.config.get('batch_size'))
             if messages:
@@ -140,7 +142,7 @@ class Consumer(object):
         logging.info('## insert sql %s', sql)
         return self.taos.execute(sql=sql) == 1
-    def _to_taos_batch(self, messages: list[list[ConsumerRecord]]):
+    def _to_taos_batch(self, messages):
         sql = self._build_sql_batch(messages=messages)
         if len(sql) == 0:  # decode error, skip
             return
@@ -162,7 +164,7 @@ class Consumer(object):
         table_name = self._get_table_name(location=location, group_id=group_id)
         return self.INSERT_PART_SQL.format(table_name, ts, current, voltage, phase)
-    def _build_sql_batch(self, messages: list[list[ConsumerRecord]]) -> str:
+    def _build_sql_batch(self, messages) -> str:
         sql_list = []
         for partition_messages in messages:
             for message in partition_messages:
@@ -186,7 +188,55 @@ def _get_location_and_group(key: str) -> (str, int):
     return fields[0], fields[1]

+def test_to_taos(consumer: Consumer):
+    msg = {
+        'location': 'California.SanFrancisco',
+        'groupId': 1,
+        'ts': '2022-12-06 15:13:38.643',
+        'current': 3.41,
+        'voltage': 105,
+        'phase': 0.02027,
+    }
+    record = ConsumerRecord(checksum=None, headers=None, offset=1, key=None, value=json.dumps(msg), partition=1,
+                            topic='test', serialized_key_size=None, serialized_header_size=None,
+                            serialized_value_size=None, timestamp=time.time(), timestamp_type=None)
+    assert consumer._to_taos(message=record)

+def test_to_taos_batch(consumer: Consumer):
+    records = [
+        [
+            ConsumerRecord(checksum=None, headers=None, offset=1, key=None,
+                           value=json.dumps({'location': 'California.SanFrancisco',
+                                             'groupId': 1,
+                                             'ts': '2022-12-06 15:13:38.643',
+                                             'current': 3.41,
+                                             'voltage': 105,
+                                             'phase': 0.02027, }),
+                           partition=1, topic='test', serialized_key_size=None, serialized_header_size=None,
+                           serialized_value_size=None, timestamp=time.time(), timestamp_type=None),
+            ConsumerRecord(checksum=None, headers=None, offset=1, key=None,
+                           value=json.dumps({'location': 'California.LosAngles',
+                                             'groupId': 2,
+                                             'ts': '2022-12-06 15:13:39.643',
+                                             'current': 3.41,
+                                             'voltage': 102,
+                                             'phase': 0.02027, }),
+                           partition=1, topic='test', serialized_key_size=None, serialized_header_size=None,
+                           serialized_value_size=None, timestamp=time.time(), timestamp_type=None),
+        ]
+    ]
+    consumer._to_taos_batch(messages=records)

 if __name__ == '__main__':
-    consumer = Consumer(async_model=True)
+    consumer = Consumer(async_model=True, testing=True)
+    # init env
     consumer.init_env()
-    consumer.consume()
-\ No newline at end of file
+    # consumer.consume()
+    # test build sql
+    # test build sql batch
+    test_to_taos(consumer)
+    test_to_taos_batch(consumer)
\ No newline at end of file
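Note: stripping the list[Future], list[list[ConsumerRecord]], and Callable[...] annotations above is not a style choice: subscripting the built-in list in annotations is a Python 3.9+ feature (PEP 585) and raises TypeError at import time on older interpreters unless `from __future__ import annotations` is used. A small sketch of the compatible spelling, assuming pre-3.9 support is the goal:

from typing import List, Tuple

def fetch_rows() -> List[Tuple]:  # works on Python 3.7+
    return [("2018-10-03 14:38:05.000", 10.3)]

# def fetch_rows() -> list[tuple]:  # TypeError on import before 3.9,
#     ...                           # unless `from __future__ import annotations`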
@@ -10,13 +10,14 @@ class MockDataSource:
         "9.4,118,0.141,California.SanFrancisco,4"
     ]

-    def __init__(self, tb_name_prefix, table_count):
+    def __init__(self, tb_name_prefix, table_count, infinity=True):
         self.table_name_prefix = tb_name_prefix + "_"
         self.table_count = table_count
         self.max_rows = 10000000
         self.current_ts = round(time.time() * 1000) - self.max_rows * 100
         # [(tableId, tableName, values),]
         self.data = self._init_data()
+        self.infinity = infinity

     def _init_data(self):
         lines = self.samples * (self.table_count // 5 + 1)
@@ -28,14 +29,19 @@ class MockDataSource:
     def __iter__(self):
         self.row = 0
-        return self
+        if not self.infinity:
+            return iter(self._iter_data())
+        else:
+            return self

     def __next__(self):
         """
         next 1000 rows for each table.
         return: {tableId:[row,...]}
         """
-        # generate 1000 timestamps
+        return self._iter_data()

+    def _iter_data(self):
         ts = []
         for _ in range(1000):
             self.current_ts += 100
@@ -47,3 +53,10 @@ class MockDataSource:
             rows = [table_name + ',' + t + ',' + values for t in ts]
             result.append((table_id, rows))
         return result

+if __name__ == '__main__':
+    datasource = MockDataSource('t', 10, False)
+    for data in datasource:
+        print(data)
\ No newline at end of file
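Note: with infinity=False, __iter__ returns iter(self._iter_data()), so the (table_id, rows) tuples of a single batch are yielded one by one, while the infinite path keeps returning whole batches from __next__; that asymmetry is why run_read_task wraps a bare tuple back into a list. A small illustrative sketch of the same dual-mode iterator pattern (the class below is hypothetical):

class DualModeSource:
    def __init__(self, infinity=True):
        self.infinity = infinity

    def _make_batch(self):
        return [(0, ['row-a']), (1, ['row-b'])]  # one batch of (id, rows)

    def __iter__(self):
        if not self.infinity:
            # finite: iterate the items of a single batch, then stop
            return iter(self._make_batch())
        return self  # infinite: keep calling __next__

    def __next__(self):
        return self._make_batch()  # a whole batch per step, forever

for item in DualModeSource(infinity=False):
    print(item)  # (0, ['row-a']) then (1, ['row-b'])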
@@ -10,6 +10,7 @@ class SQLWriter:
         self._tb_tags = {}
         self._conn = get_connection_func()
         self._max_sql_length = self.get_max_sql_length()
+        self._conn.execute("create database if not exists test")
         self._conn.execute("USE test")

     def get_max_sql_length(self):
@@ -20,7 +21,7 @@ class SQLWriter:
                 return int(r[1])
         return 1024 * 1024

-    def process_lines(self, lines: str):
+    def process_lines(self, lines: [str]):
         """
         :param lines: [[tbName,ts,current,voltage,phase,location,groupId]]
         """
@@ -60,6 +61,7 @@ class SQLWriter:
             buf.append(q)
             sql_len += len(q)
         sql += " ".join(buf)
+        self.create_tables()
         self.execute_sql(sql)
         self._tb_values.clear()
@@ -88,3 +90,23 @@ class SQLWriter:
         except BaseException as e:
             self.log.error("Execute SQL: %s", sql)
             raise e

+    def close(self):
+        if self._conn:
+            self._conn.close()

+if __name__ == '__main__':
+    def get_connection_func():
+        conn = taos.connect()
+        return conn

+    writer = SQLWriter(get_connection_func=get_connection_func)
+    writer.execute_sql(
+        "create stable if not exists meters (ts timestamp, current float, voltage int, phase float) "
+        "tags (location binary(64), groupId int)")
+    writer.execute_sql(
+        "INSERT INTO d21001 USING meters TAGS ('California.SanFrancisco', 2) "
+        "VALUES ('2021-07-13 14:06:32.272', 10.2, 219, 0.32)")
\ No newline at end of file
-import taos
-from taos.tmq import *
-
-conn = taos.connect()
-
-print("init")
-conn.execute("drop topic if exists topic_ctb_column")
-conn.execute("drop database if exists py_tmq")
-conn.execute("create database if not exists py_tmq vgroups 2")
-conn.select_db("py_tmq")
-conn.execute(
-    "create stable if not exists stb1 (ts timestamp, c1 int, c2 float, c3 binary(10)) tags(t1 int)"
-)
-conn.execute("create table if not exists tb1 using stb1 tags(1)")
-conn.execute("create table if not exists tb2 using stb1 tags(2)")
-conn.execute("create table if not exists tb3 using stb1 tags(3)")
-
-print("create topic")
-conn.execute(
-    "create topic if not exists topic_ctb_column as select ts, c1, c2, c3 from stb1"
-)
-
-print("build consumer")
-conf = TaosTmqConf()
-conf.set("group.id", "tg2")
-conf.set("td.connect.user", "root")
-conf.set("td.connect.pass", "taosdata")
-conf.set("enable.auto.commit", "true")
-
-def tmq_commit_cb_print(tmq, resp, offset, param=None):
-    print(f"commit: {resp}, tmq: {tmq}, offset: {offset}, param: {param}")
-
-conf.set_auto_commit_cb(tmq_commit_cb_print, None)
-tmq = conf.new_consumer()
-
-print("build topic list")
-topic_list = TaosTmqList()
-topic_list.append("topic_ctb_column")
-
-print("basic consume loop")
-tmq.subscribe(topic_list)
-sub_list = tmq.subscription()
-print("subscribed topics: ", sub_list)
-
-while 1:
-    res = tmq.poll(1000)
-    if res:
-        topic = res.get_topic_name()
-        vg = res.get_vgroup_id()
-        db = res.get_db_name()
-        print(f"topic: {topic}\nvgroup id: {vg}\ndb: {db}")
-        for row in res:
-            print(row)
+from taos.tmq import Consumer
+import taos
+
+def init_tmq_env(db, topic):
+    conn = taos.connect()
+    conn.execute("drop topic if exists {}".format(topic))
+    conn.execute("drop database if exists {}".format(db))
+    conn.execute("create database if not exists {}".format(db))
+    conn.select_db(db)
+    conn.execute(
+        "create stable if not exists stb1 (ts timestamp, c1 int, c2 float, c3 varchar(16)) tags(t1 int, t3 varchar(16))")
+    conn.execute("create table if not exists tb1 using stb1 tags(1, 't1')")
+    conn.execute("create table if not exists tb2 using stb1 tags(2, 't2')")
+    conn.execute("create table if not exists tb3 using stb1 tags(3, 't3')")
+    conn.execute("create topic if not exists {} as select ts, c1, c2, c3 from stb1".format(topic))
+    conn.execute("insert into tb1 values (now, 1, 1.0, 'tmq test')")
+    conn.execute("insert into tb2 values (now, 2, 2.0, 'tmq test')")
+    conn.execute("insert into tb3 values (now, 3, 3.0, 'tmq test')")
+
+def cleanup(db, topic):
+    conn = taos.connect()
+    conn.execute("drop topic if exists {}".format(topic))
+    conn.execute("drop database if exists {}".format(db))
+
+if __name__ == '__main__':
+    init_tmq_env("tmq_test", "tmq_test_topic")  # init env
+    consumer = Consumer(
+        {
+            "group.id": "tg2",
+            "td.connect.user": "root",
+            "td.connect.pass": "taosdata",
+            "enable.auto.commit": "true",
+        }
+    )
+    consumer.subscribe(["tmq_test_topic"])
+
+    try:
+        while True:
+            res = consumer.poll(1)
+            if not res:
+                break
+            err = res.error()
+            if err is not None:
+                raise err
+            val = res.value()
+
+            for block in val:
+                print(block.fetchall())
+    finally:
+        consumer.unsubscribe()
+        consumer.close()
+        cleanup("tmq_test", "tmq_test_topic")
\ No newline at end of file
@@ -201,6 +201,7 @@ int32_t walFetchHead(SWalReader *pRead, int64_t ver, SWalCkHead *pHead);
 int32_t walFetchBody(SWalReader *pRead, SWalCkHead **ppHead);
 int32_t walSkipFetchBody(SWalReader *pRead, const SWalCkHead *pHead);
+SWalRef *walRefFirstVer(SWal *, SWalRef *);
 SWalRef *walRefCommittedVer(SWal *);
 SWalRef *walOpenRef(SWal *);
......
@@ -210,8 +210,8 @@ function install_bin() {
     [ -x ${install_main_dir}/bin/${serverName} ] && ${csudo}ln -s ${install_main_dir}/bin/${serverName} ${bin_link_dir}/${serverName} || :
     [ -x ${install_main_dir}/bin/${udfdName} ] && ${csudo}ln -s ${install_main_dir}/bin/${udfdName} ${bin_link_dir}/${udfdName} || :
     [ -x ${install_main_dir}/bin/${adapterName} ] && ${csudo}ln -s ${install_main_dir}/bin/${adapterName} ${bin_link_dir}/${adapterName} || :
-    [ -x ${install_main_dir}/bin/${benchmarkName} ] && ${csudo}ln -s ${install_main_dir}/bin/${benchmarkName} ${bin_link_dir}/${demoName} || :
-    [ -x ${install_main_dir}/bin/${benchmarkName} ] && ${csudo}ln -s ${install_main_dir}/bin/${benchmarkName} ${bin_link_dir}/${benchmarkName} || :
+    [ -x ${install_main_dir}/bin/${benchmarkName} ] && ${csudo}ln -sf ${install_main_dir}/bin/${benchmarkName} ${bin_link_dir}/${demoName} || :
+    [ -x ${install_main_dir}/bin/${benchmarkName} ] && ${csudo}ln -sf ${install_main_dir}/bin/${benchmarkName} ${bin_link_dir}/${benchmarkName} || :
     [ -x ${install_main_dir}/bin/${dumpName} ] && ${csudo}ln -s ${install_main_dir}/bin/${dumpName} ${bin_link_dir}/${dumpName} || :
     [ -x ${install_main_dir}/bin/${xname} ] && ${csudo}ln -s ${install_main_dir}/bin/${xname} ${bin_link_dir}/${xname} || :
     [ -x ${install_main_dir}/bin/TDinsight.sh ] && ${csudo}ln -s ${install_main_dir}/bin/TDinsight.sh ${bin_link_dir}/TDinsight.sh || :
@@ -746,7 +746,7 @@ function is_version_compatible() {
 deb_erase() {
   confirm=""
   while [ "" == "${confirm}" ]; do
-    echo -e -n "${RED}Exist tdengine deb detected, do you want to remove it? [yes|no] ${NC}:"
+    echo -e -n "${RED}Existing TDengine deb is detected, do you want to remove it? [yes|no] ${NC}:"
     read confirm
     if [ "yes" == "$confirm" ]; then
       ${csudo}dpkg --remove tdengine ||:
@@ -760,7 +760,7 @@ deb_erase() {
 rpm_erase() {
   confirm=""
   while [ "" == "${confirm}" ]; do
-    echo -e -n "${RED}Exist tdengine rpm detected, do you want to remove it? [yes|no] ${NC}:"
+    echo -e -n "${RED}Existing TDengine rpm is detected, do you want to remove it? [yes|no] ${NC}:"
     read confirm
     if [ "yes" == "$confirm" ]; then
       ${csudo}rpm -e tdengine ||:
@@ -787,7 +787,7 @@ function updateProduct() {
   if echo $osinfo | grep -qwi "centos"; then
     rpm -q tdengine 2>&1 > /dev/null && rpm_erase tdengine ||:
   elif echo $osinfo | grep -qwi "ubuntu"; then
-    dpkg -l tdengine 2>&1 > /dev/null && deb_erase tdengine ||:
+    dpkg -l tdengine 2>&1 | grep ii > /dev/null && deb_erase tdengine ||:
   fi
   tar -zxf ${tarName}
......
@@ -79,8 +79,6 @@ int32_t vmOpenVnode(SVnodeMgmt *pMgmt, SWrapperCfg *pCfg, SVnode *pImpl) {
 void vmCloseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) {
   char path[TSDB_FILENAME_LEN] = {0};
-  vnodeProposeCommitOnNeed(pVnode->pImpl);
   taosThreadRwlockWrlock(&pMgmt->lock);
   taosHashRemove(pMgmt->hash, &pVnode->vgId, sizeof(int32_t));
   taosThreadRwlockUnlock(&pMgmt->lock);
......
@@ -202,6 +202,7 @@ int32_t tsdbCmprColData(SColData *pColData, int8_t cmprAlg, SBlockCol *pBlockCol
                         uint8_t **ppBuf);
 int32_t tsdbDecmprColData(uint8_t *pIn, SBlockCol *pBlockCol, int8_t cmprAlg, int32_t nVal, SColData *pColData,
                           uint8_t **ppBuf);
+int32_t tRowInfoCmprFn(const void *p1, const void *p2);
 // tsdbMemTable ==============================================================================================
 // SMemTable
 int32_t tsdbMemTableCreate(STsdb *pTsdb, SMemTable **ppMemTable);
......
@@ -247,7 +247,7 @@ int32_t tsdbSnapReaderClose(STsdbSnapReader** ppReader);
 int32_t tsdbSnapRead(STsdbSnapReader* pReader, uint8_t** ppData);
 // STsdbSnapWriter ========================================
 int32_t tsdbSnapWriterOpen(STsdb* pTsdb, int64_t sver, int64_t ever, STsdbSnapWriter** ppWriter);
-int32_t tsdbSnapWrite(STsdbSnapWriter* pWriter, uint8_t* pData, uint32_t nData);
+int32_t tsdbSnapWrite(STsdbSnapWriter* pWriter, SSnapDataHdr* pHdr);
 int32_t tsdbSnapWriterPrepareClose(STsdbSnapWriter* pWriter);
 int32_t tsdbSnapWriterClose(STsdbSnapWriter** ppWriter, int8_t rollback);
 // STqSnapshotReader ==
......
@@ -423,10 +423,10 @@ int32_t rsmaSnapWrite(SRSmaSnapWriter* pWriter, uint8_t* pData, uint32_t nData)
   // rsma1/rsma2
   if (pHdr->type == SNAP_DATA_RSMA1) {
     pHdr->type = SNAP_DATA_TSDB;
-    code = tsdbSnapWrite(pWriter->pDataWriter[0], pData, nData);
+    code = tsdbSnapWrite(pWriter->pDataWriter[0], pHdr);
   } else if (pHdr->type == SNAP_DATA_RSMA2) {
     pHdr->type = SNAP_DATA_TSDB;
-    code = tsdbSnapWrite(pWriter->pDataWriter[1], pData, nData);
+    code = tsdbSnapWrite(pWriter->pDataWriter[1], pHdr);
   } else if (pHdr->type == SNAP_DATA_QTASK) {
     code = rsmaSnapWriteQTaskInfo(pWriter, pData, nData);
   } else {
......
@@ -521,7 +521,12 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
         tqOffsetResetToData(&fetchOffsetNew, 0, 0);
       }
     } else {
-      tqOffsetResetToLog(&fetchOffsetNew, walGetFirstVer(pTq->pVnode->pWal));
+      pHandle->pRef = walRefFirstVer(pTq->pVnode->pWal, pHandle->pRef);
+      if (pHandle->pRef == NULL) {
+        terrno = TSDB_CODE_OUT_OF_MEMORY;
+        return -1;
+      }
+      tqOffsetResetToLog(&fetchOffsetNew, pHandle->pRef->refVer - 1);
     }
   } else if (reqOffset.type == TMQ_OFFSET__RESET_LATEST) {
     if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
@@ -719,6 +724,8 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
 int32_t tqProcessDeleteSubReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen) {
   SMqVDeleteReq* pReq = (SMqVDeleteReq*)msg;
+  tqDebug("vgId:%d, delete sub: %s", pTq->pVnode->config.vgId, pReq->subKey);
   taosWLockLatch(&pTq->pushLock);
   int32_t code = taosHashRemove(pTq->pPushMgr, pReq->subKey, strlen(pReq->subKey));
   if (code != 0) {
......
@@ -684,7 +684,7 @@ int32_t tRowMergerInit2(SRowMerger *pMerger, STSchema *pResTSchema, TSDBROW *pRo
       tsdbRowGetColVal(pRow, pTSchema, jCol++, pColVal);
       if ((!COL_VAL_IS_NONE(pColVal)) && (!COL_VAL_IS_NULL(pColVal)) && IS_VAR_DATA_TYPE(pColVal->type)) {
         uint8_t *pVal = pColVal->value.pData;
         pColVal->value.pData = NULL;
         code = tRealloc(&pColVal->value.pData, pColVal->value.nData);
         if (code) goto _exit;
@@ -757,7 +757,7 @@ int32_t tRowMergerAdd(SRowMerger *pMerger, TSDBROW *pRow, STSchema *pTSchema) {
           pTColVal->value.nData = pColVal->value.nData;
           if (pTColVal->value.nData) {
             memcpy(pTColVal->value.pData, pColVal->value.pData, pTColVal->value.nData);
           }
           pTColVal->flag = 0;
         } else {
@@ -776,7 +776,7 @@ int32_t tRowMergerAdd(SRowMerger *pMerger, TSDBROW *pRow, STSchema *pTSchema) {
           code = tRealloc(&tColVal->value.pData, pColVal->value.nData);
           if (code) return code;
           tColVal->value.nData = pColVal->value.nData;
           if (pColVal->value.nData) {
             memcpy(tColVal->value.pData, pColVal->value.pData, pColVal->value.nData);
           }
@@ -825,7 +825,7 @@ int32_t tRowMergerInit(SRowMerger *pMerger, TSDBROW *pRow, STSchema *pTSchema) {
       tsdbRowGetColVal(pRow, pTSchema, iCol, pColVal);
       if ((!COL_VAL_IS_NONE(pColVal)) && (!COL_VAL_IS_NULL(pColVal)) && IS_VAR_DATA_TYPE(pColVal->type)) {
         uint8_t *pVal = pColVal->value.pData;
         pColVal->value.pData = NULL;
         code = tRealloc(&pColVal->value.pData, pColVal->value.nData);
         if (code) goto _exit;
@@ -834,7 +834,7 @@ int32_t tRowMergerInit(SRowMerger *pMerger, TSDBROW *pRow, STSchema *pTSchema) {
         memcpy(pColVal->value.pData, pVal, pColVal->value.nData);
       }
     }
     if (taosArrayPush(pMerger->pArray, pColVal) == NULL) {
       code = TSDB_CODE_OUT_OF_MEMORY;
       goto _exit;
@@ -845,7 +845,7 @@ _exit:
   return code;
 }
 void tRowMergerClear(SRowMerger *pMerger) {
   for (int32_t iCol = 1; iCol < pMerger->pTSchema->numOfCols; iCol++) {
     SColVal *pTColVal = taosArrayGet(pMerger->pArray, iCol);
     if (IS_VAR_DATA_TYPE(pTColVal->type)) {
@@ -853,7 +853,7 @@ void tRowMergerClear(SRowMerger *pMerger) {
     }
   }
   taosArrayDestroy(pMerger->pArray);
 }
 int32_t tRowMerge(SRowMerger *pMerger, TSDBROW *pRow) {
@@ -876,7 +876,7 @@ int32_t tRowMerge(SRowMerger *pMerger, TSDBROW *pRow) {
       pTColVal->value.nData = pColVal->value.nData;
       if (pTColVal->value.nData) {
         memcpy(pTColVal->value.pData, pColVal->value.pData, pTColVal->value.nData);
       }
       pTColVal->flag = 0;
     } else {
@@ -898,7 +898,7 @@ int32_t tRowMerge(SRowMerger *pMerger, TSDBROW *pRow) {
       tColVal->value.nData = pColVal->value.nData;
       if (tColVal->value.nData) {
         memcpy(tColVal->value.pData, pColVal->value.pData, tColVal->value.nData);
       }
       tColVal->flag = 0;
     } else {
......
@@ -455,7 +455,7 @@ int32_t vnodeSnapWrite(SVSnapWriter *pWriter, uint8_t *pData, uint32_t nData) {
         if (code) goto _err;
       }
-      code = tsdbSnapWrite(pWriter->pTsdbSnapWriter, pData, nData);
+      code = tsdbSnapWrite(pWriter->pTsdbSnapWriter, pHdr);
       if (code) goto _err;
     } break;
     case SNAP_DATA_TQ_HANDLE: {
......
@@ -2477,7 +2477,19 @@ static void doStreamIntervalAggImpl(SOperatorInfo* pOperatorInfo, SSDataBlock* p
       pInfo->delKey = key;
     }
     int32_t prevEndPos = (forwardRows - 1) * step + startPos;
-    ASSERT(pSDataBlock->info.window.skey > 0 && pSDataBlock->info.window.ekey > 0);
+    if (pSDataBlock->info.window.skey <= 0 || pSDataBlock->info.window.ekey <= 0) {
+      qError("table uid %" PRIu64 " data block timestamp range may not be calculated! minKey %" PRId64
+             ",maxKey %" PRId64,
+             pSDataBlock->info.id.uid, pSDataBlock->info.window.skey, pSDataBlock->info.window.ekey);
+      blockDataUpdateTsWindow(pSDataBlock, 0);
+
+      // timestamp of the data is incorrect
+      if (pSDataBlock->info.window.skey <= 0 || pSDataBlock->info.window.ekey <= 0) {
+        qError("table uid %" PRIu64 " data block timestamp is out of range! minKey %" PRId64 ",maxKey %" PRId64,
+               pSDataBlock->info.id.uid, pSDataBlock->info.window.skey, pSDataBlock->info.window.ekey);
+      }
+    }
     if (IS_FINAL_OP(pInfo)) {
       startPos = getNextQualifiedFinalWindow(&pInfo->interval, &nextWin, &pSDataBlock->info, tsCols, prevEndPos);
     } else {
......
@@ -194,6 +194,8 @@ typedef struct SQWorker {
   SMsgCb   msgCb;
   SQWStat  stat;
   int32_t *destroyed;
+
+  int8_t nodeStopped;
 } SQWorker;
 typedef struct SQWorkerMgmt {
......
@@ -213,9 +213,15 @@ int32_t qwAcquireTaskCtx(QW_FPARAMS_DEF, SQWTaskCtx **ctx) {
   QW_SET_QTID(id, qId, tId, eId);
   *ctx = taosHashAcquire(mgmt->ctxHash, id, sizeof(id));
+  int8_t nodeStopped = atomic_load_8(&mgmt->nodeStopped);
   if (NULL == (*ctx)) {
-    QW_TASK_DLOG_E("task ctx not exist, may be dropped");
-    QW_ERR_RET(TSDB_CODE_QRY_TASK_CTX_NOT_EXIST);
+    if (!nodeStopped) {
+      QW_TASK_DLOG_E("task ctx not exist, may be dropped");
+      QW_ERR_RET(TSDB_CODE_QRY_TASK_CTX_NOT_EXIST);
+    } else {
+      QW_TASK_DLOG_E("node stopped");
+      QW_ERR_RET(TSDB_CODE_VND_STOPPED);
+    }
   }
   return TSDB_CODE_SUCCESS;
@@ -226,9 +232,16 @@ int32_t qwGetTaskCtx(QW_FPARAMS_DEF, SQWTaskCtx **ctx) {
   QW_SET_QTID(id, qId, tId, eId);
   *ctx = taosHashGet(mgmt->ctxHash, id, sizeof(id));
+  int8_t nodeStopped = atomic_load_8(&mgmt->nodeStopped);
   if (NULL == (*ctx)) {
-    QW_TASK_DLOG_E("task ctx not exist, may be dropped");
-    QW_ERR_RET(TSDB_CODE_QRY_TASK_CTX_NOT_EXIST);
+    if (!nodeStopped) {
+      QW_TASK_DLOG_E("task ctx not exist, may be dropped");
+      QW_ERR_RET(TSDB_CODE_QRY_TASK_CTX_NOT_EXIST);
+    } else {
+      QW_TASK_DLOG_E("node stopped");
+      QW_ERR_RET(TSDB_CODE_VND_STOPPED);
+    }
   }
   return TSDB_CODE_SUCCESS;
......
@@ -1188,6 +1188,9 @@ void qWorkerStopAllTasks(void *qWorkerMgmt) {
   uint64_t qId, tId, sId;
   int32_t  eId;
   int64_t  rId = 0;
+
+  atomic_store_8(&mgmt->nodeStopped, 1);
+
   void *pIter = taosHashIterate(mgmt->ctxHash, NULL);
   while (pIter) {
     SQWTaskCtx *ctx = (SQWTaskCtx *)pIter;
......
@@ -207,6 +207,7 @@ void streamMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId) {
   if (ppTask) {
     SStreamTask* pTask = *ppTask;
     taosHashRemove(pMeta->pTasks, &taskId, sizeof(int32_t));
+    tdbTbDelete(pMeta->pTaskDb, &taskId, sizeof(int32_t), pMeta->txn);
     /*if (pTask->timer) {
      * taosTmrStop(pTask->timer);*/
     /*pTask->timer = NULL;*/
......
@@ -89,45 +89,6 @@
 // /\ UNCHANGED <<candidateVars, leaderVars>>
 //
int32_t syncNodeFollowerCommit(SSyncNode* ths, SyncIndex newCommitIndex) {
ASSERT(false && "deprecated");
if (ths->state != TAOS_SYNC_STATE_FOLLOWER) {
sNTrace(ths, "can not do follower commit");
return -1;
}
// maybe update commit index, leader notice me
if (newCommitIndex > ths->commitIndex) {
// has commit entry in local
if (newCommitIndex <= ths->pLogStore->syncLogLastIndex(ths->pLogStore)) {
// advance commit index to sanpshot first
SSnapshot snapshot;
ths->pFsm->FpGetSnapshotInfo(ths->pFsm, &snapshot);
if (snapshot.lastApplyIndex >= 0 && snapshot.lastApplyIndex > ths->commitIndex) {
SyncIndex commitBegin = ths->commitIndex;
SyncIndex commitEnd = snapshot.lastApplyIndex;
ths->commitIndex = snapshot.lastApplyIndex;
sNTrace(ths, "commit by snapshot from index:%" PRId64 " to index:%" PRId64, commitBegin, commitEnd);
}
SyncIndex beginIndex = ths->commitIndex + 1;
SyncIndex endIndex = newCommitIndex;
// update commit index
ths->commitIndex = newCommitIndex;
// call back Wal
int32_t code = ths->pLogStore->syncLogUpdateCommitIndex(ths->pLogStore, ths->commitIndex);
ASSERT(code == 0);
code = syncNodeDoCommit(ths, beginIndex, endIndex, ths->state);
ASSERT(code == 0);
}
}
return 0;
}
SSyncRaftEntry* syncBuildRaftEntryFromAppendEntries(const SyncAppendEntries* pMsg) {
  SSyncRaftEntry* pEntry = taosMemoryMalloc(pMsg->dataLen);
  if (pEntry == NULL) {
@@ -232,256 +193,3 @@ _IGNORE:
  rpcFreeCont(rpcRsp.pCont);
  return 0;
}
int32_t syncNodeOnAppendEntriesOld(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
SyncAppendEntries* pMsg = pRpcMsg->pCont;
SRpcMsg rpcRsp = {0};
// if already drop replica, do not process
if (!syncNodeInRaftGroup(ths, &(pMsg->srcId))) {
syncLogRecvAppendEntries(ths, pMsg, "not in my config");
goto _IGNORE;
}
// prepare response msg
int32_t code = syncBuildAppendEntriesReply(&rpcRsp, ths->vgId);
if (code != 0) {
syncLogRecvAppendEntries(ths, pMsg, "build rsp error");
goto _IGNORE;
}
SyncAppendEntriesReply* pReply = rpcRsp.pCont;
pReply->srcId = ths->myRaftId;
pReply->destId = pMsg->srcId;
pReply->term = ths->raftStore.currentTerm;
pReply->success = false;
// pReply->matchIndex = ths->pLogStore->syncLogLastIndex(ths->pLogStore);
pReply->matchIndex = SYNC_INDEX_INVALID;
pReply->lastSendIndex = pMsg->prevLogIndex + 1;
pReply->startTime = ths->startTime;
if (pMsg->term < ths->raftStore.currentTerm) {
syncLogRecvAppendEntries(ths, pMsg, "reject, small term");
goto _SEND_RESPONSE;
}
if (pMsg->term > ths->raftStore.currentTerm) {
pReply->term = pMsg->term;
}
syncNodeStepDown(ths, pMsg->term);
syncNodeResetElectTimer(ths);
SyncIndex startIndex = ths->pLogStore->syncLogBeginIndex(ths->pLogStore);
SyncIndex lastIndex = ths->pLogStore->syncLogLastIndex(ths->pLogStore);
if (pMsg->prevLogIndex > lastIndex) {
syncLogRecvAppendEntries(ths, pMsg, "reject, index not match");
goto _SEND_RESPONSE;
}
if (pMsg->prevLogIndex >= startIndex) {
SyncTerm myPreLogTerm = syncNodeGetPreTerm(ths, pMsg->prevLogIndex + 1);
// ASSERT(myPreLogTerm != SYNC_TERM_INVALID);
if (myPreLogTerm == SYNC_TERM_INVALID) {
syncLogRecvAppendEntries(ths, pMsg, "reject, pre-term invalid");
goto _SEND_RESPONSE;
}
if (myPreLogTerm != pMsg->prevLogTerm) {
syncLogRecvAppendEntries(ths, pMsg, "reject, pre-term not match");
goto _SEND_RESPONSE;
}
}
// accept
pReply->success = true;
bool hasAppendEntries = pMsg->dataLen > 0;
if (hasAppendEntries) {
SSyncRaftEntry* pAppendEntry = syncEntryBuildFromAppendEntries(pMsg);
ASSERT(pAppendEntry != NULL);
SyncIndex appendIndex = pMsg->prevLogIndex + 1;
LRUHandle* hLocal = NULL;
LRUHandle* hAppend = NULL;
int32_t code = 0;
SSyncRaftEntry* pLocalEntry = NULL;
SLRUCache* pCache = ths->pLogStore->pCache;
hLocal = taosLRUCacheLookup(pCache, &appendIndex, sizeof(appendIndex));
if (hLocal) {
pLocalEntry = (SSyncRaftEntry*)taosLRUCacheValue(pCache, hLocal);
code = 0;
ths->pLogStore->cacheHit++;
sNTrace(ths, "hit cache index:%" PRId64 ", bytes:%u, %p", appendIndex, pLocalEntry->bytes, pLocalEntry);
} else {
ths->pLogStore->cacheMiss++;
sNTrace(ths, "miss cache index:%" PRId64, appendIndex);
code = ths->pLogStore->syncLogGetEntry(ths->pLogStore, appendIndex, &pLocalEntry);
}
if (code == 0) {
// get local entry success
if (pLocalEntry->term == pAppendEntry->term) {
// do nothing
sNTrace(ths, "log match, do nothing, index:%" PRId64, appendIndex);
} else {
// truncate
code = ths->pLogStore->syncLogTruncate(ths->pLogStore, appendIndex);
if (code != 0) {
char logBuf[128];
snprintf(logBuf, sizeof(logBuf), "ignore, truncate error, append-index:%" PRId64, appendIndex);
syncLogRecvAppendEntries(ths, pMsg, logBuf);
if (hLocal) {
taosLRUCacheRelease(ths->pLogStore->pCache, hLocal, false);
} else {
syncEntryDestroy(pLocalEntry);
}
if (hAppend) {
taosLRUCacheRelease(ths->pLogStore->pCache, hAppend, false);
} else {
syncEntryDestroy(pAppendEntry);
}
goto _IGNORE;
}
ASSERT(pAppendEntry->index == appendIndex);
// append
code = ths->pLogStore->syncLogAppendEntry(ths->pLogStore, pAppendEntry, false);
if (code != 0) {
char logBuf[128];
snprintf(logBuf, sizeof(logBuf), "ignore, append error, append-index:%" PRId64, appendIndex);
syncLogRecvAppendEntries(ths, pMsg, logBuf);
if (hLocal) {
taosLRUCacheRelease(ths->pLogStore->pCache, hLocal, false);
} else {
syncEntryDestroy(pLocalEntry);
}
if (hAppend) {
taosLRUCacheRelease(ths->pLogStore->pCache, hAppend, false);
} else {
syncEntryDestroy(pAppendEntry);
}
goto _IGNORE;
}
syncCacheEntry(ths->pLogStore, pAppendEntry, &hAppend);
}
} else {
if (terrno == TSDB_CODE_WAL_LOG_NOT_EXIST) {
// log not exist
// truncate
code = ths->pLogStore->syncLogTruncate(ths->pLogStore, appendIndex);
if (code != 0) {
char logBuf[128];
snprintf(logBuf, sizeof(logBuf), "ignore, log not exist, truncate error, append-index:%" PRId64, appendIndex);
syncLogRecvAppendEntries(ths, pMsg, logBuf);
syncEntryDestroy(pLocalEntry);
syncEntryDestroy(pAppendEntry);
goto _IGNORE;
}
// append
code = ths->pLogStore->syncLogAppendEntry(ths->pLogStore, pAppendEntry, false);
if (code != 0) {
char logBuf[128];
snprintf(logBuf, sizeof(logBuf), "ignore, log not exist, append error, append-index:%" PRId64, appendIndex);
syncLogRecvAppendEntries(ths, pMsg, logBuf);
if (hLocal) {
taosLRUCacheRelease(ths->pLogStore->pCache, hLocal, false);
} else {
syncEntryDestroy(pLocalEntry);
}
if (hAppend) {
taosLRUCacheRelease(ths->pLogStore->pCache, hAppend, false);
} else {
syncEntryDestroy(pAppendEntry);
}
goto _IGNORE;
}
syncCacheEntry(ths->pLogStore, pAppendEntry, &hAppend);
} else {
// get local entry success
char logBuf[128];
snprintf(logBuf, sizeof(logBuf), "ignore, get local entry error, append-index:%" PRId64 " err:%d", appendIndex,
terrno);
syncLogRecvAppendEntries(ths, pMsg, logBuf);
if (hLocal) {
taosLRUCacheRelease(ths->pLogStore->pCache, hLocal, false);
} else {
syncEntryDestroy(pLocalEntry);
}
if (hAppend) {
taosLRUCacheRelease(ths->pLogStore->pCache, hAppend, false);
} else {
syncEntryDestroy(pAppendEntry);
}
goto _IGNORE;
}
}
// update match index
pReply->matchIndex = pAppendEntry->index;
if (hLocal) {
taosLRUCacheRelease(ths->pLogStore->pCache, hLocal, false);
} else {
syncEntryDestroy(pLocalEntry);
}
if (hAppend) {
taosLRUCacheRelease(ths->pLogStore->pCache, hAppend, false);
} else {
syncEntryDestroy(pAppendEntry);
}
} else {
// no append entries, do nothing
// maybe has extra entries, no harm
// update match index
pReply->matchIndex = pMsg->prevLogIndex;
}
// maybe update commit index, leader notice me
syncNodeFollowerCommit(ths, pMsg->commitIndex);
syncLogRecvAppendEntries(ths, pMsg, "accept");
goto _SEND_RESPONSE;
_IGNORE:
rpcFreeCont(rpcRsp.pCont);
return 0;
_SEND_RESPONSE:
// msg event log
syncLogSendAppendEntriesReply(ths, pReply, "");
// send response
syncNodeSendMsgById(&pReply->destId, ths, &rpcRsp);
return 0;
}
@@ -89,63 +89,3 @@ int32_t syncNodeOnAppendEntriesReply(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
  }
  return 0;
}
int32_t syncNodeOnAppendEntriesReplyOld(SSyncNode* ths, SyncAppendEntriesReply* pMsg) {
int32_t ret = 0;
// if already drop replica, do not process
if (!syncNodeInRaftGroup(ths, &(pMsg->srcId))) {
syncLogRecvAppendEntriesReply(ths, pMsg, "not in my config");
return 0;
}
// drop stale response
if (pMsg->term < ths->raftStore.currentTerm) {
syncLogRecvAppendEntriesReply(ths, pMsg, "drop stale response");
return 0;
}
if (ths->state == TAOS_SYNC_STATE_LEADER) {
if (pMsg->term > ths->raftStore.currentTerm) {
syncLogRecvAppendEntriesReply(ths, pMsg, "error term");
syncNodeStepDown(ths, pMsg->term);
return -1;
}
ASSERT(pMsg->term == ths->raftStore.currentTerm);
if (pMsg->success) {
SyncIndex oldMatchIndex = syncIndexMgrGetIndex(ths->pMatchIndex, &(pMsg->srcId));
if (pMsg->matchIndex > oldMatchIndex) {
syncIndexMgrSetIndex(ths->pMatchIndex, &(pMsg->srcId), pMsg->matchIndex);
syncMaybeAdvanceCommitIndex(ths);
// maybe update minMatchIndex
ths->minMatchIndex = syncMinMatchIndex(ths);
}
syncIndexMgrSetIndex(ths->pNextIndex, &(pMsg->srcId), pMsg->matchIndex + 1);
} else {
SyncIndex nextIndex = syncIndexMgrGetIndex(ths->pNextIndex, &(pMsg->srcId));
if (nextIndex > SYNC_INDEX_BEGIN) {
--nextIndex;
}
syncIndexMgrSetIndex(ths->pNextIndex, &(pMsg->srcId), nextIndex);
}
// send next append entries
SPeerState* pState = syncNodeGetPeerState(ths, &(pMsg->srcId));
ASSERT(pState != NULL);
if (pMsg->lastSendIndex == pState->lastSendIndex) {
int64_t timeNow = taosGetTimestampMs();
int64_t elapsed = timeNow - pState->lastSendTime;
sNTrace(ths, "sync-append-entries rtt elapsed:%" PRId64 ", index:%" PRId64, elapsed, pState->lastSendIndex);
syncNodeReplicateOne(ths, &(pMsg->srcId), true);
}
}
syncLogRecvAppendEntriesReply(ths, pMsg, "process");
return 0;
}
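The deprecated reply handler above is the leader half of the same protocol: a successful reply advances the follower's matchIndex and nextIndex, while a rejection backs nextIndex off by one so the next probe reaches further into the follower's past. A compact sketch of that bookkeeping, assuming a plain PeerProgress struct rather than the syncIndexMgr API:

#include <stdbool.h>
#include <stdint.h>

#define SYNC_INDEX_BEGIN 0  // assumed floor for nextIndex, as in the code above

typedef struct {
  int64_t matchIndex;  // highest log index known to be replicated on the peer
  int64_t nextIndex;   // next log index the leader will send to the peer
} PeerProgress;

static void onAppendReply(PeerProgress *p, bool success, int64_t replyMatchIndex) {
  if (success) {
    if (replyMatchIndex > p->matchIndex) p->matchIndex = replyMatchIndex;
    p->nextIndex = replyMatchIndex + 1;
  } else if (p->nextIndex > SYNC_INDEX_BEGIN) {
    p->nextIndex--;  // retry one entry earlier until the logs match
  }
}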
@@ -43,148 +43,6 @@
//        IN commitIndex' = [commitIndex EXCEPT ![i] = newCommitIndex]
//     /\ UNCHANGED <<messages, serverVars, candidateVars, leaderVars, log>>
//
void syncOneReplicaAdvance(SSyncNode* pSyncNode) {
ASSERT(false && "deprecated");
if (pSyncNode == NULL) {
sError("pSyncNode is NULL");
return;
}
if (pSyncNode->state != TAOS_SYNC_STATE_LEADER) {
sNError(pSyncNode, "not leader, can not advance commit index");
return;
}
if (pSyncNode->replicaNum != 1) {
sNError(pSyncNode, "not one replica, can not advance commit index");
return;
}
// advance commit index to snapshot first
SSnapshot snapshot;
pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
if (snapshot.lastApplyIndex > 0 && snapshot.lastApplyIndex > pSyncNode->commitIndex) {
SyncIndex commitBegin = pSyncNode->commitIndex;
SyncIndex commitEnd = snapshot.lastApplyIndex;
pSyncNode->commitIndex = snapshot.lastApplyIndex;
sNTrace(pSyncNode, "commit by snapshot from index:%" PRId64 " to index:%" PRId64, commitBegin, commitEnd);
}
// advance commit index as large as possible
SyncIndex lastIndex = syncNodeGetLastIndex(pSyncNode);
if (lastIndex > pSyncNode->commitIndex) {
sNTrace(pSyncNode, "commit by wal from index:%" PRId64 " to index:%" PRId64, pSyncNode->commitIndex + 1, lastIndex);
pSyncNode->commitIndex = lastIndex;
}
// call back Wal
SyncIndex walCommitVer = logStoreWalCommitVer(pSyncNode->pLogStore);
if (pSyncNode->commitIndex > walCommitVer) {
pSyncNode->pLogStore->syncLogUpdateCommitIndex(pSyncNode->pLogStore, pSyncNode->commitIndex);
}
}
void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) {
ASSERTS(false, "deprecated");
if (pSyncNode == NULL) {
sError("pSyncNode is NULL");
return;
}
if (pSyncNode->state != TAOS_SYNC_STATE_LEADER) {
sNError(pSyncNode, "not leader, can not advance commit index");
return;
}
  // advance commit index to snapshot first
SSnapshot snapshot;
pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
if (snapshot.lastApplyIndex > 0 && snapshot.lastApplyIndex > pSyncNode->commitIndex) {
SyncIndex commitBegin = pSyncNode->commitIndex;
SyncIndex commitEnd = snapshot.lastApplyIndex;
pSyncNode->commitIndex = snapshot.lastApplyIndex;
sNTrace(pSyncNode, "commit by snapshot from index:%" PRId64 " to index:%" PRId64, commitBegin, commitEnd);
}
// update commit index
SyncIndex newCommitIndex = pSyncNode->commitIndex;
for (SyncIndex index = syncNodeGetLastIndex(pSyncNode); index > pSyncNode->commitIndex; --index) {
bool agree = syncAgree(pSyncNode, index);
if (agree) {
// term
SSyncRaftEntry* pEntry = NULL;
SLRUCache* pCache = pSyncNode->pLogStore->pCache;
LRUHandle* h = taosLRUCacheLookup(pCache, &index, sizeof(index));
if (h) {
pEntry = (SSyncRaftEntry*)taosLRUCacheValue(pCache, h);
pSyncNode->pLogStore->cacheHit++;
sNTrace(pSyncNode, "hit cache index:%" PRId64 ", bytes:%u, %p", index, pEntry->bytes, pEntry);
} else {
pSyncNode->pLogStore->cacheMiss++;
sNTrace(pSyncNode, "miss cache index:%" PRId64, index);
int32_t code = pSyncNode->pLogStore->syncLogGetEntry(pSyncNode->pLogStore, index, &pEntry);
if (code != 0) {
sNError(pSyncNode, "advance commit index error, read wal index:%" PRId64, index);
return;
}
}
      // cannot commit, even if the quorum agrees; the term must be checked first!
if (pEntry->term <= pSyncNode->raftStore.currentTerm) {
// update commit index
newCommitIndex = index;
if (h) {
taosLRUCacheRelease(pCache, h, false);
} else {
syncEntryDestroy(pEntry);
}
break;
} else {
sNTrace(pSyncNode, "can not commit due to term not equal, index:%" PRId64 ", term:%" PRIu64, pEntry->index,
pEntry->term);
}
if (h) {
taosLRUCacheRelease(pCache, h, false);
} else {
syncEntryDestroy(pEntry);
}
}
}
// advance commit index as large as possible
SyncIndex walCommitVer = logStoreWalCommitVer(pSyncNode->pLogStore);
if (walCommitVer > newCommitIndex) {
newCommitIndex = walCommitVer;
}
// maybe execute fsm
if (newCommitIndex > pSyncNode->commitIndex) {
SyncIndex beginIndex = pSyncNode->commitIndex + 1;
SyncIndex endIndex = newCommitIndex;
// update commit index
pSyncNode->commitIndex = newCommitIndex;
// call back Wal
pSyncNode->pLogStore->syncLogUpdateCommitIndex(pSyncNode->pLogStore, pSyncNode->commitIndex);
// execute fsm
if (pSyncNode != NULL && pSyncNode->pFsm != NULL) {
int32_t code = syncNodeDoCommit(pSyncNode, beginIndex, endIndex, pSyncNode->state);
if (code != 0) {
sNError(pSyncNode, "advance commit index error, do commit begin:%" PRId64 ", end:%" PRId64, beginIndex,
endIndex);
return;
}
}
}
}
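The scan above walks backward from the last index looking for the highest entry already replicated on a quorum, and refuses to commit an entry without inspecting its term (the code accepts term <= currentTerm; the textbook rule in Raft section 5.4.2 counts replicas only for an entry of the current term). A self-contained sketch of the quorum rule under the stricter textbook reading, with illustrative names:

#include <stdint.h>

// matchIndex[i] is replica i's highest replicated index (the leader counts
// itself with matchIndex == lastIndex); termAt[idx] is the term of log entry idx.
static int64_t maxCommittable(const int64_t *matchIndex, int n, int64_t lastIndex,
                              const uint64_t *termAt, uint64_t currentTerm,
                              int64_t commitIndex) {
  for (int64_t idx = lastIndex; idx > commitIndex; --idx) {
    int agree = 0;
    for (int i = 0; i < n; ++i) {
      if (matchIndex[i] >= idx) agree++;
    }
    if (agree >= n / 2 + 1 && termAt[idx] == currentTerm) {
      return idx;  // highest index that is safe to commit
    }
  }
  return commitIndex;  // nothing new can be committed yet
}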
bool syncAgreeIndex(SSyncNode* pSyncNode, SRaftId* pRaftId, SyncIndex index) {
  // I am leader, I agree
@@ -210,83 +68,7 @@ static inline int64_t syncNodeAbs64(int64_t a, int64_t b) {
  return c;
}

-int32_t syncNodeDynamicQuorum(const SSyncNode* pSyncNode) {
+int32_t syncNodeDynamicQuorum(const SSyncNode* pSyncNode) { return pSyncNode->quorum; }
return pSyncNode->quorum;
#if 0
int32_t quorum = 1; // self
int64_t timeNow = taosGetTimestampMs();
for (int i = 0; i < pSyncNode->peersNum; ++i) {
int64_t peerStartTime = syncIndexMgrGetStartTime(pSyncNode->pNextIndex, &(pSyncNode->peersId)[i]);
int64_t peerRecvTime = syncIndexMgrGetRecvTime(pSyncNode->pNextIndex, &(pSyncNode->peersId)[i]);
SyncIndex peerMatchIndex = syncIndexMgrGetIndex(pSyncNode->pMatchIndex, &(pSyncNode->peersId)[i]);
int64_t recvTimeDiff = TABS(peerRecvTime - timeNow);
int64_t startTimeDiff = TABS(peerStartTime - pSyncNode->startTime);
int64_t logDiff = TABS(peerMatchIndex - syncNodeGetLastIndex(pSyncNode));
/*
int64_t recvTimeDiff = syncNodeAbs64(peerRecvTime, timeNow);
int64_t startTimeDiff = syncNodeAbs64(peerStartTime, pSyncNode->startTime);
int64_t logDiff = syncNodeAbs64(peerMatchIndex, syncNodeGetLastIndex(pSyncNode));
*/
int32_t addQuorum = 0;
if (recvTimeDiff < SYNC_MAX_RECV_TIME_RANGE_MS) {
if (startTimeDiff < SYNC_MAX_START_TIME_RANGE_MS) {
addQuorum = 1;
} else {
if (logDiff < SYNC_ADD_QUORUM_COUNT) {
addQuorum = 1;
} else {
addQuorum = 0;
}
}
} else {
addQuorum = 0;
}
/*
if (recvTimeDiff < SYNC_MAX_RECV_TIME_RANGE_MS) {
addQuorum = 1;
} else {
addQuorum = 0;
}
if (startTimeDiff > SYNC_MAX_START_TIME_RANGE_MS) {
addQuorum = 0;
}
*/
quorum += addQuorum;
}
ASSERT(quorum <= pSyncNode->replicaNum);
if (quorum < pSyncNode->quorum) {
quorum = pSyncNode->quorum;
}
return quorum;
#endif
}
/*
bool syncAgree(SSyncNode* pSyncNode, SyncIndex index) {
int agreeCount = 0;
for (int i = 0; i < pSyncNode->replicaNum; ++i) {
if (syncAgreeIndex(pSyncNode, &(pSyncNode->replicasId[i]), index)) {
++agreeCount;
}
if (agreeCount >= syncNodeDynamicQuorum(pSyncNode)) {
return true;
}
}
return false;
}
*/
bool syncNodeAgreedUpon(SSyncNode* pNode, SyncIndex index) {
  int count = 0;
...
@@ -43,7 +43,10 @@ static int32_t syncNodeRequestVotePeers(SSyncNode* pNode) {
  for (int i = 0; i < pNode->peersNum; ++i) {
    SRpcMsg rpcMsg = {0};
    ret = syncBuildRequestVote(&rpcMsg, pNode->vgId);
-   ASSERT(ret == 0);
+   if (ret < 0) {
+     sError("vgId:%d, failed to build request-vote msg since %s", pNode->vgId, terrstr());
+     continue;
+   }

    SyncRequestVote* pMsg = rpcMsg.pCont;
    pMsg->srcId = pNode->myRaftId;
@@ -51,13 +54,18 @@ static int32_t syncNodeRequestVotePeers(SSyncNode* pNode) {
    pMsg->term = pNode->raftStore.currentTerm;

    ret = syncNodeGetLastIndexTerm(pNode, &pMsg->lastLogIndex, &pMsg->lastLogTerm);
-   ASSERT(ret == 0);
+   if (ret < 0) {
+     sError("vgId:%d, failed to get index and term of last log since %s", pNode->vgId, terrstr());
+     continue;
+   }

    ret = syncNodeSendMsgById(&pNode->peersId[i], pNode, &rpcMsg);
-   ASSERT(ret == 0);
+   if (ret < 0) {
+     sError("vgId:%d, failed to send msg to peerId:%" PRId64, pNode->vgId, pNode->peersId[i].addr);
+     continue;
+   }
  }

- return 0;
+ return ret;
}

int32_t syncNodeElect(SSyncNode* pSyncNode) {
...
@@ -292,8 +292,6 @@ int32_t syncBeginSnapshot(int64_t rid, int64_t lastApplyIndex) {
    goto _DEL_WAL;
  } else {
-   lastApplyIndex -= SYNC_VNODE_LOG_RETENTION;
    SyncIndex beginIndex = pSyncNode->pLogStore->syncLogBeginIndex(pSyncNode->pLogStore);
    SyncIndex endIndex = pSyncNode->pLogStore->syncLogEndIndex(pSyncNode->pLogStore);
    bool isEmpty = pSyncNode->pLogStore->syncLogIsEmpty(pSyncNode->pLogStore);
@@ -308,6 +306,8 @@ int32_t syncBeginSnapshot(int64_t rid, int64_t lastApplyIndex) {
    if (pSyncNode->replicaNum > 1) {
      // multi replicas
+     lastApplyIndex = TMAX(lastApplyIndex - SYNC_VNODE_LOG_RETENTION, beginIndex - 1);
      if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) {
        pSyncNode->minMatchIndex = syncMinMatchIndex(pSyncNode);
@@ -586,78 +586,6 @@ SSyncState syncGetState(int64_t rid) {
  return state;
}
#if 0
int32_t syncGetSnapshotByIndex(int64_t rid, SyncIndex index, SSnapshot* pSnapshot) {
if (index < SYNC_INDEX_BEGIN) {
return -1;
}
SSyncNode* pSyncNode = syncNodeAcquire(rid);
if (pSyncNode == NULL) {
return -1;
}
ASSERT(rid == pSyncNode->rid);
SSyncRaftEntry* pEntry = NULL;
int32_t code = pSyncNode->pLogStore->syncLogGetEntry(pSyncNode->pLogStore, index, &pEntry);
if (code != 0) {
if (pEntry != NULL) {
syncEntryDestroy(pEntry);
}
syncNodeRelease(pSyncNode);
return -1;
}
ASSERT(pEntry != NULL);
pSnapshot->data = NULL;
pSnapshot->lastApplyIndex = index;
pSnapshot->lastApplyTerm = pEntry->term;
pSnapshot->lastConfigIndex = syncNodeGetSnapshotConfigIndex(pSyncNode, index);
syncEntryDestroy(pEntry);
syncNodeRelease(pSyncNode);
return 0;
}
int32_t syncGetSnapshotMeta(int64_t rid, struct SSnapshotMeta* sMeta) {
SSyncNode* pSyncNode = syncNodeAcquire(rid);
if (pSyncNode == NULL) {
return -1;
}
ASSERT(rid == pSyncNode->rid);
sMeta->lastConfigIndex = pSyncNode->raftCfg.lastConfigIndex;
sTrace("vgId:%d, get snapshot meta, lastConfigIndex:%" PRId64, pSyncNode->vgId, pSyncNode->raftCfg.lastConfigIndex);
syncNodeRelease(pSyncNode);
return 0;
}
int32_t syncGetSnapshotMetaByIndex(int64_t rid, SyncIndex snapshotIndex, struct SSnapshotMeta* sMeta) {
SSyncNode* pSyncNode = syncNodeAcquire(rid);
if (pSyncNode == NULL) {
return -1;
}
ASSERT(rid == pSyncNode->rid);
ASSERT(pSyncNode->raftCfg.configIndexCount >= 1);
SyncIndex lastIndex = (pSyncNode->raftCfg.configIndexArr)[0];
for (int32_t i = 0; i < pSyncNode->raftCfg.configIndexCount; ++i) {
if ((pSyncNode->raftCfg.configIndexArr)[i] > lastIndex &&
(pSyncNode->raftCfg.configIndexArr)[i] <= snapshotIndex) {
lastIndex = (pSyncNode->raftCfg.configIndexArr)[i];
}
}
sMeta->lastConfigIndex = lastIndex;
sTrace("vgId:%d, get snapshot meta by index:%" PRId64 " lcindex:%" PRId64, pSyncNode->vgId, snapshotIndex,
sMeta->lastConfigIndex);
syncNodeRelease(pSyncNode);
return 0;
}
#endif
SyncIndex syncNodeGetSnapshotConfigIndex(SSyncNode* pSyncNode, SyncIndex snapshotLastApplyIndex) {
  ASSERT(pSyncNode->raftCfg.configIndexCount >= 1);
  SyncIndex lastIndex = (pSyncNode->raftCfg.configIndexArr)[0];
@@ -1031,9 +959,12 @@ SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo) {
  pSyncNode->commitIndex = commitIndex;
  sInfo("vgId:%d, sync node commitIndex initialized as %" PRId64, pSyncNode->vgId, pSyncNode->commitIndex);

+ // restore log store on need
  if (syncNodeLogStoreRestoreOnNeed(pSyncNode) < 0) {
+   sError("vgId:%d, failed to restore log store since %s.", pSyncNode->vgId, terrstr());
    goto _error;
  }

  // timer ms init
  pSyncNode->pingBaseLine = PING_TIMER_MS;
  pSyncNode->electBaseLine = tsElectInterval;
@@ -1096,10 +1027,16 @@ SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo) {
  pSyncNode->changing = false;

  // replication mgr
- syncNodeLogReplMgrInit(pSyncNode);
+ if (syncNodeLogReplMgrInit(pSyncNode) < 0) {
+   sError("vgId:%d, failed to init repl mgr since %s.", pSyncNode->vgId, terrstr());
+   goto _error;
+ }

  // peer state
- syncNodePeerStateInit(pSyncNode);
+ if (syncNodePeerStateInit(pSyncNode) < 0) {
+   sError("vgId:%d, failed to init peer stat since %s.", pSyncNode->vgId, terrstr());
+   goto _error;
+ }

  //
  // min match index
@@ -1194,27 +1131,10 @@ int32_t syncNodeStart(SSyncNode* pSyncNode) {
  int32_t ret = 0;
  ret = syncNodeStartPingTimer(pSyncNode);
- ASSERT(ret == 0);
- return ret;
-}
-
-void syncNodeStartOld(SSyncNode* pSyncNode) {
-  // start raft
-  if (pSyncNode->replicaNum == 1) {
-    raftStoreNextTerm(pSyncNode);
-    syncNodeBecomeLeader(pSyncNode, "one replica start");
-
-    // Raft 3.6.2 Committing entries from previous terms
-    syncNodeAppendNoop(pSyncNode);
-    syncMaybeAdvanceCommitIndex(pSyncNode);
-  } else {
-    syncNodeBecomeFollower(pSyncNode, "first start");
-  }
-
-  int32_t ret = 0;
-  ret = syncNodeStartPingTimer(pSyncNode);
-  ASSERT(ret == 0);
+ if (ret != 0) {
+   sError("vgId:%d, failed to start ping timer since %s", pSyncNode->vgId, terrstr());
+ }
+ return ret;
}
int32_t syncNodeStartStandBy(SSyncNode* pSyncNode) {
@@ -1225,11 +1145,16 @@ int32_t syncNodeStartStandBy(SSyncNode* pSyncNode) {
  // reset elect timer, long enough
  int32_t electMS = TIMER_MAX_MS;
  int32_t ret = syncNodeRestartElectTimer(pSyncNode, electMS);
- ASSERT(ret == 0);
+ if (ret < 0) {
+   sError("vgId:%d, failed to restart elect timer since %s", pSyncNode->vgId, terrstr());
+   return -1;
+ }

- ret = 0;
  ret = syncNodeStartPingTimer(pSyncNode);
- ASSERT(ret == 0);
+ if (ret < 0) {
+   sError("vgId:%d, failed to start ping timer since %s", pSyncNode->vgId, terrstr());
+   return -1;
+ }

  return ret;
}
@@ -1818,12 +1743,6 @@ void syncNodeBecomeLeader(SSyncNode* pSyncNode, const char* debugStr) {
  pSyncNode->leaderCache = pSyncNode->myRaftId;

  for (int32_t i = 0; i < pSyncNode->pNextIndex->replicaNum; ++i) {
-   // maybe overwrite myself, no harm
-   // just do it!
-   // pSyncNode->pNextIndex->index[i] = pSyncNode->pLogStore->getLastIndex(pSyncNode->pLogStore) + 1;
-   // maybe wal is deleted
    SyncIndex lastIndex;
    SyncTerm lastTerm;
    int32_t code = syncNodeGetLastIndexTerm(pSyncNode, &lastIndex, &lastTerm);
@@ -1885,7 +1804,11 @@ void syncNodeBecomeLeader(SSyncNode* pSyncNode, const char* debugStr) {
void syncNodeCandidate2Leader(SSyncNode* pSyncNode) {
  ASSERT(pSyncNode->state == TAOS_SYNC_STATE_CANDIDATE);
- ASSERT(voteGrantedMajority(pSyncNode->pVotesGranted));
+ bool granted = voteGrantedMajority(pSyncNode->pVotesGranted);
+ if (!granted) {
+   sError("vgId:%d, not granted by majority.", pSyncNode->vgId);
+   return;
+ }
  syncNodeBecomeLeader(pSyncNode, "candidate to leader");

  sNTrace(pSyncNode, "state change syncNodeCandidate2Leader");
@@ -1901,20 +1824,6 @@ void syncNodeCandidate2Leader(SSyncNode* pSyncNode) {
           pSyncNode->vgId, pSyncNode->raftStore.currentTerm, pSyncNode->commitIndex, lastIndex);
}
void syncNodeCandidate2LeaderOld(SSyncNode* pSyncNode) {
ASSERT(pSyncNode->state == TAOS_SYNC_STATE_CANDIDATE);
ASSERT(voteGrantedMajority(pSyncNode->pVotesGranted));
syncNodeBecomeLeader(pSyncNode, "candidate to leader");
// Raft 3.6.2 Committing entries from previous terms
syncNodeAppendNoop(pSyncNode);
syncMaybeAdvanceCommitIndex(pSyncNode);
if (pSyncNode->replicaNum > 1) {
syncNodeReplicate(pSyncNode);
}
}
bool syncNodeIsMnode(SSyncNode* pSyncNode) { return (pSyncNode->vgId == 1); }

int32_t syncNodePeerStateInit(SSyncNode* pSyncNode) {
@@ -1960,7 +1869,8 @@ void syncNodeCandidate2Follower(SSyncNode* pSyncNode) {

// need assert
void syncNodeVoteForTerm(SSyncNode* pSyncNode, SyncTerm term, SRaftId* pRaftId) {
  ASSERT(term == pSyncNode->raftStore.currentTerm);
- ASSERT(!raftStoreHasVoted(pSyncNode));
+ bool voted = raftStoreHasVoted(pSyncNode);
+ ASSERT(!voted);

  raftStoreVote(pSyncNode, pRaftId);
}
@@ -2638,24 +2548,6 @@ int32_t syncNodeOnLocalCmd(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
  return 0;
}
int32_t syncNodeOnLocalCmdOld(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
ASSERT(false && "deprecated");
SyncLocalCmd* pMsg = pRpcMsg->pCont;
syncLogRecvLocalCmd(ths, pMsg, "");
if (pMsg->cmd == SYNC_LOCAL_CMD_STEP_DOWN) {
syncNodeStepDown(ths, pMsg->currentTerm);
} else if (pMsg->cmd == SYNC_LOCAL_CMD_FOLLOWER_CMT) {
syncNodeFollowerCommit(ths, pMsg->commitIndex);
} else {
sError("error local cmd");
}
return 0;
}
// TLA+ Spec
// ClientRequest(i, v) ==
//     /\ state[i] = Leader
@@ -2700,96 +2592,6 @@ int32_t syncNodeOnClientRequest(SSyncNode* ths, SRpcMsg* pMsg, SyncIndex* pRetIndex) {
  }
}
int32_t syncNodeOnClientRequestOld(SSyncNode* ths, SRpcMsg* pMsg, SyncIndex* pRetIndex) {
sNTrace(ths, "on client request");
int32_t ret = 0;
int32_t code = 0;
SyncIndex index = ths->pLogStore->syncLogWriteIndex(ths->pLogStore);
SyncTerm term = ths->raftStore.currentTerm;
SSyncRaftEntry* pEntry;
if (pMsg->msgType == TDMT_SYNC_CLIENT_REQUEST) {
pEntry = syncEntryBuildFromClientRequest(pMsg->pCont, term, index);
} else {
pEntry = syncEntryBuildFromRpcMsg(pMsg, term, index);
}
LRUHandle* h = NULL;
if (ths->state == TAOS_SYNC_STATE_LEADER) {
// append entry
code = ths->pLogStore->syncLogAppendEntry(ths->pLogStore, pEntry, false);
if (code != 0) {
if (ths->replicaNum == 1) {
if (h) {
taosLRUCacheRelease(ths->pLogStore->pCache, h, false);
} else {
syncEntryDestroy(pEntry);
}
return -1;
} else {
// del resp mgr, call FpCommitCb
SFsmCbMeta cbMeta = {
.index = pEntry->index,
.lastConfigIndex = SYNC_INDEX_INVALID,
.isWeak = pEntry->isWeak,
.code = -1,
.state = ths->state,
.seqNum = pEntry->seqNum,
.term = pEntry->term,
.currentTerm = ths->raftStore.currentTerm,
.flag = 0,
};
ths->pFsm->FpCommitCb(ths->pFsm, pMsg, &cbMeta);
if (h) {
taosLRUCacheRelease(ths->pLogStore->pCache, h, false);
} else {
syncEntryDestroy(pEntry);
}
return -1;
}
}
syncCacheEntry(ths->pLogStore, pEntry, &h);
    // if multi replica, start replicating right now
if (ths->replicaNum > 1) {
syncNodeReplicate(ths);
}
// if only myself, maybe commit right now
if (ths->replicaNum == 1) {
if (syncNodeIsMnode(ths)) {
syncMaybeAdvanceCommitIndex(ths);
} else {
syncOneReplicaAdvance(ths);
}
}
}
if (pRetIndex != NULL) {
if (ret == 0 && pEntry != NULL) {
*pRetIndex = pEntry->index;
} else {
*pRetIndex = SYNC_INDEX_INVALID;
}
}
if (h) {
taosLRUCacheRelease(ths->pLogStore->pCache, h, false);
} else {
syncEntryDestroy(pEntry);
}
return ret;
}
const char* syncStr(ESyncState state) { const char* syncStr(ESyncState state) {
switch (state) { switch (state) {
case TAOS_SYNC_STATE_FOLLOWER: case TAOS_SYNC_STATE_FOLLOWER:
...@@ -2894,129 +2696,6 @@ bool syncNodeIsOptimizedOneReplica(SSyncNode* ths, SRpcMsg* pMsg) { ...@@ -2894,129 +2696,6 @@ bool syncNodeIsOptimizedOneReplica(SSyncNode* ths, SRpcMsg* pMsg) {
return (ths->replicaNum == 1 && syncUtilUserCommit(pMsg->msgType) && ths->vgId != 1); return (ths->replicaNum == 1 && syncUtilUserCommit(pMsg->msgType) && ths->vgId != 1);
} }
int32_t syncNodeDoCommit(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endIndex, uint64_t flag) {
ASSERT(false);
if (beginIndex > endIndex) {
return 0;
}
if (ths == NULL) {
return -1;
}
if (ths->pFsm != NULL && ths->pFsm->FpGetSnapshotInfo != NULL) {
    // advance commit index to snapshot first
SSnapshot snapshot = {0};
ths->pFsm->FpGetSnapshotInfo(ths->pFsm, &snapshot);
if (snapshot.lastApplyIndex >= 0 && snapshot.lastApplyIndex >= beginIndex) {
sNTrace(ths, "commit by snapshot from index:%" PRId64 " to index:%" PRId64, beginIndex, snapshot.lastApplyIndex);
// update begin index
beginIndex = snapshot.lastApplyIndex + 1;
}
}
int32_t code = 0;
ESyncState state = flag;
sNTrace(ths, "commit by wal from index:%" PRId64 " to index:%" PRId64, beginIndex, endIndex);
// execute fsm
if (ths->pFsm != NULL) {
for (SyncIndex i = beginIndex; i <= endIndex; ++i) {
if (i != SYNC_INDEX_INVALID) {
SSyncRaftEntry* pEntry;
SLRUCache* pCache = ths->pLogStore->pCache;
LRUHandle* h = taosLRUCacheLookup(pCache, &i, sizeof(i));
if (h) {
pEntry = (SSyncRaftEntry*)taosLRUCacheValue(pCache, h);
ths->pLogStore->cacheHit++;
sNTrace(ths, "hit cache index:%" PRId64 ", bytes:%u, %p", i, pEntry->bytes, pEntry);
} else {
ths->pLogStore->cacheMiss++;
sNTrace(ths, "miss cache index:%" PRId64, i);
code = ths->pLogStore->syncLogGetEntry(ths->pLogStore, i, &pEntry);
// ASSERT(code == 0);
// ASSERT(pEntry != NULL);
if (code != 0 || pEntry == NULL) {
sNError(ths, "get log entry error");
sFatal("vgId:%d, get log entry %" PRId64 " error when commit since %s", ths->vgId, i, terrstr());
continue;
}
}
SRpcMsg rpcMsg = {0};
syncEntry2OriginalRpc(pEntry, &rpcMsg);
sTrace("do commit index:%" PRId64 ", type:%s", i, TMSG_INFO(pEntry->msgType));
// user commit
if ((ths->pFsm->FpCommitCb != NULL) && syncUtilUserCommit(pEntry->originalRpcType)) {
bool internalExecute = true;
if ((ths->replicaNum == 1) && ths->restoreFinish && ths->vgId != 1) {
internalExecute = false;
}
sNTrace(ths, "user commit index:%" PRId64 ", internal:%d, type:%s", i, internalExecute,
TMSG_INFO(pEntry->msgType));
// execute fsm in apply thread, or execute outside syncPropose
if (internalExecute) {
SFsmCbMeta cbMeta = {
.index = pEntry->index,
.lastConfigIndex = syncNodeGetSnapshotConfigIndex(ths, pEntry->index),
.isWeak = pEntry->isWeak,
.code = 0,
.state = ths->state,
.seqNum = pEntry->seqNum,
.term = pEntry->term,
.currentTerm = ths->raftStore.currentTerm,
.flag = flag,
};
syncRespMgrGetAndDel(ths->pSyncRespMgr, cbMeta.seqNum, &rpcMsg.info);
ths->pFsm->FpCommitCb(ths->pFsm, &rpcMsg, &cbMeta);
}
}
#if 0
// execute in pre-commit
// leader transfer
if (pEntry->originalRpcType == TDMT_SYNC_LEADER_TRANSFER) {
code = syncDoLeaderTransfer(ths, &rpcMsg, pEntry);
ASSERT(code == 0);
}
#endif
// restore finish
        // if there is only a snapshot, a noop entry will be appended, so syncLogLastIndex is always ok
if (pEntry->index == ths->pLogStore->syncLogLastIndex(ths->pLogStore)) {
if (ths->restoreFinish == false) {
if (ths->pFsm->FpRestoreFinishCb != NULL) {
ths->pFsm->FpRestoreFinishCb(ths->pFsm);
}
ths->restoreFinish = true;
int64_t restoreDelay = taosGetTimestampMs() - ths->leaderTime;
sNTrace(ths, "restore finish, index:%" PRId64 ", elapsed:%" PRId64 " ms", pEntry->index, restoreDelay);
}
}
rpcFreeCont(rpcMsg.pCont);
if (h) {
taosLRUCacheRelease(pCache, h, false);
} else {
syncEntryDestroy(pEntry);
}
}
}
}
return 0;
}
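syncNodeDoCommit and the commit-index scan earlier share one read pattern: look the index up in the LRU entry cache, fall back to the log store on a miss, and afterwards either release the cache handle or destroy the privately owned entry, never both. A sketch of that ownership split with stand-in types (not the TDengine cache API):

#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

typedef struct {
  void *(*cacheLookup)(int64_t index);            // returns cached entry or NULL
  int   (*storeRead)(int64_t index, void **pOut); // reads from the log store
} EntrySource;

// On return, *fromCache tells the caller which cleanup to use: release the
// cache handle if true, destroy the entry if false.
static int getEntry(EntrySource *src, int64_t index, void **pEntry, bool *fromCache) {
  *pEntry = src->cacheLookup(index);
  if (*pEntry != NULL) {
    *fromCache = true;
    return 0;
  }
  *fromCache = false;
  return src->storeRead(index, pEntry);
}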
bool syncNodeInRaftGroup(SSyncNode* ths, SRaftId* pRaftId) {
  for (int32_t i = 0; i < ths->replicaNum; ++i) {
    if (syncUtilSameId(&((ths->replicasId)[i]), pRaftId)) {
...
@@ -945,8 +945,11 @@ int32_t syncNodeLogReplMgrInit(SSyncNode* pNode) {
  for (int i = 0; i < TSDB_MAX_REPLICA; i++) {
    ASSERT(pNode->logReplMgrs[i] == NULL);
    pNode->logReplMgrs[i] = syncLogReplMgrCreate();
+   if (pNode->logReplMgrs[i] == NULL) {
+     terrno = TSDB_CODE_OUT_OF_MEMORY;
+     return -1;
+   }
    pNode->logReplMgrs[i]->peerId = i;
-   ASSERTS(pNode->logReplMgrs[i] != NULL, "Out of memory.");
  }
  return 0;
}
...
@@ -325,6 +325,35 @@ bool walLogEntriesComplete(const SWal* pWal) {
  return complete;
}
int walTrimIdxFile(SWal* pWal, int32_t fileIdx) {
SWalFileInfo* pFileInfo = taosArrayGet(pWal->fileInfoSet, fileIdx);
ASSERT(pFileInfo != NULL);
char fnameStr[WAL_FILE_LEN];
walBuildIdxName(pWal, pFileInfo->firstVer, fnameStr);
int64_t fileSize = 0;
taosStatFile(fnameStr, &fileSize, NULL);
int64_t records = TMAX(0, pFileInfo->lastVer - pFileInfo->firstVer + 1);
int64_t lastEndOffset = records * sizeof(SWalIdxEntry);
if (fileSize <= lastEndOffset) {
return 0;
}
TdFilePtr pFile = taosOpenFile(fnameStr, TD_FILE_READ | TD_FILE_WRITE);
if (pFile == NULL) {
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}
wInfo("vgId:%d, trim idx file. file: %s, size: %" PRId64 ", offset: %" PRId64, pWal->cfg.vgId, fnameStr, fileSize,
lastEndOffset);
taosFtruncateFile(pFile, lastEndOffset);
taosCloseFile(&pFile);
return 0;
}
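walTrimIdxFile computes the idx file's expected size directly from the segment's version range: one fixed-size SWalIdxEntry per version in [firstVer, lastVer], with anything past that offset treated as garbage and truncated. A worked example of the arithmetic, assuming the idx record is exactly the two int64 fields sketched here:

#include <stdint.h>
#include <stdio.h>

typedef struct {
  int64_t ver;     // log version
  int64_t offset;  // byte offset of that version in the .log file
} SWalIdxEntryStub;  // stand-in for the on-disk SWalIdxEntry layout

static int64_t expectedIdxSize(int64_t firstVer, int64_t lastVer) {
  int64_t records = lastVer - firstVer + 1;
  if (records < 0) records = 0;  // mirrors the TMAX(0, ...) guard above
  return records * (int64_t)sizeof(SWalIdxEntryStub);
}

int main(void) {
  // a segment covering versions 100..149 holds exactly 50 index records
  printf("%lld bytes\n", (long long)expectedIdxSize(100, 149));  // 50 * 16 = 800
  return 0;
}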
int walCheckAndRepairMeta(SWal* pWal) {
  // load log files, get first/snapshot/last version info
  const char* logPattern = "^[0-9]+.log$";
@@ -402,6 +431,8 @@ int walCheckAndRepairMeta(SWal* pWal) {
    }
    updateMeta = true;

+   (void)walTrimIdxFile(pWal, fileIdx);
    int64_t lastVer = walScanLogGetLastVer(pWal, fileIdx);
    if (lastVer < 0) {
      if (terrno != TSDB_CODE_WAL_LOG_NOT_EXIST) {
@@ -567,6 +598,7 @@ int walCheckAndRepairIdxFile(SWal* pWal, int32_t fileIdx) {
    goto _err;
  }

+ int64_t count = 0;
  while (idxEntry.ver < pFileInfo->lastVer) {
    ASSERT(idxEntry.ver == ckHead.head.version);
@@ -578,11 +610,11 @@ int walCheckAndRepairIdxFile(SWal* pWal, int32_t fileIdx) {
            idxEntry.offset, fLogNameStr);
      goto _err;
    }
-   wWarn("vgId:%d, wal idx append new entry %" PRId64 " %" PRId64, pWal->cfg.vgId, idxEntry.ver, idxEntry.offset);
    if (taosWriteFile(pIdxFile, &idxEntry, sizeof(SWalIdxEntry)) < 0) {
      wError("vgId:%d, failed to append file since %s. file:%s", pWal->cfg.vgId, terrstr(), fnameStr);
      goto _err;
    }
+   count++;
  }

  if (taosFsyncFile(pIdxFile) < 0) {
@@ -590,6 +622,11 @@ int walCheckAndRepairIdxFile(SWal* pWal, int32_t fileIdx) {
    goto _err;
  }

+ if (count > 0) {
+   wInfo("vgId:%d, rebuilt %" PRId64 " wal idx entries until lastVer: %" PRId64, pWal->cfg.vgId, count,
+         pFileInfo->lastVer);
+ }

  (void)taosCloseFile(&pLogFile);
  (void)taosCloseFile(&pIdxFile);
  return 0;
...
@@ -77,6 +77,31 @@ void walUnrefVer(SWalRef *pRef) {
}
#endif
SWalRef *walRefFirstVer(SWal *pWal, SWalRef *pRef) {
if (pRef == NULL) {
pRef = walOpenRef(pWal);
if (pRef == NULL) {
return NULL;
}
}
taosThreadMutexLock(&pWal->mutex);
int64_t ver = walGetFirstVer(pWal);
wDebug("vgId:%d, wal ref version %" PRId64 " for first", pWal->cfg.vgId, ver);
pRef->refVer = ver;
// bsearch in fileSet
SWalFileInfo tmpInfo;
tmpInfo.firstVer = ver;
SWalFileInfo *pRet = taosArraySearch(pWal->fileInfoSet, &tmpInfo, compareWalFileInfo, TD_LE);
ASSERT(pRet != NULL);
pRef->refFile = pRet->firstVer;
taosThreadMutexUnlock(&pWal->mutex);
return pRef;
}
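walRefFirstVer pins the WAL's first version under the mutex and then locates the owning segment with a TD_LE search over fileInfoSet: the last file whose firstVer is less than or equal to the target version. The same search reduced to a sorted array of firstVer values (illustrative only, not the taosArraySearch signature):

#include <stdint.h>
#include <stdio.h>

// Returns the index of the last element <= ver, or -1 if ver precedes them all.
static int searchLE(const int64_t *firstVers, int n, int64_t ver) {
  int lo = 0, hi = n - 1, ret = -1;
  while (lo <= hi) {
    int mid = lo + (hi - lo) / 2;
    if (firstVers[mid] <= ver) {
      ret = mid;     // candidate found; keep looking to the right
      lo = mid + 1;
    } else {
      hi = mid - 1;
    }
  }
  return ret;
}

int main(void) {
  int64_t segs[] = {0, 100, 250};  // firstVer of each WAL segment
  printf("%d\n", searchLE(segs, 3, 180));  // prints 1: version 180 lives in the segment starting at 100
  return 0;
}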
SWalRef *walRefCommittedVer(SWal *pWal) {
  SWalRef *pRef = walOpenRef(pWal);
  if (pRef == NULL) {
@@ -86,6 +111,8 @@ SWalRef *walRefCommittedVer(SWal *pWal) {
  int64_t ver = walGetCommittedVer(pWal);

+ wDebug("vgId:%d, wal ref version %" PRId64 " for committed", pWal->cfg.vgId, ver);
  pRef->refVer = ver;
  // bsearch in fileSet
  SWalFileInfo tmpInfo;
...
@@ -32,7 +32,18 @@ void swapStr(char* j, char* J, int width) {
}
#endif

+int qsortHelper(const void* p1, const void* p2, const void* param) {
+  __compar_fn_t comparFn = param;
+  return comparFn(p1, p2);
+}
+
// todo refactor: 1) move away; 2) use merge sort instead; 3) qsort is not a stable sort actually.
-void taosSort(void* arr, int64_t sz, int64_t width, __compar_fn_t compar) {
-  qsort(arr, sz, width, compar);
+void taosSort(void* base, int64_t sz, int64_t width, __compar_fn_t compar) {
+#ifdef _ALPINE
+  void* param = compar;
+  taosqsort(base, width, sz, param, qsortHelper);
+#else
+  qsort(base, sz, width, compar);
+#endif
}
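qsortHelper exists because taosqsort threads an extra user parameter to its comparator; the helper simply forwards to the plain two-argument comparator passed through that slot, so the Alpine build can route around libc qsort while callers keep the familiar signature. A minimal usage sketch with a standard integer comparator (plain C, qsort standing in for the taosSort call):

#include <stdio.h>
#include <stdlib.h>

// Comparator in the shape taosSort expects (identical to libc qsort's).
static int cmpInt(const void *a, const void *b) {
  int x = *(const int *)a, y = *(const int *)b;
  return (x > y) - (x < y);
}

int main(void) {
  int v[] = {3, 1, 2};
  // In TDengine code this would be: taosSort(v, 3, sizeof(int), cmpInt);
  qsort(v, 3, sizeof(int), cmpInt);
  printf("%d %d %d\n", v[0], v[1], v[2]);  // prints: 1 2 3
  return 0;
}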
@@ -55,7 +55,7 @@ fi
date
docker run \
    -v $REP_MOUNT_PARAM \
-   --rm --ulimit core=-1 taos_test:v1.0 sh -c "cd $REP_DIR;rm -rf debug;mkdir -p debug;cd debug;cmake .. -DBUILD_HTTP=false -DBUILD_TOOLS=true -DBUILD_TEST=true -DWEBSOCKET=true;make -j || exit 1"
+   --rm --ulimit core=-1 taos_test:v1.0 sh -c "cd $REP_DIR;rm -rf debug;mkdir -p debug;cd debug;cmake .. -DBUILD_HTTP=false -DBUILD_TOOLS=true -DBUILD_TEST=true -DWEBSOCKET=true -DBUILD_TAOSX=true;make -j || exit 1"

if [[ -d ${WORKDIR}/debugNoSan ]] ;then
    echo "delete ${WORKDIR}/debugNoSan"
@@ -70,7 +70,7 @@ mv ${REP_REAL_PATH}/debug ${WORKDIR}/debugNoSan
date
docker run \
    -v $REP_MOUNT_PARAM \
-   --rm --ulimit core=-1 taos_test:v1.0 sh -c "cd $REP_DIR;rm -rf debug;mkdir -p debug;cd debug;cmake .. -DBUILD_HTTP=false -DBUILD_TOOLS=true -DBUILD_TEST=true -DWEBSOCKET=true -DBUILD_SANITIZER=1 -DTOOLS_SANITIZE=true -DTOOLS_BUILD_TYPE=Debug;make -j || exit 1 "
+   --rm --ulimit core=-1 taos_test:v1.0 sh -c "cd $REP_DIR;rm -rf debug;mkdir -p debug;cd debug;cmake .. -DBUILD_HTTP=false -DBUILD_TOOLS=true -DBUILD_TEST=true -DWEBSOCKET=true -DBUILD_SANITIZER=1 -DTOOLS_SANITIZE=true -DTOOLS_BUILD_TYPE=Debug -DBUILD_TAOSX=true;make -j || exit 1 "
mv ${REP_REAL_PATH}/debug ${WORKDIR}/debugSan
...