diff --git a/docs/en/07-develop/03-insert-data/20-kafka-writting.mdx b/docs/en/07-develop/03-insert-data/20-kafka-writting.mdx new file mode 100644 index 0000000000000000000000000000000000000000..ffb969a8a6d0bc61e23fee44f245a24236db5b1b --- /dev/null +++ b/docs/en/07-develop/03-insert-data/20-kafka-writting.mdx @@ -0,0 +1,46 @@ +--- +title: Write from Kafka +--- + +import Tabs from "@theme/Tabs"; +import TabItem from "@theme/TabItem"; +import PyKafka from "./_py_kafka.mdx"; + +## About Kafka + +Apache Kafka is an open-source distributed event streaming platform, used by thousands of companies for high-performance data pipelines, streaming analytics, data integration, and mission-critical applications. For the key concepts of kafka, please refer to [kafka documentation](https://kafka.apache.org/documentation/#gettingStarted). + +### kafka topic + +Messages in Kafka are organized by topics. A topic may have one or more partitions. We can manage kafka topics through `kafka-topics`. + +create a topic named `kafka-events`: + +``` +bin/kafka-topics.sh --create --topic kafka-events --bootstrap-server localhost:9092 +``` + +Alter `kafka-events` topic to set partitions to 3: + +``` +bin/kafka-topics.sh --alter --topic kafka-events --partitions 3 --bootstrap-server=localhost:9092 +``` + +Show all topics and partitions in Kafka: + +``` +bin/kafka-topics.sh --bootstrap-server=localhost:9092 --describe +``` + +## Insert into TDengine + +We can write data into TDengine via SQL or Schemaless. For more information, please refer to [Insert Using SQL](/develop/insert-data/sql-writing/) or [High Performance Writing](/develop/insert-data/high-volume/) or [Schemaless Writing](/reference/schemaless/). + +## Examples + + + + + + + diff --git a/docs/en/07-develop/03-insert-data/02-influxdb-line.mdx b/docs/en/07-develop/03-insert-data/30-influxdb-line.mdx similarity index 100% rename from docs/en/07-develop/03-insert-data/02-influxdb-line.mdx rename to docs/en/07-develop/03-insert-data/30-influxdb-line.mdx diff --git a/docs/en/07-develop/03-insert-data/03-opentsdb-telnet.mdx b/docs/en/07-develop/03-insert-data/40-opentsdb-telnet.mdx similarity index 100% rename from docs/en/07-develop/03-insert-data/03-opentsdb-telnet.mdx rename to docs/en/07-develop/03-insert-data/40-opentsdb-telnet.mdx diff --git a/docs/en/07-develop/03-insert-data/04-opentsdb-json.mdx b/docs/en/07-develop/03-insert-data/50-opentsdb-json.mdx similarity index 100% rename from docs/en/07-develop/03-insert-data/04-opentsdb-json.mdx rename to docs/en/07-develop/03-insert-data/50-opentsdb-json.mdx diff --git a/docs/en/07-develop/03-insert-data/05-high-volume.md b/docs/en/07-develop/03-insert-data/60-high-volume.md similarity index 100% rename from docs/en/07-develop/03-insert-data/05-high-volume.md rename to docs/en/07-develop/03-insert-data/60-high-volume.md diff --git a/docs/en/07-develop/03-insert-data/_py_kafka.mdx b/docs/en/07-develop/03-insert-data/_py_kafka.mdx new file mode 100644 index 0000000000000000000000000000000000000000..dc43a0d415275189220287278547b015865e8d90 --- /dev/null +++ b/docs/en/07-develop/03-insert-data/_py_kafka.mdx @@ -0,0 +1,60 @@ +### python Kafka 客户端 + +For python kafka client, please refer to [kafka client](https://cwiki.apache.org/confluence/display/KAFKA/Clients#Clients-Python). In this document, we use [kafka-python](http://github.com/dpkp/kafka-python). + +### consume from Kafka + +The simple way to consume messages from Kafka is to read messages one by one. The demo is as follows: + +``` +from kafka import KafkaConsumer +consumer = KafkaConsumer('my_favorite_topic') +for msg in consumer: + print (msg) +``` + +For higher performance, we can consume message from kafka in batch. The demo is as follows: + +``` +from kafka import KafkaConsumer +consumer = KafkaConsumer('my_favorite_topic') +while True: + msgs = consumer.poll(timeout_ms=500, max_records=1000) + if msgs: + print (msgs) +``` + +### multi-threading + +For more higher performance we can process data from kafka in multi-thread. We can use python's ThreadPoolExecutor to achieve multithreading. The demo is as follows: + +``` +from concurrent.futures import ThreadPoolExecutor, Future +pool = ThreadPoolExecutor(max_workers=10) +pool.submit(...) +``` + +### multi-process + +For more higher performance, sometimes we use multiprocessing. In this case, the number of Kafka Consumers should not be greater than the number of Kafka Topic Partitions. The demo is as follows: + +``` +from multiprocessing import Process + +ps = [] +for i in range(5): + p = Process(target=Consumer().consume()) + p.start() + ps.append(p) + +for p in ps: + p.join() +``` + +In addition to python's built-in multithreading and multiprocessing library, we can also use the third-party library gunicorn. + +### Examples + +```py +{{#include docs/examples/python/kafka_example.py}} +``` diff --git a/docs/en/20-third-party/01-grafana.mdx b/docs/en/20-third-party/01-grafana.mdx index e0fbefd5a8634d2001f2cc0601afa110aff33632..ca32ce8afce30eaa6756f7b403685b989d824bbf 100644 --- a/docs/en/20-third-party/01-grafana.mdx +++ b/docs/en/20-third-party/01-grafana.mdx @@ -76,7 +76,7 @@ sudo -u grafana grafana-cli plugins install tdengine-datasource You can also download zip files from [GitHub](https://github.com/taosdata/grafanaplugin/releases/tag/latest) or [Grafana](https://grafana.com/grafana/plugins/tdengine-datasource/?tab=installation) and install manually. The commands are as follows: ```bash -GF_VERSION=3.2.2 +GF_VERSION=3.2.7 # from GitHub wget https://github.com/taosdata/grafanaplugin/releases/download/v$GF_VERSION/tdengine-datasource-$GF_VERSION.zip # from Grafana diff --git a/docs/en/25-application/01-telegraf.md b/docs/en/25-application/01-telegraf.md index f7003264496e61f33e843a4c8f2ec8227ba571b6..65fb08ee674cc9c952bbcfda57f0366fa129fdf2 100644 --- a/docs/en/25-application/01-telegraf.md +++ b/docs/en/25-application/01-telegraf.md @@ -38,15 +38,9 @@ Download the latest TDengine-server from the [Downloads](http://tdengine.com/en/ ## Data Connection Setup -### Download TDengine plug-in to grafana plug-in directory +### Install Grafana Plugin and Configure Data Source -```bash -1. wget -c https://github.com/taosdata/grafanaplugin/releases/download/v3.1.3/tdengine-datasource-3.1.3.zip -2. sudo unzip tdengine-datasource-3.1.3.zip -d /var/lib/grafana/plugins/ -3. sudo chown grafana:grafana -R /var/lib/grafana/plugins/tdengine -4. echo -e "[plugins]\nallow_loading_unsigned_plugins = tdengine-datasource\n" | sudo tee -a /etc/grafana/grafana.ini -5. sudo systemctl restart grafana-server.service -``` +Please refer to [Install Grafana Plugin and Configure Data Source](/third-party/grafana/#install-grafana-plugin-and-configure-data-source) ### Modify /etc/telegraf/telegraf.conf diff --git a/docs/en/25-application/02-collectd.md b/docs/en/25-application/02-collectd.md index 692cd8d929f04a03e4433bd0b71f84101bc362cb..97412b230941435a08df12c859d63bca68b79ec9 100644 --- a/docs/en/25-application/02-collectd.md +++ b/docs/en/25-application/02-collectd.md @@ -41,15 +41,9 @@ Download the latest TDengine-server from the [Downloads](http://tdengine.com/en/ ## Data Connection Setup -### Copy the TDengine plugin to the grafana plugin directory - -```bash -1. wget -c https://github.com/taosdata/grafanaplugin/releases/download/v3.1.3/tdengine-datasource-3.1.3.zip -2. sudo unzip tdengine-datasource-3.1.3.zip -d /var/lib/grafana/plugins/ -3. sudo chown grafana:grafana -R /var/lib/grafana/plugins/tdengine -4. echo -e "[plugins]\nallow_loading_unsigned_plugins = tdengine-datasource\n" | sudo tee -a /etc/grafana/grafana.ini -5. sudo systemctl restart grafana-server.service -``` +### Install Grafana Plugin and Configure Data Source + +Please refer to [Install Grafana Plugin and Configure Data Source](/third-party/grafana/#install-grafana-plugin-and-configure-data-source) ### Configure collectd diff --git a/docs/examples/python/kafka_example.py b/docs/examples/python/kafka_example.py new file mode 100644 index 0000000000000000000000000000000000000000..735059eec0f3dcf5094810916e66a39db5682560 --- /dev/null +++ b/docs/examples/python/kafka_example.py @@ -0,0 +1,192 @@ +#! encoding = utf-8 +import json +import time +from json import JSONDecodeError +from typing import Callable +import logging +from concurrent.futures import ThreadPoolExecutor, Future + +import taos +from kafka import KafkaConsumer +from kafka.consumer.fetcher import ConsumerRecord + + +class Consumer(object): + DEFAULT_CONFIGS = { + 'kafka_brokers': 'localhost:9092', + 'kafka_topic': 'python_kafka', + 'kafka_group_id': 'taos', + 'taos_host': 'localhost', + 'taos_user': 'root', + 'taos_password': 'taosdata', + 'taos_database': 'power', + 'taos_port': 6030, + 'timezone': None, + 'clean_after_testing': False, + 'bath_consume': True, + 'batch_size': 1000, + 'async_model': True, + 'workers': 10 + } + + LOCATIONS = ['California.SanFrancisco', 'California.LosAngles', 'California.SanDiego', 'California.SanJose', + 'California.PaloAlto', 'California.Campbell', 'California.MountainView', 'California.Sunnyvale', + 'California.SantaClara', 'California.Cupertino'] + + CREATE_DATABASE_SQL = 'create database if not exists {} keep 365 duration 10 buffer 16 wal_level 1' + USE_DATABASE_SQL = 'use {}' + DROP_TABLE_SQL = 'drop table if exists meters' + DROP_DATABASE_SQL = 'drop database if exists {}' + CREATE_STABLE_SQL = 'create stable meters (ts timestamp, current float, voltage int, phase float) ' \ + 'tags (location binary(64), groupId int)' + CREATE_TABLE_SQL = 'create table if not exists {} using meters tags (\'{}\', {})' + INSERT_SQL_HEADER = "insert into " + INSERT_PART_SQL = 'power.{} values (\'{}\', {}, {}, {})' + + def __init__(self, **configs): + self.config: dict = self.DEFAULT_CONFIGS + self.config.update(configs) + self.consumer = KafkaConsumer( + self.config.get('kafka_topic'), # topic + bootstrap_servers=self.config.get('kafka_brokers'), + group_id=self.config.get('kafka_group_id'), + ) + self.taos = taos.connect( + host=self.config.get('taos_host'), + user=self.config.get('taos_user'), + password=self.config.get('taos_password'), + port=self.config.get('taos_port'), + timezone=self.config.get('timezone'), + ) + if self.config.get('async_model'): + self.pool = ThreadPoolExecutor(max_workers=self.config.get('workers')) + self.tasks: list[Future] = [] + # tags and table mapping # key: {location}_{groupId} value: + self.tag_table_mapping = {} + i = 0 + for location in self.LOCATIONS: + for j in range(1, 11): + table_name = 'd{}'.format(i) + self._cache_table(location=location, group_id=j, table_name=table_name) + i += 1 + + def init_env(self): + # create database and table + self.taos.execute(self.DROP_DATABASE_SQL.format(self.config.get('taos_database'))) + self.taos.execute(self.CREATE_DATABASE_SQL.format(self.config.get('taos_database'))) + self.taos.execute(self.USE_DATABASE_SQL.format(self.config.get('taos_database'))) + self.taos.execute(self.DROP_TABLE_SQL) + self.taos.execute(self.CREATE_STABLE_SQL) + for tags, table_name in self.tag_table_mapping.items(): + location, group_id = _get_location_and_group(tags) + self.taos.execute(self.CREATE_TABLE_SQL.format(table_name, location, group_id)) + + def consume(self): + logging.warning('## start consumer topic-[%s]', self.config.get('kafka_topic')) + try: + if self.config.get('bath_consume'): + self._run_batch(self._to_taos_batch) + else: + self._run(self._to_taos) + except KeyboardInterrupt: + logging.warning("## caught keyboard interrupt, stopping") + finally: + self.stop() + + def stop(self): + # close consumer + if self.consumer is not None: + self.consumer.commit() + self.consumer.close() + + # multi thread + if self.config.get('async_model'): + for task in self.tasks: + while not task.done(): + pass + if self.pool is not None: + self.pool.shutdown() + + # clean data + if self.config.get('clean_after_testing'): + self.taos.execute(self.DROP_TABLE_SQL) + self.taos.execute(self.DROP_DATABASE_SQL.format(self.config.get('taos_database'))) + # close taos + if self.taos is not None: + self.taos.close() + + def _run(self, f: Callable[[ConsumerRecord], bool]): + for message in self.consumer: + if self.config.get('async_model'): + self.pool.submit(f(message)) + else: + f(message) + + def _run_batch(self, f: Callable[[list[list[ConsumerRecord]]], None]): + while True: + messages = self.consumer.poll(timeout_ms=500, max_records=self.config.get('batch_size')) + if messages: + if self.config.get('async_model'): + self.pool.submit(f, messages.values()) + else: + f(list(messages.values())) + if not messages: + time.sleep(0.1) + + def _to_taos(self, message: ConsumerRecord) -> bool: + sql = self.INSERT_SQL_HEADER + self._build_sql(message.value) + if len(sql) == 0: # decode error, skip + return True + logging.info('## insert sql %s', sql) + return self.taos.execute(sql=sql) == 1 + + def _to_taos_batch(self, messages: list[list[ConsumerRecord]]): + sql = self._build_sql_batch(messages=messages) + if len(sql) == 0: # decode error, skip + return + self.taos.execute(sql=sql) + + def _build_sql(self, msg_value: str) -> str: + try: + data = json.loads(msg_value) + except JSONDecodeError as e: + logging.error('## decode message [%s] error ', msg_value, e) + return '' + location = data.get('location') + group_id = data.get('groupId') + ts = data.get('ts') + current = data.get('current') + voltage = data.get('voltage') + phase = data.get('phase') + + table_name = self._get_table_name(location=location, group_id=group_id) + return self.INSERT_PART_SQL.format(table_name, ts, current, voltage, phase) + + def _build_sql_batch(self, messages: list[list[ConsumerRecord]]) -> str: + sql_list = [] + for partition_messages in messages: + for message in partition_messages: + sql_list.append(self._build_sql(message.value)) + + return self.INSERT_SQL_HEADER + ' '.join(sql_list) + + def _cache_table(self, location: str, group_id: int, table_name: str): + self.tag_table_mapping[_tag_table_mapping_key(location=location, group_id=group_id)] = table_name + + def _get_table_name(self, location: str, group_id: int) -> str: + return self.tag_table_mapping.get(_tag_table_mapping_key(location=location, group_id=group_id)) + + +def _tag_table_mapping_key(location: str, group_id: int): + return '{}_{}'.format(location, group_id) + + +def _get_location_and_group(key: str) -> (str, int): + fields = key.split('_') + return fields[0], fields[1] + + +if __name__ == '__main__': + consumer = Consumer(async_model=True) + consumer.init_env() + consumer.consume() \ No newline at end of file diff --git a/docs/zh/07-develop/03-insert-data/20-kafka-writting.mdx b/docs/zh/07-develop/03-insert-data/20-kafka-writting.mdx new file mode 100644 index 0000000000000000000000000000000000000000..32d3c2e5cbe5a167e4d7cb459ace8259c407b693 --- /dev/null +++ b/docs/zh/07-develop/03-insert-data/20-kafka-writting.mdx @@ -0,0 +1,47 @@ +--- +title: 从 Kafka 写入 +--- + +import Tabs from "@theme/Tabs"; +import TabItem from "@theme/TabItem"; +import PyKafka from "./_py_kafka.mdx"; + +## Kafka 介绍 + +Apache Kafka 是开源的分布式消息分发平台,被广泛应用于高性能数据管道、流式数据分析、数据集成和事件驱动类型的应用程序。Kafka 包含 Producer、Consumer 和 Topic,其中 Producer 是向 Kafka 发送消息的进程,Consumer 是从 Kafka 消费消息的进程。Kafka 相关概念可以参考[官方文档](https://kafka.apache.org/documentation/#gettingStarted)。 + + +### kafka topic + +Kafka 的消息按 topic 组织,每个 topic 会有一到多个 partition。可以通过 kafka 的 `kafka-topics` 管理 topic。 + +创建名为 `kafka-events` 的topic: + +``` +bin/kafka-topics.sh --create --topic kafka-events --bootstrap-server localhost:9092 +``` + +修改 `kafka-events` 的 partition 数量为 3: + +``` +bin/kafka-topics.sh --alter --topic kafka-events --partitions 3 --bootstrap-server=localhost:9092 +``` + +展示所有的 topic 和 partition: + +``` +bin/kafka-topics.sh --bootstrap-server=localhost:9092 --describe +``` + +## 写入 TDengine + +TDengine 支持 Sql 方式和 Schemaless 方式的数据写入,Sql 方式数据写入可以参考 [TDengine SQL 写入](/develop/insert-data/sql-writing/) 和 [TDengine 高效写入](/develop/insert-data/high-volume/)。Schemaless 方式数据写入可以参考 [TDengine Schemaless 写入](/reference/schemaless/) 文档。 + +## 示例代码 + + + + + + + diff --git a/docs/zh/07-develop/03-insert-data/02-influxdb-line.mdx b/docs/zh/07-develop/03-insert-data/30-influxdb-line.mdx similarity index 100% rename from docs/zh/07-develop/03-insert-data/02-influxdb-line.mdx rename to docs/zh/07-develop/03-insert-data/30-influxdb-line.mdx diff --git a/docs/zh/07-develop/03-insert-data/03-opentsdb-telnet.mdx b/docs/zh/07-develop/03-insert-data/40-opentsdb-telnet.mdx similarity index 100% rename from docs/zh/07-develop/03-insert-data/03-opentsdb-telnet.mdx rename to docs/zh/07-develop/03-insert-data/40-opentsdb-telnet.mdx diff --git a/docs/zh/07-develop/03-insert-data/04-opentsdb-json.mdx b/docs/zh/07-develop/03-insert-data/50-opentsdb-json.mdx similarity index 100% rename from docs/zh/07-develop/03-insert-data/04-opentsdb-json.mdx rename to docs/zh/07-develop/03-insert-data/50-opentsdb-json.mdx diff --git a/docs/zh/07-develop/03-insert-data/05-high-volume.md b/docs/zh/07-develop/03-insert-data/60-high-volume.md similarity index 100% rename from docs/zh/07-develop/03-insert-data/05-high-volume.md rename to docs/zh/07-develop/03-insert-data/60-high-volume.md diff --git a/docs/zh/07-develop/03-insert-data/_py_kafka.mdx b/docs/zh/07-develop/03-insert-data/_py_kafka.mdx new file mode 100644 index 0000000000000000000000000000000000000000..cd7edf557d4ae41df0f55f4456fe405515132e51 --- /dev/null +++ b/docs/zh/07-develop/03-insert-data/_py_kafka.mdx @@ -0,0 +1,60 @@ +### python Kafka 客户端 + +Kafka 的 python 客户端可以参考文档 [kafka client](https://cwiki.apache.org/confluence/display/KAFKA/Clients#Clients-Python)。推荐使用 [confluent-kafka-python](https://github.com/confluentinc/confluent-kafka-python) 和 [kafka-python](http://github.com/dpkp/kafka-python)。以下示例以 [kafka-python](http://github.com/dpkp/kafka-python) 为例。 + +### 从 Kafka 消费数据 + +Kafka 客户端采用 pull 的方式从 Kafka 消费数据,可以采用单条消费的方式或批量消费的方式读取数据。使用 [kafka-python](http://github.com/dpkp/kafka-python) 客户端单条消费数据的示例如下: + +``` +from kafka import KafkaConsumer +consumer = KafkaConsumer('my_favorite_topic') +for msg in consumer: + print (msg) +``` + +单条消费的方式在数据流量大的情况下往往存在性能瓶颈,导致 Kafka 消息积压,更推荐使用批量消费的方式消费数据。使用 [kafka-python](http://github.com/dpkp/kafka-python) 客户端批量消费数据的示例如下: + +``` +from kafka import KafkaConsumer +consumer = KafkaConsumer('my_favorite_topic') +while True: + msgs = consumer.poll(timeout_ms=500, max_records=1000) + if msgs: + print (msgs) +``` + +### Python 多线程 + +为了提高数据写入效率,通常采用多线程的方式写入数据,可以使用 python 线程池 ThreadPoolExecutor 实现多线程。示例代码如下: + +``` +from concurrent.futures import ThreadPoolExecutor, Future +pool = ThreadPoolExecutor(max_workers=10) +pool.submit(...) +``` + +### Python 多进程 + +单个python进程不能充分发挥多核 CPU 的性能,有时候我们会选择多进程的方式。在多进程的情况下,需要注意,Kafka Consumer 的数量应该小于等于 Kafka Topic Partition 数量。Python 多进程示例代码如下: + +``` +from multiprocessing import Process + +ps = [] +for i in range(5): + p = Process(target=Consumer().consume()) + p.start() + ps.append(p) + +for p in ps: + p.join() +``` + +除了 Python 内置的多线程和多进程方式,还可以通过第三方库 gunicorn 实现并发。 + +### 完整示例 + +```py +{{#include docs/examples/python/kafka_example.py}} +``` diff --git a/docs/zh/25-application/01-telegraf.md b/docs/zh/25-application/01-telegraf.md index 6338264d171b9246b2e99d418035e061d1068a4b..ec263d77927d2960ef684d9d94c9cad4073429b9 100644 --- a/docs/zh/25-application/01-telegraf.md +++ b/docs/zh/25-application/01-telegraf.md @@ -39,15 +39,9 @@ IT 运维监测数据通常都是对时间特性比较敏感的数据,例如 ## 数据链路设置 -### 下载 TDengine 插件到 Grafana 插件目录 +### 安装 Grafana Plugin 并配置数据源 -```bash -1. wget -c https://github.com/taosdata/grafanaplugin/releases/download/v3.1.3/tdengine-datasource-3.1.3.zip -2. sudo unzip tdengine-datasource-3.1.3.zip -d /var/lib/grafana/plugins/ -3. sudo chown grafana:grafana -R /var/lib/grafana/plugins/tdengine -4. echo -e "[plugins]\nallow_loading_unsigned_plugins = tdengine-datasource\n" | sudo tee -a /etc/grafana/grafana.ini -5. sudo systemctl restart grafana-server.service -``` +请参考[安装 Grafana Plugin 并配置数据源](/third-party/grafana/#%E5%AE%89%E8%A3%85-grafana-plugin-%E5%B9%B6%E9%85%8D%E7%BD%AE%E6%95%B0%E6%8D%AE%E6%BA%90)。 ### 修改 /etc/telegraf/telegraf.conf diff --git a/docs/zh/25-application/02-collectd.md b/docs/zh/25-application/02-collectd.md index c6230f48abb545e3064f406d9005a4a3ba8ea5ba..8b39a6431ddeebae2179f4dc535cabe841a499a1 100644 --- a/docs/zh/25-application/02-collectd.md +++ b/docs/zh/25-application/02-collectd.md @@ -41,15 +41,9 @@ IT 运维监测数据通常都是对时间特性比较敏感的数据,例如 ## 数据链路设置 -### 复制 TDengine 插件到 grafana 插件目录 - -```bash -1. wget -c https://github.com/taosdata/grafanaplugin/releases/download/v3.1.3/tdengine-datasource-3.1.3.zip -2. sudo unzip tdengine-datasource-3.1.3.zip -d /var/lib/grafana/plugins/ -3. sudo chown grafana:grafana -R /var/lib/grafana/plugins/tdengine -4. echo -e "[plugins]\nallow_loading_unsigned_plugins = tdengine-datasource\n" | sudo tee -a /etc/grafana/grafana.ini -5. sudo systemctl restart grafana-server.service -``` +### 安装 Grafana Plugin 并配置数据源 + +请参考[安装 Grafana Plugin 并配置数据源](/third-party/grafana/#%E5%AE%89%E8%A3%85-grafana-plugin-%E5%B9%B6%E9%85%8D%E7%BD%AE%E6%95%B0%E6%8D%AE%E6%BA%90)。 ### 配置 collectd diff --git a/include/common/tdataformat.h b/include/common/tdataformat.h index e308666524ed114a7e9dfff9eb3bd25b9bcf52e2..5eeb76ec7941c3b3efedc3edc9ffb33947502a9e 100644 --- a/include/common/tdataformat.h +++ b/include/common/tdataformat.h @@ -89,12 +89,12 @@ void tRowGet(SRow *pRow, STSchema *pTSchema, int32_t iCol, SColVal *pColVal); void tRowDestroy(SRow *pRow); void tRowSort(SArray *aRowP); int32_t tRowMerge(SArray *aRowP, STSchema *pTSchema, int8_t flag); +int32_t tRowAppendToColData(SRow *pRow, STSchema *pTSchema, SColData *aColData, int32_t nColData); // SRowIter ================================ int32_t tRowIterOpen(SRow *pRow, STSchema *pTSchema, SRowIter **ppIter); void tRowIterClose(SRowIter **ppIter); SColVal *tRowIterNext(SRowIter *pIter); -int32_t tRowAppendToColData(SRow *pRow, STSchema *pTSchema, SColData *aColData, int32_t nColData); // STag ================================ int32_t tTagNew(SArray *pArray, int32_t version, int8_t isJson, STag **ppTag); diff --git a/include/libs/executor/executor.h b/include/libs/executor/executor.h index d210004760b28084afb0c4bd5b34157fb18e001d..412b4b4cf6c950113a2f7e6d319692695cde0c07 100644 --- a/include/libs/executor/executor.h +++ b/include/libs/executor/executor.h @@ -152,7 +152,7 @@ void qCleanExecTaskBlockBuf(qTaskInfo_t tinfo); * @param tinfo qhandle * @return */ -int32_t qAsyncKillTask(qTaskInfo_t tinfo); +int32_t qAsyncKillTask(qTaskInfo_t tinfo, int32_t rspCode); /** * destroy query info structure diff --git a/include/libs/qcom/query.h b/include/libs/qcom/query.h index f51aa88485a9618365224744464befb3461e704a..63192812122025b10b85c59af343216df139d1c0 100644 --- a/include/libs/qcom/query.h +++ b/include/libs/qcom/query.h @@ -260,15 +260,16 @@ extern int32_t (*queryProcessMsgRsp[TDMT_MAX])(void* output, char* msg, int32_t (NEED_CLIENT_RM_TBLMETA_ERROR(_code) || NEED_CLIENT_REFRESH_VG_ERROR(_code) || \ NEED_CLIENT_REFRESH_TBLMETA_ERROR(_code)) -#define SYNC_UNKNOWN_LEADER_REDIRECT_ERROR(_code) ((_code) == TSDB_CODE_SYN_NOT_LEADER || (_code) == TSDB_CODE_SYN_INTERNAL_ERROR) +#define SYNC_UNKNOWN_LEADER_REDIRECT_ERROR(_code) ((_code) == TSDB_CODE_SYN_NOT_LEADER || (_code) == TSDB_CODE_SYN_INTERNAL_ERROR || (_code) == TSDB_CODE_VND_STOPPED) #define SYNC_SELF_LEADER_REDIRECT_ERROR(_code) ((_code) == TSDB_CODE_SYN_NOT_LEADER || (_code) == TSDB_CODE_SYN_INTERNAL_ERROR) #define SYNC_OTHER_LEADER_REDIRECT_ERROR(_code) (false) // used later -#define NEED_REDIRECT_ERROR(_code) \ - ((_code) == TSDB_CODE_RPC_REDIRECT || (_code) == TSDB_CODE_RPC_NETWORK_UNAVAIL || \ +#define NEED_REDIRECT_ERROR(_code) \ + ((_code) == TSDB_CODE_RPC_REDIRECT || (_code) == TSDB_CODE_RPC_NETWORK_UNAVAIL || \ (_code) == TSDB_CODE_NODE_NOT_DEPLOYED || SYNC_UNKNOWN_LEADER_REDIRECT_ERROR(_code) || \ SYNC_SELF_LEADER_REDIRECT_ERROR(_code) || SYNC_OTHER_LEADER_REDIRECT_ERROR(_code) || \ - (_code) == TSDB_CODE_APP_NOT_READY || (_code) == TSDB_CODE_RPC_BROKEN_LINK) + (_code) == TSDB_CODE_APP_NOT_READY || (_code) == TSDB_CODE_RPC_BROKEN_LINK || \ + (_code) == TSDB_CODE_APP_IS_STARTING || (_code) == TSDB_CODE_APP_IS_STOPPING) #define NEED_CLIENT_RM_TBLMETA_REQ(_type) \ ((_type) == TDMT_VND_CREATE_TABLE || (_type) == TDMT_MND_CREATE_STB || (_type) == TDMT_VND_DROP_TABLE || \ diff --git a/include/util/taoserror.h b/include/util/taoserror.h index 25d37020cc65518e4e15e5b0b354ee691d5970ee..a6155f6611f6aa0acb57a56625f3d38c98bb9bc2 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -40,21 +40,37 @@ int32_t* taosGetErrno(); #define TSDB_CODE_FAILED -1 // unknown or needn't tell detail error // rpc +// #define TSDB_CODE_RPC_ACTION_IN_PROGRESS TAOS_DEF_ERROR_CODE(0, 0x0001) //2.x +// #define TSDB_CODE_RPC_AUTH_REQUIRED TAOS_DEF_ERROR_CODE(0, 0x0002) //2.x #define TSDB_CODE_RPC_AUTH_FAILURE TAOS_DEF_ERROR_CODE(0, 0x0003) #define TSDB_CODE_RPC_REDIRECT TAOS_DEF_ERROR_CODE(0, 0x0004) +// #define TSDB_CODE_RPC_NOT_READY TAOS_DEF_ERROR_CODE(0, 0x0005) //2.x +// #define TSDB_CODE_RPC_ALREADY_PROCESSED TAOS_DEF_ERROR_CODE(0, 0x0006) //2.x +// #define TSDB_CODE_RPC_LAST_SESSION_NOT_FINI. TAOS_DEF_ERROR_CODE(0, 0x0007) //2.x +// #define TSDB_CODE_RPC_MISMATCHED_LINK_ID TAOS_DEF_ERROR_CODE(0, 0x0008) //2.x +// #define TSDB_CODE_RPC_TOO_SLOW TAOS_DEF_ERROR_CODE(0, 0x0009) //2.x +// #define TSDB_CODE_RPC_MAX_SESSIONS TAOS_DEF_ERROR_CODE(0, 0x000A) //2.x #define TSDB_CODE_RPC_NETWORK_UNAVAIL TAOS_DEF_ERROR_CODE(0, 0x000B) +// #define TSDB_CODE_RPC_APP_ERROR TAOS_DEF_ERROR_CODE(0, 0x000C) //2.x +// #define TSDB_CODE_RPC_UNEXPECTED_RESPONSE TAOS_DEF_ERROR_CODE(0, 0x000D) //2.x +// #define TSDB_CODE_RPC_INVALID_VALUE TAOS_DEF_ERROR_CODE(0, 0x000E) //2.x +// #define TSDB_CODE_RPC_INVALID_TRAN_ID TAOS_DEF_ERROR_CODE(0, 0x000F) //2.x +// #define TSDB_CODE_RPC_INVALID_SESSION_ID TAOS_DEF_ERROR_CODE(0, 0x0010) //2.x +// #define TSDB_CODE_RPC_INVALID_MSG_TYPE TAOS_DEF_ERROR_CODE(0, 0x0011) //2.x +// #define TSDB_CODE_RPC_INVALID_RESPONSE_TYPE TAOS_DEF_ERROR_CODE(0, 0x0012) //2.x +#define TSDB_CODE_TIME_UNSYNCED TAOS_DEF_ERROR_CODE(0, 0x0013) +#define TSDB_CODE_APP_NOT_READY TAOS_DEF_ERROR_CODE(0, 0x0014) #define TSDB_CODE_RPC_FQDN_ERROR TAOS_DEF_ERROR_CODE(0, 0x0015) +// #define TSDB_CODE_RPC_INVALID_VERSION TAOS_DEF_ERROR_CODE(0, 0x0016) //2.x #define TSDB_CODE_RPC_PORT_EADDRINUSE TAOS_DEF_ERROR_CODE(0, 0x0017) #define TSDB_CODE_RPC_BROKEN_LINK TAOS_DEF_ERROR_CODE(0, 0x0018) #define TSDB_CODE_RPC_TIMEOUT TAOS_DEF_ERROR_CODE(0, 0x0019) //common & util -#define TSDB_CODE_TIME_UNSYNCED TAOS_DEF_ERROR_CODE(0, 0x0013) -#define TSDB_CODE_APP_NOT_READY TAOS_DEF_ERROR_CODE(0, 0x0014) - #define TSDB_CODE_OPS_NOT_SUPPORT TAOS_DEF_ERROR_CODE(0, 0x0100) #define TSDB_CODE_MEMORY_CORRUPTED TAOS_DEF_ERROR_CODE(0, 0x0101) #define TSDB_CODE_OUT_OF_MEMORY TAOS_DEF_ERROR_CODE(0, 0x0102) +// #define TSDB_CODE_COM_INVALID_CFG_MSG TAOS_DEF_ERROR_CODE(0, 0x0103) // 2.x #define TSDB_CODE_FILE_CORRUPTED TAOS_DEF_ERROR_CODE(0, 0x0104) #define TSDB_CODE_REF_NO_MEMORY TAOS_DEF_ERROR_CODE(0, 0x0105) #define TSDB_CODE_REF_FULL TAOS_DEF_ERROR_CODE(0, 0x0106) @@ -69,7 +85,7 @@ int32_t* taosGetErrno(); #define TSDB_CODE_OUT_OF_SHM_MEM TAOS_DEF_ERROR_CODE(0, 0x0113) #define TSDB_CODE_INVALID_SHM_ID TAOS_DEF_ERROR_CODE(0, 0x0114) #define TSDB_CODE_INVALID_MSG TAOS_DEF_ERROR_CODE(0, 0x0115) -#define TSDB_CODE_INVALID_MSG_LEN TAOS_DEF_ERROR_CODE(0, 0x0116) +#define TSDB_CODE_INVALID_MSG_LEN TAOS_DEF_ERROR_CODE(0, 0x0116) // #define TSDB_CODE_INVALID_PTR TAOS_DEF_ERROR_CODE(0, 0x0117) #define TSDB_CODE_INVALID_PARA TAOS_DEF_ERROR_CODE(0, 0x0118) #define TSDB_CODE_INVALID_CFG TAOS_DEF_ERROR_CODE(0, 0x0119) @@ -81,7 +97,7 @@ int32_t* taosGetErrno(); #define TSDB_CODE_CHECKSUM_ERROR TAOS_DEF_ERROR_CODE(0, 0x011F) #define TSDB_CODE_COMPRESS_ERROR TAOS_DEF_ERROR_CODE(0, 0x0120) -#define TSDB_CODE_MSG_NOT_PROCESSED TAOS_DEF_ERROR_CODE(0, 0x0121) +#define TSDB_CODE_MSG_NOT_PROCESSED TAOS_DEF_ERROR_CODE(0, 0x0121) // #define TSDB_CODE_CFG_NOT_FOUND TAOS_DEF_ERROR_CODE(0, 0x0122) #define TSDB_CODE_REPEAT_INIT TAOS_DEF_ERROR_CODE(0, 0x0123) #define TSDB_CODE_DUP_KEY TAOS_DEF_ERROR_CODE(0, 0x0124) @@ -94,6 +110,9 @@ int32_t* taosGetErrno(); #define TSDB_CODE_NO_DISKSPACE TAOS_DEF_ERROR_CODE(0, 0x012B) #define TSDB_CODE_TIMEOUT_ERROR TAOS_DEF_ERROR_CODE(0, 0x012C) +#define TSDB_CODE_APP_IS_STARTING TAOS_DEF_ERROR_CODE(0, 0x0130) // +#define TSDB_CODE_APP_IS_STOPPING TAOS_DEF_ERROR_CODE(0, 0x0131) // + //client #define TSDB_CODE_TSC_INVALID_OPERATION TAOS_DEF_ERROR_CODE(0, 0x0200) #define TSDB_CODE_TSC_INVALID_QHANDLE TAOS_DEF_ERROR_CODE(0, 0x0201) @@ -324,6 +343,7 @@ int32_t* taosGetErrno(); #define TSDB_CODE_VND_COL_NOT_EXISTS TAOS_DEF_ERROR_CODE(0, 0x0526) #define TSDB_CODE_VND_COL_SUBSCRIBED TAOS_DEF_ERROR_CODE(0, 0x0527) #define TSDB_CODE_VND_NO_AVAIL_BUFPOOL TAOS_DEF_ERROR_CODE(0, 0x0528) +#define TSDB_CODE_VND_STOPPED TAOS_DEF_ERROR_CODE(0, 0x0529) // tsdb #define TSDB_CODE_TDB_INVALID_TABLE_ID TAOS_DEF_ERROR_CODE(0, 0x0600) diff --git a/source/client/src/clientStmt.c b/source/client/src/clientStmt.c index afa729b40ba41b3c2f69348a1b9ae2da6411adda..b9887f28d4896d1724e9d6529fd8297c83c1f9be 100644 --- a/source/client/src/clientStmt.c +++ b/source/client/src/clientStmt.c @@ -764,7 +764,7 @@ int stmtAddBatch(TAOS_STMT* stmt) { STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_ADD_BATCH)); - //STMT_ERR_RET(stmtCacheBlock(pStmt)); + STMT_ERR_RET(stmtCacheBlock(pStmt)); return TSDB_CODE_SUCCESS; } diff --git a/source/common/src/tdataformat.c b/source/common/src/tdataformat.c index eba30ff506c9ef2d55f4917380717c2ea48084f8..0bb283708dbeba41c33fb3fda749394bad42dfaa 100644 --- a/source/common/src/tdataformat.c +++ b/source/common/src/tdataformat.c @@ -2061,8 +2061,8 @@ int32_t tColDataCopy(SColData *pColDataFrom, SColData *pColData, xMallocFn xMall } // value - if (pColData->nVal) { - pColData->pData = xMalloc(arg, pColData->nVal); + if (pColData->nData) { + pColData->pData = xMalloc(arg, pColData->nData); if (pColData->pData == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; goto _exit; diff --git a/source/dnode/mgmt/mgmt_dnode/src/dmWorker.c b/source/dnode/mgmt/mgmt_dnode/src/dmWorker.c index 07a612bb351e1fe6ed1dc93cb993ebdea42310dc..30ef7b9542be6a3365dcd49284d96f4edbb26016 100644 --- a/source/dnode/mgmt/mgmt_dnode/src/dmWorker.c +++ b/source/dnode/mgmt/mgmt_dnode/src/dmWorker.c @@ -144,6 +144,7 @@ static void dmProcessMgmtQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) { break; default: terrno = TSDB_CODE_MSG_NOT_PROCESSED; + dGError("msg:%p, not processed in mgmt queue", pMsg); break; } diff --git a/source/dnode/mgmt/node_mgmt/src/dmTransport.c b/source/dnode/mgmt/node_mgmt/src/dmTransport.c index 95656fd76c510863c20852c694b0a6f09386c72f..d037cc33752fd1255e1e6dba967e9214a3cea664 100644 --- a/source/dnode/mgmt/node_mgmt/src/dmTransport.c +++ b/source/dnode/mgmt/node_mgmt/src/dmTransport.c @@ -51,13 +51,15 @@ static inline void dmSendRedirectRsp(SRpcMsg *pMsg, const SEpSet *pNewEpSet) { } int32_t dmProcessNodeMsg(SMgmtWrapper *pWrapper, SRpcMsg *pMsg) { + const STraceId *trace = &pMsg->info.traceId; + NodeMsgFp msgFp = pWrapper->msgFps[TMSG_INDEX(pMsg->msgType)]; if (msgFp == NULL) { terrno = TSDB_CODE_MSG_NOT_PROCESSED; + dGError("msg:%p, not processed since no handler", pMsg); return -1; } - const STraceId *trace = &pMsg->info.traceId; dGTrace("msg:%p, will be processed by %s", pMsg, pWrapper->name); pMsg->info.wrapper = pWrapper; return (*msgFp)(pWrapper->pMgmt, pMsg); @@ -99,18 +101,23 @@ static void dmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) { dmProcessServerStartupStatus(pDnode, pRpc); return; } else { - terrno = TSDB_CODE_APP_NOT_READY; + if (pDnode->status == DND_STAT_INIT) { + terrno = TSDB_CODE_APP_IS_STARTING; + } else { + terrno = TSDB_CODE_APP_IS_STOPPING; + } goto _OVER; } } - if (IsReq(pRpc) && pRpc->pCont == NULL) { + if (pRpc->pCont == NULL && (IsReq(pRpc) || pRpc->contLen != 0)) { dGError("msg:%p, type:%s pCont is NULL", pRpc, TMSG_INFO(pRpc->msgType)); terrno = TSDB_CODE_INVALID_MSG_LEN; goto _OVER; } if (pHandle->defaultNtype == NODE_END) { + dGError("msg:%p, type:%s not processed since no handle", pRpc, TMSG_INFO(pRpc->msgType)); terrno = TSDB_CODE_MSG_NOT_PROCESSED; goto _OVER; } @@ -234,7 +241,8 @@ static inline void dmReleaseHandle(SRpcHandleInfo *pHandle, int8_t type) { rpcRe static bool rpcRfp(int32_t code, tmsg_t msgType) { if (code == TSDB_CODE_RPC_REDIRECT || code == TSDB_CODE_RPC_NETWORK_UNAVAIL || code == TSDB_CODE_NODE_NOT_DEPLOYED || - code == TSDB_CODE_SYN_NOT_LEADER || code == TSDB_CODE_APP_NOT_READY || code == TSDB_CODE_RPC_BROKEN_LINK) { + code == TSDB_CODE_SYN_NOT_LEADER || code == TSDB_CODE_APP_NOT_READY || code == TSDB_CODE_RPC_BROKEN_LINK || + code == TSDB_CODE_VND_STOPPED) { if (msgType == TDMT_SCH_QUERY || msgType == TDMT_SCH_MERGE_QUERY || msgType == TDMT_SCH_FETCH || msgType == TDMT_SCH_MERGE_FETCH) { return false; diff --git a/source/dnode/mnode/impl/src/mndMain.c b/source/dnode/mnode/impl/src/mndMain.c index 9f4c2e048f09b530fa2fac839fd12c192d3a8d5c..d46ae7aaa985ddc1a51c12eea72bb411477bb7f5 100644 --- a/source/dnode/mnode/impl/src/mndMain.c +++ b/source/dnode/mnode/impl/src/mndMain.c @@ -644,17 +644,6 @@ static int32_t mndCheckMnodeState(SRpcMsg *pMsg) { return -1; } -static int32_t mndCheckMsgContent(SRpcMsg *pMsg) { - if (!IsReq(pMsg)) return 0; - if (pMsg->contLen != 0 && pMsg->pCont != NULL) return 0; - - const STraceId *trace = &pMsg->info.traceId; - mGError("msg:%p, failed to check msg, cont:%p contLen:%d, app:%p type:%s", pMsg, pMsg->pCont, pMsg->contLen, - pMsg->info.ahandle, TMSG_INFO(pMsg->msgType)); - terrno = TSDB_CODE_INVALID_MSG_LEN; - return -1; -} - int32_t mndProcessRpcMsg(SRpcMsg *pMsg) { SMnode *pMnode = pMsg->info.node; const STraceId *trace = &pMsg->info.traceId; @@ -666,7 +655,6 @@ int32_t mndProcessRpcMsg(SRpcMsg *pMsg) { return -1; } - if (mndCheckMsgContent(pMsg) != 0) return -1; if (mndCheckMnodeState(pMsg) != 0) return -1; mGTrace("msg:%p, start to process in mnode, app:%p type:%s", pMsg, pMsg->info.ahandle, TMSG_INFO(pMsg->msgType)); diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index 4f96ba2042f99d3f0e467e2986c8c670887b598c..99adb6bf6a8faf45203b52e5f8034559e69903a8 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -117,7 +117,8 @@ static FORCE_INLINE int64_t tsdbLogicToFileSize(int64_t lSize, int32_t szPage) { void tsdbRowGetColVal(TSDBROW *pRow, STSchema *pTSchema, int32_t iCol, SColVal *pColVal); int32_t tsdbRowCmprFn(const void *p1, const void *p2); // STSDBRowIter -void tsdbRowIterInit(STSDBRowIter *pIter, TSDBROW *pRow, STSchema *pTSchema); +int32_t tsdbRowIterOpen(STSDBRowIter *pIter, TSDBROW *pRow, STSchema *pTSchema); +void tsdbRowClose(STSDBRowIter *pIter); SColVal *tsdbRowIterNext(STSDBRowIter *pIter); // SRowMerger int32_t tsdbRowMergerInit2(SRowMerger *pMerger, STSchema *pResTSchema, TSDBROW *pRow, STSchema *pTSchema); @@ -571,10 +572,14 @@ struct SDFileSet { }; struct STSDBRowIter { - TSDBROW *pRow; - STSchema *pTSchema; - SColVal colVal; - int32_t i; + TSDBROW *pRow; + union { + SRowIter *pIter; + struct { + int32_t iColData; + SColVal cv; + }; + }; }; struct SRowMerger { STSchema *pTSchema; diff --git a/source/dnode/vnode/src/tsdb/tsdbDiskData.c b/source/dnode/vnode/src/tsdb/tsdbDiskData.c index b46a00363817666cdff7bc7756ff204483787c36..ae9af11f5ae8226c1fceefb57b4d83167800772a 100644 --- a/source/dnode/vnode/src/tsdb/tsdbDiskData.c +++ b/source/dnode/vnode/src/tsdb/tsdbDiskData.c @@ -596,7 +596,7 @@ int32_t tDiskDataAddRow(SDiskDataBuilder *pBuilder, TSDBROW *pRow, STSchema *pTS if (pBuilder->bi.maxKey < kRow.ts) pBuilder->bi.maxKey = kRow.ts; STSDBRowIter iter = {0}; - tsdbRowIterInit(&iter, pRow, pTSchema); + tsdbRowIterOpen(&iter, pRow, pTSchema); SColVal *pColVal = tsdbRowIterNext(&iter); for (int32_t iBuilder = 0; iBuilder < pBuilder->nBuilder; iBuilder++) { diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index 472211b53675996566a399f912ddeaef043923c8..3a08093d47f39ef915149b82de890625189e7874 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -194,7 +194,7 @@ static void setComposedBlockFlag(STsdbReader* pReader, bool composed); static bool hasBeenDropped(const SArray* pDelList, int32_t* index, TSDBKEY* pKey, int32_t order, SVersionRange* pVerRange); -static int32_t doMergeMemTableMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo* pIter, SArray* pDelList, SRow** pTSRow, +static int32_t doMergeMemTableMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo* pIter, SArray* pDelList, TSDBROW* pTSRow, STsdbReader* pReader, bool* freeTSRow); static int32_t doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader, SRow** pTSRow); @@ -3358,7 +3358,7 @@ int32_t doMergeRowsInLastBlock(SLastBlockReader* pLastBlockReader, STableBlockSc return TSDB_CODE_SUCCESS; } -int32_t doMergeMemTableMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo* pIter, SArray* pDelList, SRow** pTSRow, +int32_t doMergeMemTableMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo* pIter, SArray* pDelList, TSDBROW* pResRow, STsdbReader* pReader, bool* freeTSRow) { TSDBROW* pNextRow = NULL; TSDBROW current = *pRow; @@ -3367,25 +3367,27 @@ int32_t doMergeMemTableMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo* pIter, pIter->hasVal = tsdbTbDataIterNext(pIter->iter); if (!pIter->hasVal) { - *pTSRow = current.pTSRow; + *pResRow = *pRow; *freeTSRow = false; return TSDB_CODE_SUCCESS; } else { // has next point in mem/imem pNextRow = getValidMemRow(pIter, pDelList, pReader); if (pNextRow == NULL) { - *pTSRow = current.pTSRow; + *pResRow = current; *freeTSRow = false; return TSDB_CODE_SUCCESS; } if (TSDBROW_TS(¤t) != TSDBROW_TS(pNextRow)) { - *pTSRow = current.pTSRow; + *pResRow = current; *freeTSRow = false; return TSDB_CODE_SUCCESS; } } } +#if 0 + // todo handle the data block merge SRowMerger merge = {0}; // get the correct schema for data in memory @@ -3423,6 +3425,8 @@ int32_t doMergeMemTableMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo* pIter, tsdbRowMergerClear(&merge); *freeTSRow = true; +#endif + return TSDB_CODE_SUCCESS; } @@ -3480,7 +3484,7 @@ int32_t doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScanInfo* p return code; } -int32_t tsdbGetNextRowInMem(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader, SRow** pTSRow, int64_t endKey, +int32_t tsdbGetNextRowInMem(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader, TSDBROW* pResRow, int64_t endKey, bool* freeTSRow) { TSDBROW* pRow = getValidMemRow(&pBlockScanInfo->iter, pBlockScanInfo->delSkyline, pReader); TSDBROW* piRow = getValidMemRow(&pBlockScanInfo->iiter, pBlockScanInfo->delSkyline, pReader); @@ -3510,13 +3514,14 @@ int32_t tsdbGetNextRowInMem(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pR int32_t code = TSDB_CODE_SUCCESS; if (ik.ts != k.ts) { if (((ik.ts < k.ts) && asc) || ((ik.ts > k.ts) && (!asc))) { // ik.ts < k.ts - code = doMergeMemTableMultiRows(piRow, uid, &pBlockScanInfo->iiter, pDelList, pTSRow, pReader, freeTSRow); + code = doMergeMemTableMultiRows(piRow, uid, &pBlockScanInfo->iiter, pDelList, pResRow, pReader, freeTSRow); } else if (((k.ts < ik.ts) && asc) || ((k.ts > ik.ts) && (!asc))) { - code = doMergeMemTableMultiRows(pRow, uid, &pBlockScanInfo->iter, pDelList, pTSRow, pReader, freeTSRow); + code = doMergeMemTableMultiRows(pRow, uid, &pBlockScanInfo->iter, pDelList, pResRow, pReader, freeTSRow); } } else { // ik.ts == k.ts *freeTSRow = true; - code = doMergeMemIMemRows(pRow, piRow, pBlockScanInfo, pReader, pTSRow); + pResRow->type = TSDBROW_ROW_FMT; + code = doMergeMemIMemRows(pRow, piRow, pBlockScanInfo, pReader, &pResRow->pTSRow); if (code != TSDB_CODE_SUCCESS) { return code; } @@ -3526,12 +3531,12 @@ int32_t tsdbGetNextRowInMem(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pR } if (pBlockScanInfo->iter.hasVal && pRow != NULL) { - return doMergeMemTableMultiRows(pRow, pBlockScanInfo->uid, &pBlockScanInfo->iter, pDelList, pTSRow, pReader, + return doMergeMemTableMultiRows(pRow, pBlockScanInfo->uid, &pBlockScanInfo->iter, pDelList, pResRow, pReader, freeTSRow); } if (pBlockScanInfo->iiter.hasVal && piRow != NULL) { - return doMergeMemTableMultiRows(piRow, uid, &pBlockScanInfo->iiter, pDelList, pTSRow, pReader, freeTSRow); + return doMergeMemTableMultiRows(piRow, uid, &pBlockScanInfo->iiter, pDelList, pResRow, pReader, freeTSRow); } return TSDB_CODE_SUCCESS; @@ -3636,17 +3641,22 @@ int32_t buildDataBlockFromBufImpl(STableBlockScanInfo* pBlockScanInfo, int64_t e SSDataBlock* pBlock = pReader->pResBlock; do { - SRow* pTSRow = NULL; +// SRow* pTSRow = NULL; + TSDBROW row = {.type = -1}; bool freeTSRow = false; - tsdbGetNextRowInMem(pBlockScanInfo, pReader, &pTSRow, endKey, &freeTSRow); - if (pTSRow == NULL) { + tsdbGetNextRowInMem(pBlockScanInfo, pReader, &row, endKey, &freeTSRow); + if (row.type == -1) { break; } - doAppendRowFromTSRow(pBlock, pReader, pTSRow, pBlockScanInfo); + if (row.type == TSDBROW_ROW_FMT) { + doAppendRowFromTSRow(pBlock, pReader, row.pTSRow, pBlockScanInfo); - if (freeTSRow) { - taosMemoryFree(pTSRow); + if (freeTSRow) { + taosMemoryFree(row.pTSRow); + } + } else { + doAppendRowFromFileBlock(pBlock, pReader, row.pBlockData, row.iRow); } // no data in buffer, return immediately diff --git a/source/dnode/vnode/src/tsdb/tsdbUtil.c b/source/dnode/vnode/src/tsdb/tsdbUtil.c index 368ebec3f901723cb41c2f8649f90cc9830a10ef..36ee85df6b89e2a4e989d8b32200f81ae7381d64 100644 --- a/source/dnode/vnode/src/tsdb/tsdbUtil.c +++ b/source/dnode/vnode/src/tsdb/tsdbUtil.c @@ -594,42 +594,50 @@ int32_t tsdbRowCmprFn(const void *p1, const void *p2) { } // STSDBRowIter ====================================================== -void tsdbRowIterInit(STSDBRowIter *pIter, TSDBROW *pRow, STSchema *pTSchema) { +int32_t tsdbRowIterOpen(STSDBRowIter *pIter, TSDBROW *pRow, STSchema *pTSchema) { + int32_t code = 0; + pIter->pRow = pRow; if (pRow->type == TSDBROW_ROW_FMT) { - ASSERT(pTSchema); - pIter->pTSchema = pTSchema; - pIter->i = 1; + code = tRowIterOpen(pRow->pTSRow, pTSchema, &pIter->pIter); + if (code) goto _exit; } else if (pRow->type == TSDBROW_COL_FMT) { - pIter->pTSchema = NULL; - pIter->i = 0; + pIter->iColData = 0; } else { ASSERT(0); } + +_exit: + return code; } -SColVal *tsdbRowIterNext(STSDBRowIter *pIter) { +void tsdbRowClose(STSDBRowIter *pIter) { if (pIter->pRow->type == TSDBROW_ROW_FMT) { - if (pIter->i < pIter->pTSchema->numOfCols) { - tRowGet(pIter->pRow->pTSRow, pIter->pTSchema, pIter->i, &pIter->colVal); - pIter->i++; + tRowIterClose(&pIter->pIter); + } +} - return &pIter->colVal; - } +SColVal *tsdbRowIterNext(STSDBRowIter *pIter) { + if (pIter->pRow->type == TSDBROW_ROW_FMT) { + return tRowIterNext(pIter->pIter); } else if (pIter->pRow->type == TSDBROW_COL_FMT) { - if (pIter->i < pIter->pRow->pBlockData->nColData) { - SColData *pColData = tBlockDataGetColDataByIdx(pIter->pRow->pBlockData, pIter->i); - - tColDataGetValue(pColData, pIter->pRow->iRow, &pIter->colVal); - pIter->i++; + if (pIter->iColData == 0) { + pIter->cv = COL_VAL_VALUE(PRIMARYKEY_TIMESTAMP_COL_ID, TSDB_DATA_TYPE_TIMESTAMP, + (SValue){.val = pIter->pRow->pBlockData->aTSKEY[pIter->pRow->iRow]}); + ++pIter->iColData; + return &pIter->cv; + } - return &pIter->colVal; + if (pIter->iColData < pIter->pRow->pBlockData->nColData) { + tColDataGetValue(&pIter->pRow->pBlockData->aColData[pIter->iColData], pIter->pRow->iRow, &pIter->cv); + ++pIter->iColData; + return &pIter->cv; + } else { + return NULL; } } else { ASSERT(0); } - - return NULL; } // SRowMerger ====================================================== @@ -1097,7 +1105,8 @@ void tBlockDataGetColData(SBlockData *pBlockData, int16_t cid, SColData **ppColD int32_t ridx = pBlockData->nColData - 1; while (lidx <= ridx) { - int32_t midx = (lidx + ridx) >> 2; + int32_t midx = lidx + (ridx - lidx) * (cid - pBlockData->aColData[lidx].cid) / + (pBlockData->aColData[ridx].cid - pBlockData->aColData[lidx].cid); SColData *pColData = tBlockDataGetColDataByIdx(pBlockData, midx); int32_t c = (pColData->cid == cid) ? 0 : ((pColData->cid > cid) ? 1 : -1); diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 7100be58e36baf851bc16b03f195e255a7fdc5e0..3dbcd8a07e68a98e0255049c22fb2b7925d46c46 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -781,7 +781,7 @@ void setInputDataBlock(SExprSupp* pExprSupp, SSDataBlock* pBlock, int32_t order, int32_t checkForQueryBuf(size_t numOfTables); bool isTaskKilled(SExecTaskInfo* pTaskInfo); -void setTaskKilled(SExecTaskInfo* pTaskInfo); +void setTaskKilled(SExecTaskInfo* pTaskInfo, int32_t rspCode); void doDestroyTask(SExecTaskInfo* pTaskInfo); void setTaskStatus(SExecTaskInfo* pTaskInfo, int8_t status); diff --git a/source/libs/executor/src/exchangeoperator.c b/source/libs/executor/src/exchangeoperator.c index 963a2732908c91ca65d354ba041c058cd050f29d..53660d88e1bd819e0adbd5f3fe556f3eca7b63f7 100644 --- a/source/libs/executor/src/exchangeoperator.c +++ b/source/libs/executor/src/exchangeoperator.c @@ -70,7 +70,7 @@ static void concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SExchangeIn tsem_wait(&pExchangeInfo->ready); if (isTaskKilled(pTaskInfo)) { - longjmp(pTaskInfo->env, TSDB_CODE_TSC_QUERY_CANCELLED); + longjmp(pTaskInfo->env, pTaskInfo->code); } for (int32_t i = 0; i < totalSources; ++i) { @@ -573,7 +573,7 @@ int32_t prepareConcurrentlyLoad(SOperatorInfo* pOperator) { tsem_wait(&pExchangeInfo->ready); if (isTaskKilled(pTaskInfo)) { - longjmp(pTaskInfo->env, TSDB_CODE_TSC_QUERY_CANCELLED); + longjmp(pTaskInfo->env, pTaskInfo->code); } tsem_post(&pExchangeInfo->ready); @@ -621,7 +621,7 @@ int32_t seqLoadRemoteData(SOperatorInfo* pOperator) { doSendFetchDataRequest(pExchangeInfo, pTaskInfo, pExchangeInfo->current); tsem_wait(&pExchangeInfo->ready); if (isTaskKilled(pTaskInfo)) { - longjmp(pTaskInfo->env, TSDB_CODE_TSC_QUERY_CANCELLED); + longjmp(pTaskInfo->env, pTaskInfo->code); } SDownstreamSourceNode* pSource = taosArrayGet(pExchangeInfo->pSources, pExchangeInfo->current); diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index 10ceb9ccee3cbf4f678d611f8f7f049745763e93..ebd1afa85539eb9f2e660c3707b16a2c97e32240 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -688,7 +688,7 @@ void qStopTaskOperators(SExecTaskInfo* pTaskInfo) { taosWUnLockLatch(&pTaskInfo->stopInfo.lock); } -int32_t qAsyncKillTask(qTaskInfo_t qinfo) { +int32_t qAsyncKillTask(qTaskInfo_t qinfo, int32_t rspCode) { SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)qinfo; if (pTaskInfo == NULL) { @@ -697,7 +697,7 @@ int32_t qAsyncKillTask(qTaskInfo_t qinfo) { qDebug("%s execTask async killed", GET_TASKID(pTaskInfo)); - setTaskKilled(pTaskInfo); + setTaskKilled(pTaskInfo, rspCode); qStopTaskOperators(pTaskInfo); diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index d0d8b42442328e0268992a6383bc781ef06950c2..dd527058ceee4c32672fba04fd2983ba13d42ea9 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -610,21 +610,10 @@ void setBlockSMAInfo(SqlFunctionCtx* pCtx, SExprInfo* pExprInfo, SSDataBlock* pB } bool isTaskKilled(SExecTaskInfo* pTaskInfo) { - // query has been executed more than tsShellActivityTimer, and the retrieve has not arrived - // abort current query execution. - if (pTaskInfo->owner != 0 && - ((taosGetTimestampSec() - pTaskInfo->cost.start / 1000) > 10 * getMaximumIdleDurationSec()) - /*(!needBuildResAfterQueryComplete(pTaskInfo))*/) { - assert(pTaskInfo->cost.start != 0); - // qDebug("QInfo:%" PRIu64 " retrieve not arrive beyond %d ms, abort current query execution, start:%" PRId64 - // ", current:%d", pQInfo->qId, 1, pQInfo->startExecTs, taosGetTimestampSec()); - // return true; - } - - return false; + return (0 != pTaskInfo->code) ? true : false; } -void setTaskKilled(SExecTaskInfo* pTaskInfo) { pTaskInfo->code = TSDB_CODE_TSC_QUERY_CANCELLED; } +void setTaskKilled(SExecTaskInfo* pTaskInfo, int32_t rspCode) { pTaskInfo->code = rspCode; } ///////////////////////////////////////////////////////////////////////////////////////////// STimeWindow getAlignQueryTimeWindow(SInterval* pInterval, int32_t precision, int64_t key) { diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 7f0dec1959031dba37a078f519d5d7f31b62cece..cf0e7b532f1ce8ad0c31bdb2f055dd63a32fc13f 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -629,7 +629,7 @@ static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator) { while (tsdbNextDataBlock(pTableScanInfo->base.dataReader)) { if (isTaskKilled(pTaskInfo)) { - T_LONG_JMP(pTaskInfo->env, TSDB_CODE_TSC_QUERY_CANCELLED); + T_LONG_JMP(pTaskInfo->env, pTaskInfo->code); } // process this data block based on the probabilities @@ -2032,7 +2032,7 @@ static SSDataBlock* doRawScan(SOperatorInfo* pOperator) { if (pInfo->dataReader && tsdbNextDataBlock(pInfo->dataReader)) { if (isTaskKilled(pTaskInfo)) { - longjmp(pTaskInfo->env, TSDB_CODE_TSC_QUERY_CANCELLED); + longjmp(pTaskInfo->env, pTaskInfo->code); } int32_t rows = 0; @@ -2529,7 +2529,7 @@ static SSDataBlock* getTableDataBlockImpl(void* param) { STsdbReader* reader = pInfo->base.dataReader; while (tsdbNextDataBlock(reader)) { if (isTaskKilled(pTaskInfo)) { - T_LONG_JMP(pTaskInfo->env, TSDB_CODE_TSC_QUERY_CANCELLED); + T_LONG_JMP(pTaskInfo->env, pTaskInfo->code); } // process this data block based on the probabilities diff --git a/source/libs/function/src/detail/tavgfunction.c b/source/libs/function/src/detail/tavgfunction.c index f06bafafe3623faa07e1a1ae14fb9462c1c30862..40698322a0a10ebab6704c3451f024ba353da140 100644 --- a/source/libs/function/src/detail/tavgfunction.c +++ b/source/libs/function/src/detail/tavgfunction.c @@ -270,7 +270,7 @@ static void i64VectorSumAVX2(const int64_t* plist, int32_t numOfRows, SAvgRes* p #if __AVX2__ // find the start position that are aligned to 32bytes address in memory - int32_t width = (bitWidth>>3u) / sizeof(int64_t); + int32_t width = (bitWidth >> 3u) / sizeof(int64_t); int32_t remainder = numOfRows % width; int32_t rounds = numOfRows / width; @@ -286,20 +286,11 @@ static void i64VectorSumAVX2(const int64_t* plist, int32_t numOfRows, SAvgRes* p } // let sum up the final results - if (type == TSDB_DATA_TYPE_BIGINT) { - const int64_t* q = (const int64_t*)∑ - pRes->sum.isum += q[0] + q[1] + q[2] + q[3]; - - for (int32_t j = 0; j < remainder; ++j) { - pRes->sum.isum += plist[j + rounds * width]; - } - } else { - const uint64_t* q = (const uint64_t*)∑ - pRes->sum.usum += q[0] + q[1] + q[2] + q[3]; + const int64_t* q = (const int64_t*)∑ + pRes->sum.isum += q[0] + q[1] + q[2] + q[3]; - for (int32_t j = 0; j < remainder; ++j) { - pRes->sum.usum += (uint64_t)plist[j + rounds * width]; - } + for (int32_t j = 0; j < remainder; ++j) { + pRes->sum.isum += plist[j + rounds * width]; } #endif @@ -588,7 +579,7 @@ int32_t avgFunction(SqlFunctionCtx* pCtx) { const int64_t* plist = (const int64_t*) pCol->pData; // 1. If the CPU supports AVX, let's employ AVX instructions to speedup this loop - if (simdAvailable) { + if (simdAvailable && type == TSDB_DATA_TYPE_BIGINT) { i64VectorSumAVX2(plist, numOfRows, pAvgRes); } else { for (int32_t i = pInput->startRowIndex; i < pInput->numOfRows + pInput->startRowIndex; ++i) { diff --git a/source/libs/parser/src/parInsertStmt.c b/source/libs/parser/src/parInsertStmt.c index 7d9c44495e10e2970b9ad55c73b8446919e46df0..bd74789d9a09720f2b5a17fb42467932a6c26ad5 100644 --- a/source/libs/parser/src/parInsertStmt.c +++ b/source/libs/parser/src/parInsertStmt.c @@ -156,31 +156,95 @@ end: return code; } +int32_t convertStmtNcharCol(SMsgBuf* pMsgBuf, SSchema* pSchema, TAOS_MULTI_BIND* src, TAOS_MULTI_BIND* dst) { + int32_t output = 0; + int32_t newBuflen = (pSchema->bytes - VARSTR_HEADER_SIZE) * src->num; + if (dst->buffer_length < newBuflen) { + dst->buffer = taosMemoryRealloc(dst->buffer, newBuflen); + if (NULL == dst->buffer) { + return TSDB_CODE_OUT_OF_MEMORY; + } + } + + if (NULL == dst->length) { + dst->length = taosMemoryRealloc(dst->length, sizeof(int32_t) * src->num); + if (NULL == dst->buffer) { + taosMemoryFreeClear(dst->buffer); + return TSDB_CODE_OUT_OF_MEMORY; + } + } + + dst->buffer_length = pSchema->bytes - VARSTR_HEADER_SIZE; + + for (int32_t i = 0; i < src->num; ++i) { + if (src->is_null && src->is_null[i]) { + continue; + } + + if (!taosMbsToUcs4(src->buffer + src->buffer_length * i, src->length[i], (TdUcs4*)(dst->buffer + dst->buffer_length * i), dst->buffer_length, &output)) { + if (errno == E2BIG) { + return generateSyntaxErrMsg(pMsgBuf, TSDB_CODE_PAR_VALUE_TOO_LONG, pSchema->name); + } + char buf[512] = {0}; + snprintf(buf, tListLen(buf), "%s", strerror(errno)); + return buildSyntaxErrMsg(pMsgBuf, buf, NULL); + } + + dst->length[i] = output; + } + + dst->buffer_type = src->buffer_type; + dst->is_null = src->is_null; + dst->num = src->num; + + return TSDB_CODE_SUCCESS; +} + int32_t qBindStmtColsValue(void* pBlock, TAOS_MULTI_BIND* bind, char* msgBuf, int32_t msgBufLen) { STableDataCxt* pDataBlock = (STableDataCxt*)pBlock; SSchema* pSchema = getTableColumnSchema(pDataBlock->pMeta); SBoundColInfo* boundInfo = &pDataBlock->boundColsInfo; SMsgBuf pBuf = {.buf = msgBuf, .len = msgBufLen}; int32_t rowNum = bind->num; + TAOS_MULTI_BIND ncharBind = {0}; + TAOS_MULTI_BIND* pBind = NULL; + int32_t code = 0; for (int c = 0; c < boundInfo->numOfBound; ++c) { SSchema* pColSchema = &pSchema[boundInfo->pColIndex[c]]; SColData* pCol = taosArrayGet(pDataBlock->pData->aCol, c); if (bind[c].num != rowNum) { - return buildInvalidOperationMsg(&pBuf, "row number in each bind param should be the same"); + code = buildInvalidOperationMsg(&pBuf, "row number in each bind param should be the same"); + goto _return; } if (bind[c].buffer_type != pColSchema->type) { - return buildInvalidOperationMsg(&pBuf, "column type mis-match with buffer type"); + code = buildInvalidOperationMsg(&pBuf, "column type mis-match with buffer type"); + goto _return; } - tColDataAddValueByBind(pCol, bind + c); + if (TSDB_DATA_TYPE_NCHAR == pColSchema->type) { + code = convertStmtNcharCol(&pBuf, pColSchema, bind + c, &ncharBind); + if (code) { + goto _return; + } + pBind = &ncharBind; + } else { + pBind = bind + c; + } + + tColDataAddValueByBind(pCol, pBind); } qDebug("stmt all %d columns bind %d rows data", boundInfo->numOfBound, rowNum); - return TSDB_CODE_SUCCESS; +_return: + + taosMemoryFree(ncharBind.buffer); + taosMemoryFree(ncharBind.length); + + return code; } int32_t qBindStmtSingleColValue(void* pBlock, TAOS_MULTI_BIND* bind, char* msgBuf, int32_t msgBufLen, int32_t colIdx, @@ -191,6 +255,9 @@ int32_t qBindStmtSingleColValue(void* pBlock, TAOS_MULTI_BIND* bind, char* msgBu SMsgBuf pBuf = {.buf = msgBuf, .len = msgBufLen}; SSchema* pColSchema = &pSchema[boundInfo->pColIndex[colIdx]]; SColData* pCol = taosArrayGet(pDataBlock->pData->aCol, colIdx); + TAOS_MULTI_BIND ncharBind = {0}; + TAOS_MULTI_BIND* pBind = NULL; + int32_t code = 0; if (bind->num != rowNum) { return buildInvalidOperationMsg(&pBuf, "row number in each bind param should be the same"); @@ -199,12 +266,27 @@ int32_t qBindStmtSingleColValue(void* pBlock, TAOS_MULTI_BIND* bind, char* msgBu if (bind->buffer_type != pColSchema->type) { return buildInvalidOperationMsg(&pBuf, "column type mis-match with buffer type"); } + + if (TSDB_DATA_TYPE_NCHAR == pColSchema->type) { + code = convertStmtNcharCol(&pBuf, pColSchema, bind, &ncharBind); + if (code) { + goto _return; + } + pBind = &ncharBind; + } else { + pBind = bind; + } - tColDataAddValueByBind(pCol, bind); + tColDataAddValueByBind(pCol, pBind); qDebug("stmt col %d bind %d rows data", colIdx, rowNum); - return TSDB_CODE_SUCCESS; +_return: + + taosMemoryFree(ncharBind.buffer); + taosMemoryFree(ncharBind.length); + + return code; } int32_t buildBoundFields(int32_t numOfBound, int16_t* boundColumns, SSchema* pSchema, int32_t* fieldNum, TAOS_FIELD_E** fields, diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index d2d97f7b90c0a54b7f47586d13677e4c83e15f4e..15682a2cfe68768526bfce06edd50f0abbe86ce9 100644 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -750,8 +750,8 @@ static bool isPrimaryKeyImpl(SNode* pExpr) { return (PRIMARYKEY_TIMESTAMP_COL_ID == ((SColumnNode*)pExpr)->colId); } else if (QUERY_NODE_FUNCTION == nodeType(pExpr)) { SFunctionNode* pFunc = (SFunctionNode*)pExpr; - if (FUNCTION_TYPE_SELECT_VALUE == pFunc->funcType || FUNCTION_TYPE_FIRST == pFunc->funcType || - FUNCTION_TYPE_LAST == pFunc->funcType) { + if (FUNCTION_TYPE_SELECT_VALUE == pFunc->funcType || FUNCTION_TYPE_GROUP_KEY == pFunc->funcType || + FUNCTION_TYPE_FIRST == pFunc->funcType || FUNCTION_TYPE_LAST == pFunc->funcType) { return isPrimaryKeyImpl(nodesListGetNode(pFunc->pParameterList, 0)); } else if (FUNCTION_TYPE_WSTART == pFunc->funcType || FUNCTION_TYPE_WEND == pFunc->funcType) { return true; diff --git a/source/libs/qworker/inc/qwInt.h b/source/libs/qworker/inc/qwInt.h index a0e04b6a19bd16cb98415278ea8748406e1cfa89..af361323a7731f13512af85139d3ccdfd955ebf0 100644 --- a/source/libs/qworker/inc/qwInt.h +++ b/source/libs/qworker/inc/qwInt.h @@ -363,7 +363,7 @@ int32_t qwAcquireTaskCtx(QW_FPARAMS_DEF, SQWTaskCtx **ctx); int32_t qwGetTaskCtx(QW_FPARAMS_DEF, SQWTaskCtx **ctx); int32_t qwAddAcquireTaskCtx(QW_FPARAMS_DEF, SQWTaskCtx **ctx); void qwReleaseTaskCtx(SQWorker *mgmt, void *ctx); -int32_t qwKillTaskHandle(SQWTaskCtx *ctx); +int32_t qwKillTaskHandle(SQWTaskCtx *ctx, int32_t rspCode); int32_t qwUpdateTaskStatus(QW_FPARAMS_DEF, int8_t status); int32_t qwDropTask(QW_FPARAMS_DEF); void qwSaveTbVersionInfo(qTaskInfo_t pTaskInfo, SQWTaskCtx *ctx); diff --git a/source/libs/qworker/src/qwUtil.c b/source/libs/qworker/src/qwUtil.c index 2c0a4072aeb51179b172d8181f220198fa00534b..86fd1d533cb251904428d696b31bfb1e4716fe77 100644 --- a/source/libs/qworker/src/qwUtil.c +++ b/source/libs/qworker/src/qwUtil.c @@ -279,14 +279,14 @@ void qwFreeTaskHandle(qTaskInfo_t *taskHandle) { } } -int32_t qwKillTaskHandle(SQWTaskCtx *ctx) { +int32_t qwKillTaskHandle(SQWTaskCtx *ctx, int32_t rspCode) { int32_t code = 0; // Note: free/kill may in RC qTaskInfo_t taskHandle = atomic_load_ptr(&ctx->taskHandle); if (taskHandle && atomic_val_compare_exchange_ptr(&ctx->taskHandle, taskHandle, NULL)) { qDebug("start to kill task"); - code = qAsyncKillTask(taskHandle); + code = qAsyncKillTask(taskHandle, rspCode); atomic_store_ptr(&ctx->taskHandle, taskHandle); } diff --git a/source/libs/qworker/src/qworker.c b/source/libs/qworker/src/qworker.c index 0890d10b65728e1dadf84c9aea3640dc090d6aa0..9a318df324704291f83b6a69594c06b8823cd053 100644 --- a/source/libs/qworker/src/qworker.c +++ b/source/libs/qworker/src/qworker.c @@ -411,7 +411,7 @@ int32_t qwHandlePrePhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inpu // qwBuildAndSendDropRsp(&ctx->ctrlConnInfo, code); // QW_TASK_DLOG("drop rsp send, handle:%p, code:%x - %s", ctx->ctrlConnInfo.handle, code, tstrerror(code)); - QW_ERR_JRET(TSDB_CODE_QRY_TASK_DROPPED); + QW_ERR_JRET(ctx->rspCode); } QW_ERR_JRET(qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_EXEC)); @@ -420,7 +420,7 @@ int32_t qwHandlePrePhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inpu case QW_PHASE_PRE_FETCH: { if (QW_EVENT_PROCESSED(ctx, QW_EVENT_DROP) || QW_EVENT_RECEIVED(ctx, QW_EVENT_DROP)) { QW_TASK_WLOG("task dropping or already dropped, phase:%s", qwPhaseStr(phase)); - QW_ERR_JRET(TSDB_CODE_QRY_TASK_DROPPED); + QW_ERR_JRET(ctx->rspCode); } if (QW_EVENT_RECEIVED(ctx, QW_EVENT_FETCH)) { @@ -442,7 +442,7 @@ int32_t qwHandlePrePhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inpu case QW_PHASE_PRE_CQUERY: { if (QW_EVENT_PROCESSED(ctx, QW_EVENT_DROP)) { QW_TASK_WLOG("task already dropped, phase:%s", qwPhaseStr(phase)); - QW_ERR_JRET(TSDB_CODE_QRY_TASK_DROPPED); + QW_ERR_JRET(ctx->rspCode); } if (ctx->rspCode) { @@ -456,7 +456,7 @@ int32_t qwHandlePrePhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inpu // qwBuildAndSendDropRsp(&ctx->ctrlConnInfo, code); // QW_TASK_DLOG("drop rsp send, handle:%p, code:%x - %s", ctx->ctrlConnInfo.handle, code, tstrerror(code)); - QW_ERR_JRET(TSDB_CODE_QRY_TASK_DROPPED); + QW_ERR_JRET(ctx->rspCode); } break; @@ -502,7 +502,7 @@ int32_t qwHandlePostPhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inp if (QW_EVENT_PROCESSED(ctx, QW_EVENT_DROP)) { QW_TASK_WLOG("task already dropped, phase:%s", qwPhaseStr(phase)); - QW_ERR_JRET(TSDB_CODE_QRY_TASK_DROPPED); + QW_ERR_JRET(ctx->rspCode); } if (QW_EVENT_RECEIVED(ctx, QW_EVENT_DROP)) { @@ -515,7 +515,7 @@ int32_t qwHandlePostPhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inp // QW_TASK_DLOG("drop rsp send, handle:%p, code:%x - %s", ctx->ctrlConnInfo.handle, code, tstrerror(code)); QW_ERR_JRET(qwDropTask(QW_FPARAMS())); - QW_ERR_JRET(TSDB_CODE_QRY_TASK_DROPPED); + QW_ERR_JRET(ctx->rspCode); } if (ctx->rspCode) { @@ -861,7 +861,7 @@ int32_t qwProcessDrop(QW_FPARAMS_DEF, SQWMsg *qwMsg) { } if (QW_QUERY_RUNNING(ctx)) { - QW_ERR_JRET(qwKillTaskHandle(ctx)); + QW_ERR_JRET(qwKillTaskHandle(ctx, TSDB_CODE_TSC_QUERY_CANCELLED)); qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_DROP); } else { QW_ERR_JRET(qwDropTask(QW_FPARAMS())); @@ -869,6 +869,7 @@ int32_t qwProcessDrop(QW_FPARAMS_DEF, SQWMsg *qwMsg) { } if (!dropped) { + QW_UPDATE_RSP_CODE(ctx, TSDB_CODE_TSC_QUERY_CANCELLED); QW_SET_EVENT_RECEIVED(ctx, QW_EVENT_DROP); } @@ -1195,8 +1196,9 @@ void qWorkerStopAllTasks(void *qWorkerMgmt) { } if (QW_QUERY_RUNNING(ctx)) { - qwKillTaskHandle(ctx); + qwKillTaskHandle(ctx, TSDB_CODE_VND_STOPPED); } else if (!QW_EVENT_PROCESSED(ctx, QW_EVENT_DROP)) { + QW_UPDATE_RSP_CODE(ctx, TSDB_CODE_VND_STOPPED); QW_SET_EVENT_RECEIVED(ctx, QW_EVENT_DROP); } diff --git a/source/libs/qworker/test/qworkerTests.cpp b/source/libs/qworker/test/qworkerTests.cpp index 8a48977c777af8d794a38ed1721319c4b0646953..02b341e28c5684868e32354de199eb0be252b487 100644 --- a/source/libs/qworker/test/qworkerTests.cpp +++ b/source/libs/qworker/test/qworkerTests.cpp @@ -302,7 +302,7 @@ int32_t qwtExecTask(qTaskInfo_t tinfo, SSDataBlock **pRes, uint64_t *useconds) { return 0; } -int32_t qwtKillTask(qTaskInfo_t qinfo) { return 0; } +int32_t qwtKillTask(qTaskInfo_t qinfo, int32_t rspCode) { return 0; } void qwtDestroyTask(qTaskInfo_t qHandle) {} diff --git a/source/libs/scheduler/src/schTask.c b/source/libs/scheduler/src/schTask.c index 9a7f3332b3de8f1577d998d85966e0d166190347..c8b8db367fd4875b979e3e8de7b103454b07cc55 100644 --- a/source/libs/scheduler/src/schTask.c +++ b/source/libs/scheduler/src/schTask.c @@ -391,6 +391,8 @@ int32_t schChkUpdateRedirectCtx(SSchJob *pJob, SSchTask *pTask, SEpSet *pEpSet) int64_t leftTime = tsMaxRetryWaitTime - lastTime; pTask->delayExecMs = leftTime < pCtx->periodMs ? leftTime : pCtx->periodMs; + pCtx->roundTimes = 0; + goto _return; } @@ -407,7 +409,7 @@ _return: int32_t schDoTaskRedirect(SSchJob *pJob, SSchTask *pTask, SDataBuf *pData, int32_t rspCode) { int32_t code = 0; - SCH_TASK_DLOG("task will be redirected now, status:%s", SCH_GET_TASK_STATUS_STR(pTask)); + SCH_TASK_DLOG("task will be redirected now, status:%s, code:%s", SCH_GET_TASK_STATUS_STR(pTask), tstrerror(rspCode)); if (NULL == pData) { pTask->retryTimes = 0; @@ -430,15 +432,15 @@ int32_t schDoTaskRedirect(SSchJob *pJob, SSchTask *pTask, SDataBuf *pData, int32 if (SCH_IS_DATA_BIND_TASK(pTask)) { if (pData && pData->pEpSet) { SCH_ERR_JRET(schUpdateTaskCandidateAddr(pJob, pTask, pData->pEpSet)); - } else if (SYNC_UNKNOWN_LEADER_REDIRECT_ERROR(rspCode)) { + } else if (SYNC_SELF_LEADER_REDIRECT_ERROR(rspCode)) { SQueryNodeAddr *addr = taosArrayGet(pTask->candidateAddrs, pTask->candidateIdx); - SCH_SWITCH_EPSET(addr); - SCH_TASK_DLOG("switch task target node %d epset to %d/%d", addr->nodeId, addr->epSet.inUse, addr->epSet.numOfEps); + SEp *pEp = &addr->epSet.eps[addr->epSet.inUse]; + SCH_TASK_DLOG("task retry node %d current ep, idx:%d/%d,%s:%d, code:%s", addr->nodeId, addr->epSet.inUse, + addr->epSet.numOfEps, pEp->fqdn, pEp->port, tstrerror(rspCode)); } else { SQueryNodeAddr *addr = taosArrayGet(pTask->candidateAddrs, pTask->candidateIdx); - SEp *pEp = &addr->epSet.eps[addr->epSet.inUse]; - SCH_TASK_DLOG("task retry node %d current ep, idx:%d/%d,%s:%d", addr->nodeId, addr->epSet.inUse, - addr->epSet.numOfEps, pEp->fqdn, pEp->port); + SCH_SWITCH_EPSET(addr); + SCH_TASK_DLOG("switch task target node %d epset to %d/%d", addr->nodeId, addr->epSet.inUse, addr->epSet.numOfEps); } if (SCH_TASK_NEED_FLOW_CTRL(pJob, pTask)) { diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index e92b44f8c229f01d9214b924928de4311b123d3e..a9afbd7ba85ecca5e6951173c66dea43e55f3725 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -1519,7 +1519,7 @@ bool cliGenRetryRule(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) { transFreeMsg(pResp->pCont); transUnrefCliHandle(pConn); } else if (code == TSDB_CODE_SYN_NOT_LEADER || code == TSDB_CODE_SYN_INTERNAL_ERROR || - code == TSDB_CODE_SYN_PROPOSE_NOT_READY || code == TSDB_CODE_RPC_REDIRECT) { + code == TSDB_CODE_SYN_PROPOSE_NOT_READY || code == TSDB_CODE_RPC_REDIRECT || code == TSDB_CODE_VND_STOPPED) { tTrace("code str %s, contlen:%d 1", tstrerror(code), pResp->contLen); noDelay = cliResetEpset(pCtx, pResp, true); transFreeMsg(pResp->pCont); diff --git a/source/util/src/terror.c b/source/util/src/terror.c index 2025f196f293f0654a5f3ebacb6875dc9f560e9a..f5d11e3e960c802b92123abb8d507b85c8348342 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -67,6 +67,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_REF_ID_REMOVED, "Ref ID is removed") TAOS_DEFINE_ERROR(TSDB_CODE_REF_INVALID_ID, "Invalid Ref ID") TAOS_DEFINE_ERROR(TSDB_CODE_REF_ALREADY_EXIST, "Ref is already there") TAOS_DEFINE_ERROR(TSDB_CODE_REF_NOT_EXIST, "Ref is not there") + TAOS_DEFINE_ERROR(TSDB_CODE_APP_ERROR, "Unexpected generic error") TAOS_DEFINE_ERROR(TSDB_CODE_ACTION_IN_PROGRESS, "Action in progress") TAOS_DEFINE_ERROR(TSDB_CODE_OUT_OF_RANGE, "Out of range") @@ -83,6 +84,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_VERSION_NUMBER, "Invalid version numbe TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_VERSION_STRING, "Invalid version string") TAOS_DEFINE_ERROR(TSDB_CODE_VERSION_NOT_COMPATIBLE, "Version not compatible") TAOS_DEFINE_ERROR(TSDB_CODE_CHECKSUM_ERROR, "Checksum error") + TAOS_DEFINE_ERROR(TSDB_CODE_COMPRESS_ERROR, "Failed to compress msg") TAOS_DEFINE_ERROR(TSDB_CODE_MSG_NOT_PROCESSED, "Message not processed") TAOS_DEFINE_ERROR(TSDB_CODE_CFG_NOT_FOUND, "Config not found") @@ -97,6 +99,9 @@ TAOS_DEFINE_ERROR(TSDB_CODE_NOT_FOUND, "Not found") TAOS_DEFINE_ERROR(TSDB_CODE_NO_DISKSPACE, "Out of disk space") TAOS_DEFINE_ERROR(TSDB_CODE_TIMEOUT_ERROR, "Operation timeout") +TAOS_DEFINE_ERROR(TSDB_CODE_APP_IS_STARTING, "Database is starting up") +TAOS_DEFINE_ERROR(TSDB_CODE_APP_IS_STOPPING, "Database is closing down") + //client TAOS_DEFINE_ERROR(TSDB_CODE_TSC_INVALID_OPERATION, "Invalid operation") TAOS_DEFINE_ERROR(TSDB_CODE_TSC_INVALID_QHANDLE, "Invalid qhandle") @@ -316,6 +321,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_VND_COL_ALREADY_EXISTS, "Table column already TAOS_DEFINE_ERROR(TSDB_CODE_VND_COL_NOT_EXISTS, "Table column not exists") TAOS_DEFINE_ERROR(TSDB_CODE_VND_COL_SUBSCRIBED, "Table column is subscribed") TAOS_DEFINE_ERROR(TSDB_CODE_VND_NO_AVAIL_BUFPOOL, "No availabe buffer pool") +TAOS_DEFINE_ERROR(TSDB_CODE_VND_STOPPED, "Vnode stopped") // tsdb TAOS_DEFINE_ERROR(TSDB_CODE_TDB_INVALID_TABLE_ID, "Invalid table ID") diff --git a/tests/script/api/batchprepare.c b/tests/script/api/batchprepare.c index 74aa7baf0b650e04d4956c734fedb17e2f336f91..f48f438d9fa03e419f27d3c9b32e67db2b686d18 100644 --- a/tests/script/api/batchprepare.c +++ b/tests/script/api/batchprepare.c @@ -219,20 +219,20 @@ typedef struct { int32_t caseRunNum; // total run case num } CaseCtrl; -#if 0 +#if 1 CaseCtrl gCaseCtrl = { .precision = TIME_PRECISION_MICRO, .bindNullNum = 0, .printCreateTblSql = true, .printQuerySql = true, .printStmtSql = true, - .printVerbose = false, - .printRes = false, + .printVerbose = true, + .printRes = true, .autoCreateTbl = false, .numericParam = false, .rowNum = 0, .bindColNum = 0, - .bindTagNum = 14, + .bindTagNum = 0, .bindRowNum = 0, .bindColTypeNum = 0, .bindColTypeList = NULL, @@ -244,7 +244,7 @@ CaseCtrl gCaseCtrl = { .funcIdxList = NULL, .checkParamNum = false, .runTimes = 0, - .caseIdx = 23, + .caseIdx = 0, .caseNum = 1, .caseRunIdx = -1, .caseRunNum = -1, @@ -252,7 +252,7 @@ CaseCtrl gCaseCtrl = { #endif -#if 1 +#if 0 CaseCtrl gCaseCtrl = { // default .precision = TIME_PRECISION_MILLI, .bindNullNum = 0, @@ -2749,6 +2749,7 @@ void runAll(TAOS *taos) { printf("%s Begin\n", gCaseCtrl.caseCatalog); runCaseList(taos); +#if 0 strcpy(gCaseCtrl.caseCatalog, "Micro DB precision Test"); printf("%s Begin\n", gCaseCtrl.caseCatalog); gCaseCtrl.precision = TIME_PRECISION_MICRO; @@ -2804,6 +2805,7 @@ void runAll(TAOS *taos) { gCaseCtrl.bindColNum = 6; runCaseList(taos); gCaseCtrl.bindColNum = 0; +#endif /* strcpy(gCaseCtrl.caseCatalog, "Bind Col Type Test");