提交 184f0cf0 编写于 作者: K kailixu

Merge branch 'refact/submit_req' into refact/TD-20895-3.0

---
title: Write from Kafka
---
import Tabs from "@theme/Tabs";
import TabItem from "@theme/TabItem";
import PyKafka from "./_py_kafka.mdx";
## About Kafka
Apache Kafka is an open-source distributed event streaming platform, used by thousands of companies for high-performance data pipelines, streaming analytics, data integration, and mission-critical applications. For the key concepts of kafka, please refer to [kafka documentation](https://kafka.apache.org/documentation/#gettingStarted).
### kafka topic
Messages in Kafka are organized by topics. A topic may have one or more partitions. We can manage kafka topics through `kafka-topics`.
create a topic named `kafka-events`:
```
bin/kafka-topics.sh --create --topic kafka-events --bootstrap-server localhost:9092
```
Alter `kafka-events` topic to set partitions to 3:
```
bin/kafka-topics.sh --alter --topic kafka-events --partitions 3 --bootstrap-server=localhost:9092
```
Show all topics and partitions in Kafka:
```
bin/kafka-topics.sh --bootstrap-server=localhost:9092 --describe
```
## Insert into TDengine
We can write data into TDengine via SQL or Schemaless. For more information, please refer to [Insert Using SQL](/develop/insert-data/sql-writing/) or [High Performance Writing](/develop/insert-data/high-volume/) or [Schemaless Writing](/reference/schemaless/).
## Examples
<Tabs defaultValue="Python" groupId="lang">
<TabItem label="Python" value="Python">
<PyKafka />
</TabItem>
</Tabs>
### python Kafka 客户端
For python kafka client, please refer to [kafka client](https://cwiki.apache.org/confluence/display/KAFKA/Clients#Clients-Python). In this document, we use [kafka-python](http://github.com/dpkp/kafka-python).
### consume from Kafka
The simple way to consume messages from Kafka is to read messages one by one. The demo is as follows:
```
from kafka import KafkaConsumer
consumer = KafkaConsumer('my_favorite_topic')
for msg in consumer:
print (msg)
```
For higher performance, we can consume message from kafka in batch. The demo is as follows:
```
from kafka import KafkaConsumer
consumer = KafkaConsumer('my_favorite_topic')
while True:
msgs = consumer.poll(timeout_ms=500, max_records=1000)
if msgs:
print (msgs)
```
### multi-threading
For more higher performance we can process data from kafka in multi-thread. We can use python's ThreadPoolExecutor to achieve multithreading. The demo is as follows:
```
from concurrent.futures import ThreadPoolExecutor, Future
pool = ThreadPoolExecutor(max_workers=10)
pool.submit(...)
```
### multi-process
For more higher performance, sometimes we use multiprocessing. In this case, the number of Kafka Consumers should not be greater than the number of Kafka Topic Partitions. The demo is as follows:
```
from multiprocessing import Process
ps = []
for i in range(5):
p = Process(target=Consumer().consume())
p.start()
ps.append(p)
for p in ps:
p.join()
```
In addition to python's built-in multithreading and multiprocessing library, we can also use the third-party library gunicorn.
### Examples
```py
{{#include docs/examples/python/kafka_example.py}}
```
...@@ -76,7 +76,7 @@ sudo -u grafana grafana-cli plugins install tdengine-datasource ...@@ -76,7 +76,7 @@ sudo -u grafana grafana-cli plugins install tdengine-datasource
You can also download zip files from [GitHub](https://github.com/taosdata/grafanaplugin/releases/tag/latest) or [Grafana](https://grafana.com/grafana/plugins/tdengine-datasource/?tab=installation) and install manually. The commands are as follows: You can also download zip files from [GitHub](https://github.com/taosdata/grafanaplugin/releases/tag/latest) or [Grafana](https://grafana.com/grafana/plugins/tdengine-datasource/?tab=installation) and install manually. The commands are as follows:
```bash ```bash
GF_VERSION=3.2.2 GF_VERSION=3.2.7
# from GitHub # from GitHub
wget https://github.com/taosdata/grafanaplugin/releases/download/v$GF_VERSION/tdengine-datasource-$GF_VERSION.zip wget https://github.com/taosdata/grafanaplugin/releases/download/v$GF_VERSION/tdengine-datasource-$GF_VERSION.zip
# from Grafana # from Grafana
......
...@@ -38,15 +38,9 @@ Download the latest TDengine-server from the [Downloads](http://tdengine.com/en/ ...@@ -38,15 +38,9 @@ Download the latest TDengine-server from the [Downloads](http://tdengine.com/en/
## Data Connection Setup ## Data Connection Setup
### Download TDengine plug-in to grafana plug-in directory ### Install Grafana Plugin and Configure Data Source
```bash Please refer to [Install Grafana Plugin and Configure Data Source](/third-party/grafana/#install-grafana-plugin-and-configure-data-source)
1. wget -c https://github.com/taosdata/grafanaplugin/releases/download/v3.1.3/tdengine-datasource-3.1.3.zip
2. sudo unzip tdengine-datasource-3.1.3.zip -d /var/lib/grafana/plugins/
3. sudo chown grafana:grafana -R /var/lib/grafana/plugins/tdengine
4. echo -e "[plugins]\nallow_loading_unsigned_plugins = tdengine-datasource\n" | sudo tee -a /etc/grafana/grafana.ini
5. sudo systemctl restart grafana-server.service
```
### Modify /etc/telegraf/telegraf.conf ### Modify /etc/telegraf/telegraf.conf
......
...@@ -41,15 +41,9 @@ Download the latest TDengine-server from the [Downloads](http://tdengine.com/en/ ...@@ -41,15 +41,9 @@ Download the latest TDengine-server from the [Downloads](http://tdengine.com/en/
## Data Connection Setup ## Data Connection Setup
### Copy the TDengine plugin to the grafana plugin directory ### Install Grafana Plugin and Configure Data Source
```bash Please refer to [Install Grafana Plugin and Configure Data Source](/third-party/grafana/#install-grafana-plugin-and-configure-data-source)
1. wget -c https://github.com/taosdata/grafanaplugin/releases/download/v3.1.3/tdengine-datasource-3.1.3.zip
2. sudo unzip tdengine-datasource-3.1.3.zip -d /var/lib/grafana/plugins/
3. sudo chown grafana:grafana -R /var/lib/grafana/plugins/tdengine
4. echo -e "[plugins]\nallow_loading_unsigned_plugins = tdengine-datasource\n" | sudo tee -a /etc/grafana/grafana.ini
5. sudo systemctl restart grafana-server.service
```
### Configure collectd ### Configure collectd
......
#! encoding = utf-8
import json
import time
from json import JSONDecodeError
from typing import Callable
import logging
from concurrent.futures import ThreadPoolExecutor, Future
import taos
from kafka import KafkaConsumer
from kafka.consumer.fetcher import ConsumerRecord
class Consumer(object):
DEFAULT_CONFIGS = {
'kafka_brokers': 'localhost:9092',
'kafka_topic': 'python_kafka',
'kafka_group_id': 'taos',
'taos_host': 'localhost',
'taos_user': 'root',
'taos_password': 'taosdata',
'taos_database': 'power',
'taos_port': 6030,
'timezone': None,
'clean_after_testing': False,
'bath_consume': True,
'batch_size': 1000,
'async_model': True,
'workers': 10
}
LOCATIONS = ['California.SanFrancisco', 'California.LosAngles', 'California.SanDiego', 'California.SanJose',
'California.PaloAlto', 'California.Campbell', 'California.MountainView', 'California.Sunnyvale',
'California.SantaClara', 'California.Cupertino']
CREATE_DATABASE_SQL = 'create database if not exists {} keep 365 duration 10 buffer 16 wal_level 1'
USE_DATABASE_SQL = 'use {}'
DROP_TABLE_SQL = 'drop table if exists meters'
DROP_DATABASE_SQL = 'drop database if exists {}'
CREATE_STABLE_SQL = 'create stable meters (ts timestamp, current float, voltage int, phase float) ' \
'tags (location binary(64), groupId int)'
CREATE_TABLE_SQL = 'create table if not exists {} using meters tags (\'{}\', {})'
INSERT_SQL_HEADER = "insert into "
INSERT_PART_SQL = 'power.{} values (\'{}\', {}, {}, {})'
def __init__(self, **configs):
self.config: dict = self.DEFAULT_CONFIGS
self.config.update(configs)
self.consumer = KafkaConsumer(
self.config.get('kafka_topic'), # topic
bootstrap_servers=self.config.get('kafka_brokers'),
group_id=self.config.get('kafka_group_id'),
)
self.taos = taos.connect(
host=self.config.get('taos_host'),
user=self.config.get('taos_user'),
password=self.config.get('taos_password'),
port=self.config.get('taos_port'),
timezone=self.config.get('timezone'),
)
if self.config.get('async_model'):
self.pool = ThreadPoolExecutor(max_workers=self.config.get('workers'))
self.tasks: list[Future] = []
# tags and table mapping # key: {location}_{groupId} value:
self.tag_table_mapping = {}
i = 0
for location in self.LOCATIONS:
for j in range(1, 11):
table_name = 'd{}'.format(i)
self._cache_table(location=location, group_id=j, table_name=table_name)
i += 1
def init_env(self):
# create database and table
self.taos.execute(self.DROP_DATABASE_SQL.format(self.config.get('taos_database')))
self.taos.execute(self.CREATE_DATABASE_SQL.format(self.config.get('taos_database')))
self.taos.execute(self.USE_DATABASE_SQL.format(self.config.get('taos_database')))
self.taos.execute(self.DROP_TABLE_SQL)
self.taos.execute(self.CREATE_STABLE_SQL)
for tags, table_name in self.tag_table_mapping.items():
location, group_id = _get_location_and_group(tags)
self.taos.execute(self.CREATE_TABLE_SQL.format(table_name, location, group_id))
def consume(self):
logging.warning('## start consumer topic-[%s]', self.config.get('kafka_topic'))
try:
if self.config.get('bath_consume'):
self._run_batch(self._to_taos_batch)
else:
self._run(self._to_taos)
except KeyboardInterrupt:
logging.warning("## caught keyboard interrupt, stopping")
finally:
self.stop()
def stop(self):
# close consumer
if self.consumer is not None:
self.consumer.commit()
self.consumer.close()
# multi thread
if self.config.get('async_model'):
for task in self.tasks:
while not task.done():
pass
if self.pool is not None:
self.pool.shutdown()
# clean data
if self.config.get('clean_after_testing'):
self.taos.execute(self.DROP_TABLE_SQL)
self.taos.execute(self.DROP_DATABASE_SQL.format(self.config.get('taos_database')))
# close taos
if self.taos is not None:
self.taos.close()
def _run(self, f: Callable[[ConsumerRecord], bool]):
for message in self.consumer:
if self.config.get('async_model'):
self.pool.submit(f(message))
else:
f(message)
def _run_batch(self, f: Callable[[list[list[ConsumerRecord]]], None]):
while True:
messages = self.consumer.poll(timeout_ms=500, max_records=self.config.get('batch_size'))
if messages:
if self.config.get('async_model'):
self.pool.submit(f, messages.values())
else:
f(list(messages.values()))
if not messages:
time.sleep(0.1)
def _to_taos(self, message: ConsumerRecord) -> bool:
sql = self.INSERT_SQL_HEADER + self._build_sql(message.value)
if len(sql) == 0: # decode error, skip
return True
logging.info('## insert sql %s', sql)
return self.taos.execute(sql=sql) == 1
def _to_taos_batch(self, messages: list[list[ConsumerRecord]]):
sql = self._build_sql_batch(messages=messages)
if len(sql) == 0: # decode error, skip
return
self.taos.execute(sql=sql)
def _build_sql(self, msg_value: str) -> str:
try:
data = json.loads(msg_value)
except JSONDecodeError as e:
logging.error('## decode message [%s] error ', msg_value, e)
return ''
location = data.get('location')
group_id = data.get('groupId')
ts = data.get('ts')
current = data.get('current')
voltage = data.get('voltage')
phase = data.get('phase')
table_name = self._get_table_name(location=location, group_id=group_id)
return self.INSERT_PART_SQL.format(table_name, ts, current, voltage, phase)
def _build_sql_batch(self, messages: list[list[ConsumerRecord]]) -> str:
sql_list = []
for partition_messages in messages:
for message in partition_messages:
sql_list.append(self._build_sql(message.value))
return self.INSERT_SQL_HEADER + ' '.join(sql_list)
def _cache_table(self, location: str, group_id: int, table_name: str):
self.tag_table_mapping[_tag_table_mapping_key(location=location, group_id=group_id)] = table_name
def _get_table_name(self, location: str, group_id: int) -> str:
return self.tag_table_mapping.get(_tag_table_mapping_key(location=location, group_id=group_id))
def _tag_table_mapping_key(location: str, group_id: int):
return '{}_{}'.format(location, group_id)
def _get_location_and_group(key: str) -> (str, int):
fields = key.split('_')
return fields[0], fields[1]
if __name__ == '__main__':
consumer = Consumer(async_model=True)
consumer.init_env()
consumer.consume()
\ No newline at end of file
---
title: 从 Kafka 写入
---
import Tabs from "@theme/Tabs";
import TabItem from "@theme/TabItem";
import PyKafka from "./_py_kafka.mdx";
## Kafka 介绍
Apache Kafka 是开源的分布式消息分发平台,被广泛应用于高性能数据管道、流式数据分析、数据集成和事件驱动类型的应用程序。Kafka 包含 Producer、Consumer 和 Topic,其中 Producer 是向 Kafka 发送消息的进程,Consumer 是从 Kafka 消费消息的进程。Kafka 相关概念可以参考[官方文档](https://kafka.apache.org/documentation/#gettingStarted)。
### kafka topic
Kafka 的消息按 topic 组织,每个 topic 会有一到多个 partition。可以通过 kafka 的 `kafka-topics` 管理 topic。
创建名为 `kafka-events` 的topic:
```
bin/kafka-topics.sh --create --topic kafka-events --bootstrap-server localhost:9092
```
修改 `kafka-events` 的 partition 数量为 3:
```
bin/kafka-topics.sh --alter --topic kafka-events --partitions 3 --bootstrap-server=localhost:9092
```
展示所有的 topic 和 partition:
```
bin/kafka-topics.sh --bootstrap-server=localhost:9092 --describe
```
## 写入 TDengine
TDengine 支持 Sql 方式和 Schemaless 方式的数据写入,Sql 方式数据写入可以参考 [TDengine SQL 写入](/develop/insert-data/sql-writing/) 和 [TDengine 高效写入](/develop/insert-data/high-volume/)。Schemaless 方式数据写入可以参考 [TDengine Schemaless 写入](/reference/schemaless/) 文档。
## 示例代码
<Tabs defaultValue="Python" groupId="lang">
<TabItem label="Python" value="Python">
<PyKafka />
</TabItem>
</Tabs>
### python Kafka 客户端
Kafka 的 python 客户端可以参考文档 [kafka client](https://cwiki.apache.org/confluence/display/KAFKA/Clients#Clients-Python)。推荐使用 [confluent-kafka-python](https://github.com/confluentinc/confluent-kafka-python) 和 [kafka-python](http://github.com/dpkp/kafka-python)。以下示例以 [kafka-python](http://github.com/dpkp/kafka-python) 为例。
### 从 Kafka 消费数据
Kafka 客户端采用 pull 的方式从 Kafka 消费数据,可以采用单条消费的方式或批量消费的方式读取数据。使用 [kafka-python](http://github.com/dpkp/kafka-python) 客户端单条消费数据的示例如下:
```
from kafka import KafkaConsumer
consumer = KafkaConsumer('my_favorite_topic')
for msg in consumer:
print (msg)
```
单条消费的方式在数据流量大的情况下往往存在性能瓶颈,导致 Kafka 消息积压,更推荐使用批量消费的方式消费数据。使用 [kafka-python](http://github.com/dpkp/kafka-python) 客户端批量消费数据的示例如下:
```
from kafka import KafkaConsumer
consumer = KafkaConsumer('my_favorite_topic')
while True:
msgs = consumer.poll(timeout_ms=500, max_records=1000)
if msgs:
print (msgs)
```
### Python 多线程
为了提高数据写入效率,通常采用多线程的方式写入数据,可以使用 python 线程池 ThreadPoolExecutor 实现多线程。示例代码如下:
```
from concurrent.futures import ThreadPoolExecutor, Future
pool = ThreadPoolExecutor(max_workers=10)
pool.submit(...)
```
### Python 多进程
单个python进程不能充分发挥多核 CPU 的性能,有时候我们会选择多进程的方式。在多进程的情况下,需要注意,Kafka Consumer 的数量应该小于等于 Kafka Topic Partition 数量。Python 多进程示例代码如下:
```
from multiprocessing import Process
ps = []
for i in range(5):
p = Process(target=Consumer().consume())
p.start()
ps.append(p)
for p in ps:
p.join()
```
除了 Python 内置的多线程和多进程方式,还可以通过第三方库 gunicorn 实现并发。
### 完整示例
```py
{{#include docs/examples/python/kafka_example.py}}
```
...@@ -39,15 +39,9 @@ IT 运维监测数据通常都是对时间特性比较敏感的数据,例如 ...@@ -39,15 +39,9 @@ IT 运维监测数据通常都是对时间特性比较敏感的数据,例如
## 数据链路设置 ## 数据链路设置
### 下载 TDengine 插件到 Grafana 插件目录 ### 安装 Grafana Plugin 并配置数据源
```bash 请参考[安装 Grafana Plugin 并配置数据源](/third-party/grafana/#%E5%AE%89%E8%A3%85-grafana-plugin-%E5%B9%B6%E9%85%8D%E7%BD%AE%E6%95%B0%E6%8D%AE%E6%BA%90)
1. wget -c https://github.com/taosdata/grafanaplugin/releases/download/v3.1.3/tdengine-datasource-3.1.3.zip
2. sudo unzip tdengine-datasource-3.1.3.zip -d /var/lib/grafana/plugins/
3. sudo chown grafana:grafana -R /var/lib/grafana/plugins/tdengine
4. echo -e "[plugins]\nallow_loading_unsigned_plugins = tdengine-datasource\n" | sudo tee -a /etc/grafana/grafana.ini
5. sudo systemctl restart grafana-server.service
```
### 修改 /etc/telegraf/telegraf.conf ### 修改 /etc/telegraf/telegraf.conf
......
...@@ -41,15 +41,9 @@ IT 运维监测数据通常都是对时间特性比较敏感的数据,例如 ...@@ -41,15 +41,9 @@ IT 运维监测数据通常都是对时间特性比较敏感的数据,例如
## 数据链路设置 ## 数据链路设置
### 复制 TDengine 插件到 grafana 插件目录 ### 安装 Grafana Plugin 并配置数据源
```bash 请参考[安装 Grafana Plugin 并配置数据源](/third-party/grafana/#%E5%AE%89%E8%A3%85-grafana-plugin-%E5%B9%B6%E9%85%8D%E7%BD%AE%E6%95%B0%E6%8D%AE%E6%BA%90)
1. wget -c https://github.com/taosdata/grafanaplugin/releases/download/v3.1.3/tdengine-datasource-3.1.3.zip
2. sudo unzip tdengine-datasource-3.1.3.zip -d /var/lib/grafana/plugins/
3. sudo chown grafana:grafana -R /var/lib/grafana/plugins/tdengine
4. echo -e "[plugins]\nallow_loading_unsigned_plugins = tdengine-datasource\n" | sudo tee -a /etc/grafana/grafana.ini
5. sudo systemctl restart grafana-server.service
```
### 配置 collectd ### 配置 collectd
......
...@@ -89,12 +89,12 @@ void tRowGet(SRow *pRow, STSchema *pTSchema, int32_t iCol, SColVal *pColVal); ...@@ -89,12 +89,12 @@ void tRowGet(SRow *pRow, STSchema *pTSchema, int32_t iCol, SColVal *pColVal);
void tRowDestroy(SRow *pRow); void tRowDestroy(SRow *pRow);
void tRowSort(SArray *aRowP); void tRowSort(SArray *aRowP);
int32_t tRowMerge(SArray *aRowP, STSchema *pTSchema, int8_t flag); int32_t tRowMerge(SArray *aRowP, STSchema *pTSchema, int8_t flag);
int32_t tRowAppendToColData(SRow *pRow, STSchema *pTSchema, SColData *aColData, int32_t nColData);
// SRowIter ================================ // SRowIter ================================
int32_t tRowIterOpen(SRow *pRow, STSchema *pTSchema, SRowIter **ppIter); int32_t tRowIterOpen(SRow *pRow, STSchema *pTSchema, SRowIter **ppIter);
void tRowIterClose(SRowIter **ppIter); void tRowIterClose(SRowIter **ppIter);
SColVal *tRowIterNext(SRowIter *pIter); SColVal *tRowIterNext(SRowIter *pIter);
int32_t tRowAppendToColData(SRow *pRow, STSchema *pTSchema, SColData *aColData, int32_t nColData);
// STag ================================ // STag ================================
int32_t tTagNew(SArray *pArray, int32_t version, int8_t isJson, STag **ppTag); int32_t tTagNew(SArray *pArray, int32_t version, int8_t isJson, STag **ppTag);
......
...@@ -152,7 +152,7 @@ void qCleanExecTaskBlockBuf(qTaskInfo_t tinfo); ...@@ -152,7 +152,7 @@ void qCleanExecTaskBlockBuf(qTaskInfo_t tinfo);
* @param tinfo qhandle * @param tinfo qhandle
* @return * @return
*/ */
int32_t qAsyncKillTask(qTaskInfo_t tinfo); int32_t qAsyncKillTask(qTaskInfo_t tinfo, int32_t rspCode);
/** /**
* destroy query info structure * destroy query info structure
......
...@@ -260,15 +260,16 @@ extern int32_t (*queryProcessMsgRsp[TDMT_MAX])(void* output, char* msg, int32_t ...@@ -260,15 +260,16 @@ extern int32_t (*queryProcessMsgRsp[TDMT_MAX])(void* output, char* msg, int32_t
(NEED_CLIENT_RM_TBLMETA_ERROR(_code) || NEED_CLIENT_REFRESH_VG_ERROR(_code) || \ (NEED_CLIENT_RM_TBLMETA_ERROR(_code) || NEED_CLIENT_REFRESH_VG_ERROR(_code) || \
NEED_CLIENT_REFRESH_TBLMETA_ERROR(_code)) NEED_CLIENT_REFRESH_TBLMETA_ERROR(_code))
#define SYNC_UNKNOWN_LEADER_REDIRECT_ERROR(_code) ((_code) == TSDB_CODE_SYN_NOT_LEADER || (_code) == TSDB_CODE_SYN_INTERNAL_ERROR) #define SYNC_UNKNOWN_LEADER_REDIRECT_ERROR(_code) ((_code) == TSDB_CODE_SYN_NOT_LEADER || (_code) == TSDB_CODE_SYN_INTERNAL_ERROR || (_code) == TSDB_CODE_VND_STOPPED)
#define SYNC_SELF_LEADER_REDIRECT_ERROR(_code) ((_code) == TSDB_CODE_SYN_NOT_LEADER || (_code) == TSDB_CODE_SYN_INTERNAL_ERROR) #define SYNC_SELF_LEADER_REDIRECT_ERROR(_code) ((_code) == TSDB_CODE_SYN_NOT_LEADER || (_code) == TSDB_CODE_SYN_INTERNAL_ERROR)
#define SYNC_OTHER_LEADER_REDIRECT_ERROR(_code) (false) // used later #define SYNC_OTHER_LEADER_REDIRECT_ERROR(_code) (false) // used later
#define NEED_REDIRECT_ERROR(_code) \ #define NEED_REDIRECT_ERROR(_code) \
((_code) == TSDB_CODE_RPC_REDIRECT || (_code) == TSDB_CODE_RPC_NETWORK_UNAVAIL || \ ((_code) == TSDB_CODE_RPC_REDIRECT || (_code) == TSDB_CODE_RPC_NETWORK_UNAVAIL || \
(_code) == TSDB_CODE_NODE_NOT_DEPLOYED || SYNC_UNKNOWN_LEADER_REDIRECT_ERROR(_code) || \ (_code) == TSDB_CODE_NODE_NOT_DEPLOYED || SYNC_UNKNOWN_LEADER_REDIRECT_ERROR(_code) || \
SYNC_SELF_LEADER_REDIRECT_ERROR(_code) || SYNC_OTHER_LEADER_REDIRECT_ERROR(_code) || \ SYNC_SELF_LEADER_REDIRECT_ERROR(_code) || SYNC_OTHER_LEADER_REDIRECT_ERROR(_code) || \
(_code) == TSDB_CODE_APP_NOT_READY || (_code) == TSDB_CODE_RPC_BROKEN_LINK) (_code) == TSDB_CODE_APP_NOT_READY || (_code) == TSDB_CODE_RPC_BROKEN_LINK || \
(_code) == TSDB_CODE_APP_IS_STARTING || (_code) == TSDB_CODE_APP_IS_STOPPING)
#define NEED_CLIENT_RM_TBLMETA_REQ(_type) \ #define NEED_CLIENT_RM_TBLMETA_REQ(_type) \
((_type) == TDMT_VND_CREATE_TABLE || (_type) == TDMT_MND_CREATE_STB || (_type) == TDMT_VND_DROP_TABLE || \ ((_type) == TDMT_VND_CREATE_TABLE || (_type) == TDMT_MND_CREATE_STB || (_type) == TDMT_VND_DROP_TABLE || \
......
...@@ -40,21 +40,37 @@ int32_t* taosGetErrno(); ...@@ -40,21 +40,37 @@ int32_t* taosGetErrno();
#define TSDB_CODE_FAILED -1 // unknown or needn't tell detail error #define TSDB_CODE_FAILED -1 // unknown or needn't tell detail error
// rpc // rpc
// #define TSDB_CODE_RPC_ACTION_IN_PROGRESS TAOS_DEF_ERROR_CODE(0, 0x0001) //2.x
// #define TSDB_CODE_RPC_AUTH_REQUIRED TAOS_DEF_ERROR_CODE(0, 0x0002) //2.x
#define TSDB_CODE_RPC_AUTH_FAILURE TAOS_DEF_ERROR_CODE(0, 0x0003) #define TSDB_CODE_RPC_AUTH_FAILURE TAOS_DEF_ERROR_CODE(0, 0x0003)
#define TSDB_CODE_RPC_REDIRECT TAOS_DEF_ERROR_CODE(0, 0x0004) #define TSDB_CODE_RPC_REDIRECT TAOS_DEF_ERROR_CODE(0, 0x0004)
// #define TSDB_CODE_RPC_NOT_READY TAOS_DEF_ERROR_CODE(0, 0x0005) //2.x
// #define TSDB_CODE_RPC_ALREADY_PROCESSED TAOS_DEF_ERROR_CODE(0, 0x0006) //2.x
// #define TSDB_CODE_RPC_LAST_SESSION_NOT_FINI. TAOS_DEF_ERROR_CODE(0, 0x0007) //2.x
// #define TSDB_CODE_RPC_MISMATCHED_LINK_ID TAOS_DEF_ERROR_CODE(0, 0x0008) //2.x
// #define TSDB_CODE_RPC_TOO_SLOW TAOS_DEF_ERROR_CODE(0, 0x0009) //2.x
// #define TSDB_CODE_RPC_MAX_SESSIONS TAOS_DEF_ERROR_CODE(0, 0x000A) //2.x
#define TSDB_CODE_RPC_NETWORK_UNAVAIL TAOS_DEF_ERROR_CODE(0, 0x000B) #define TSDB_CODE_RPC_NETWORK_UNAVAIL TAOS_DEF_ERROR_CODE(0, 0x000B)
// #define TSDB_CODE_RPC_APP_ERROR TAOS_DEF_ERROR_CODE(0, 0x000C) //2.x
// #define TSDB_CODE_RPC_UNEXPECTED_RESPONSE TAOS_DEF_ERROR_CODE(0, 0x000D) //2.x
// #define TSDB_CODE_RPC_INVALID_VALUE TAOS_DEF_ERROR_CODE(0, 0x000E) //2.x
// #define TSDB_CODE_RPC_INVALID_TRAN_ID TAOS_DEF_ERROR_CODE(0, 0x000F) //2.x
// #define TSDB_CODE_RPC_INVALID_SESSION_ID TAOS_DEF_ERROR_CODE(0, 0x0010) //2.x
// #define TSDB_CODE_RPC_INVALID_MSG_TYPE TAOS_DEF_ERROR_CODE(0, 0x0011) //2.x
// #define TSDB_CODE_RPC_INVALID_RESPONSE_TYPE TAOS_DEF_ERROR_CODE(0, 0x0012) //2.x
#define TSDB_CODE_TIME_UNSYNCED TAOS_DEF_ERROR_CODE(0, 0x0013)
#define TSDB_CODE_APP_NOT_READY TAOS_DEF_ERROR_CODE(0, 0x0014)
#define TSDB_CODE_RPC_FQDN_ERROR TAOS_DEF_ERROR_CODE(0, 0x0015) #define TSDB_CODE_RPC_FQDN_ERROR TAOS_DEF_ERROR_CODE(0, 0x0015)
// #define TSDB_CODE_RPC_INVALID_VERSION TAOS_DEF_ERROR_CODE(0, 0x0016) //2.x
#define TSDB_CODE_RPC_PORT_EADDRINUSE TAOS_DEF_ERROR_CODE(0, 0x0017) #define TSDB_CODE_RPC_PORT_EADDRINUSE TAOS_DEF_ERROR_CODE(0, 0x0017)
#define TSDB_CODE_RPC_BROKEN_LINK TAOS_DEF_ERROR_CODE(0, 0x0018) #define TSDB_CODE_RPC_BROKEN_LINK TAOS_DEF_ERROR_CODE(0, 0x0018)
#define TSDB_CODE_RPC_TIMEOUT TAOS_DEF_ERROR_CODE(0, 0x0019) #define TSDB_CODE_RPC_TIMEOUT TAOS_DEF_ERROR_CODE(0, 0x0019)
//common & util //common & util
#define TSDB_CODE_TIME_UNSYNCED TAOS_DEF_ERROR_CODE(0, 0x0013)
#define TSDB_CODE_APP_NOT_READY TAOS_DEF_ERROR_CODE(0, 0x0014)
#define TSDB_CODE_OPS_NOT_SUPPORT TAOS_DEF_ERROR_CODE(0, 0x0100) #define TSDB_CODE_OPS_NOT_SUPPORT TAOS_DEF_ERROR_CODE(0, 0x0100)
#define TSDB_CODE_MEMORY_CORRUPTED TAOS_DEF_ERROR_CODE(0, 0x0101) #define TSDB_CODE_MEMORY_CORRUPTED TAOS_DEF_ERROR_CODE(0, 0x0101)
#define TSDB_CODE_OUT_OF_MEMORY TAOS_DEF_ERROR_CODE(0, 0x0102) #define TSDB_CODE_OUT_OF_MEMORY TAOS_DEF_ERROR_CODE(0, 0x0102)
// #define TSDB_CODE_COM_INVALID_CFG_MSG TAOS_DEF_ERROR_CODE(0, 0x0103) // 2.x
#define TSDB_CODE_FILE_CORRUPTED TAOS_DEF_ERROR_CODE(0, 0x0104) #define TSDB_CODE_FILE_CORRUPTED TAOS_DEF_ERROR_CODE(0, 0x0104)
#define TSDB_CODE_REF_NO_MEMORY TAOS_DEF_ERROR_CODE(0, 0x0105) #define TSDB_CODE_REF_NO_MEMORY TAOS_DEF_ERROR_CODE(0, 0x0105)
#define TSDB_CODE_REF_FULL TAOS_DEF_ERROR_CODE(0, 0x0106) #define TSDB_CODE_REF_FULL TAOS_DEF_ERROR_CODE(0, 0x0106)
...@@ -69,7 +85,7 @@ int32_t* taosGetErrno(); ...@@ -69,7 +85,7 @@ int32_t* taosGetErrno();
#define TSDB_CODE_OUT_OF_SHM_MEM TAOS_DEF_ERROR_CODE(0, 0x0113) #define TSDB_CODE_OUT_OF_SHM_MEM TAOS_DEF_ERROR_CODE(0, 0x0113)
#define TSDB_CODE_INVALID_SHM_ID TAOS_DEF_ERROR_CODE(0, 0x0114) #define TSDB_CODE_INVALID_SHM_ID TAOS_DEF_ERROR_CODE(0, 0x0114)
#define TSDB_CODE_INVALID_MSG TAOS_DEF_ERROR_CODE(0, 0x0115) #define TSDB_CODE_INVALID_MSG TAOS_DEF_ERROR_CODE(0, 0x0115)
#define TSDB_CODE_INVALID_MSG_LEN TAOS_DEF_ERROR_CODE(0, 0x0116) #define TSDB_CODE_INVALID_MSG_LEN TAOS_DEF_ERROR_CODE(0, 0x0116) //
#define TSDB_CODE_INVALID_PTR TAOS_DEF_ERROR_CODE(0, 0x0117) #define TSDB_CODE_INVALID_PTR TAOS_DEF_ERROR_CODE(0, 0x0117)
#define TSDB_CODE_INVALID_PARA TAOS_DEF_ERROR_CODE(0, 0x0118) #define TSDB_CODE_INVALID_PARA TAOS_DEF_ERROR_CODE(0, 0x0118)
#define TSDB_CODE_INVALID_CFG TAOS_DEF_ERROR_CODE(0, 0x0119) #define TSDB_CODE_INVALID_CFG TAOS_DEF_ERROR_CODE(0, 0x0119)
...@@ -81,7 +97,7 @@ int32_t* taosGetErrno(); ...@@ -81,7 +97,7 @@ int32_t* taosGetErrno();
#define TSDB_CODE_CHECKSUM_ERROR TAOS_DEF_ERROR_CODE(0, 0x011F) #define TSDB_CODE_CHECKSUM_ERROR TAOS_DEF_ERROR_CODE(0, 0x011F)
#define TSDB_CODE_COMPRESS_ERROR TAOS_DEF_ERROR_CODE(0, 0x0120) #define TSDB_CODE_COMPRESS_ERROR TAOS_DEF_ERROR_CODE(0, 0x0120)
#define TSDB_CODE_MSG_NOT_PROCESSED TAOS_DEF_ERROR_CODE(0, 0x0121) #define TSDB_CODE_MSG_NOT_PROCESSED TAOS_DEF_ERROR_CODE(0, 0x0121) //
#define TSDB_CODE_CFG_NOT_FOUND TAOS_DEF_ERROR_CODE(0, 0x0122) #define TSDB_CODE_CFG_NOT_FOUND TAOS_DEF_ERROR_CODE(0, 0x0122)
#define TSDB_CODE_REPEAT_INIT TAOS_DEF_ERROR_CODE(0, 0x0123) #define TSDB_CODE_REPEAT_INIT TAOS_DEF_ERROR_CODE(0, 0x0123)
#define TSDB_CODE_DUP_KEY TAOS_DEF_ERROR_CODE(0, 0x0124) #define TSDB_CODE_DUP_KEY TAOS_DEF_ERROR_CODE(0, 0x0124)
...@@ -94,6 +110,9 @@ int32_t* taosGetErrno(); ...@@ -94,6 +110,9 @@ int32_t* taosGetErrno();
#define TSDB_CODE_NO_DISKSPACE TAOS_DEF_ERROR_CODE(0, 0x012B) #define TSDB_CODE_NO_DISKSPACE TAOS_DEF_ERROR_CODE(0, 0x012B)
#define TSDB_CODE_TIMEOUT_ERROR TAOS_DEF_ERROR_CODE(0, 0x012C) #define TSDB_CODE_TIMEOUT_ERROR TAOS_DEF_ERROR_CODE(0, 0x012C)
#define TSDB_CODE_APP_IS_STARTING TAOS_DEF_ERROR_CODE(0, 0x0130) //
#define TSDB_CODE_APP_IS_STOPPING TAOS_DEF_ERROR_CODE(0, 0x0131) //
//client //client
#define TSDB_CODE_TSC_INVALID_OPERATION TAOS_DEF_ERROR_CODE(0, 0x0200) #define TSDB_CODE_TSC_INVALID_OPERATION TAOS_DEF_ERROR_CODE(0, 0x0200)
#define TSDB_CODE_TSC_INVALID_QHANDLE TAOS_DEF_ERROR_CODE(0, 0x0201) #define TSDB_CODE_TSC_INVALID_QHANDLE TAOS_DEF_ERROR_CODE(0, 0x0201)
...@@ -324,6 +343,7 @@ int32_t* taosGetErrno(); ...@@ -324,6 +343,7 @@ int32_t* taosGetErrno();
#define TSDB_CODE_VND_COL_NOT_EXISTS TAOS_DEF_ERROR_CODE(0, 0x0526) #define TSDB_CODE_VND_COL_NOT_EXISTS TAOS_DEF_ERROR_CODE(0, 0x0526)
#define TSDB_CODE_VND_COL_SUBSCRIBED TAOS_DEF_ERROR_CODE(0, 0x0527) #define TSDB_CODE_VND_COL_SUBSCRIBED TAOS_DEF_ERROR_CODE(0, 0x0527)
#define TSDB_CODE_VND_NO_AVAIL_BUFPOOL TAOS_DEF_ERROR_CODE(0, 0x0528) #define TSDB_CODE_VND_NO_AVAIL_BUFPOOL TAOS_DEF_ERROR_CODE(0, 0x0528)
#define TSDB_CODE_VND_STOPPED TAOS_DEF_ERROR_CODE(0, 0x0529)
// tsdb // tsdb
#define TSDB_CODE_TDB_INVALID_TABLE_ID TAOS_DEF_ERROR_CODE(0, 0x0600) #define TSDB_CODE_TDB_INVALID_TABLE_ID TAOS_DEF_ERROR_CODE(0, 0x0600)
......
...@@ -764,7 +764,7 @@ int stmtAddBatch(TAOS_STMT* stmt) { ...@@ -764,7 +764,7 @@ int stmtAddBatch(TAOS_STMT* stmt) {
STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_ADD_BATCH)); STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_ADD_BATCH));
//STMT_ERR_RET(stmtCacheBlock(pStmt)); STMT_ERR_RET(stmtCacheBlock(pStmt));
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
......
...@@ -2061,8 +2061,8 @@ int32_t tColDataCopy(SColData *pColDataFrom, SColData *pColData, xMallocFn xMall ...@@ -2061,8 +2061,8 @@ int32_t tColDataCopy(SColData *pColDataFrom, SColData *pColData, xMallocFn xMall
} }
// value // value
if (pColData->nVal) { if (pColData->nData) {
pColData->pData = xMalloc(arg, pColData->nVal); pColData->pData = xMalloc(arg, pColData->nData);
if (pColData->pData == NULL) { if (pColData->pData == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
goto _exit; goto _exit;
......
...@@ -144,6 +144,7 @@ static void dmProcessMgmtQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) { ...@@ -144,6 +144,7 @@ static void dmProcessMgmtQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
break; break;
default: default:
terrno = TSDB_CODE_MSG_NOT_PROCESSED; terrno = TSDB_CODE_MSG_NOT_PROCESSED;
dGError("msg:%p, not processed in mgmt queue", pMsg);
break; break;
} }
......
...@@ -51,13 +51,15 @@ static inline void dmSendRedirectRsp(SRpcMsg *pMsg, const SEpSet *pNewEpSet) { ...@@ -51,13 +51,15 @@ static inline void dmSendRedirectRsp(SRpcMsg *pMsg, const SEpSet *pNewEpSet) {
} }
int32_t dmProcessNodeMsg(SMgmtWrapper *pWrapper, SRpcMsg *pMsg) { int32_t dmProcessNodeMsg(SMgmtWrapper *pWrapper, SRpcMsg *pMsg) {
const STraceId *trace = &pMsg->info.traceId;
NodeMsgFp msgFp = pWrapper->msgFps[TMSG_INDEX(pMsg->msgType)]; NodeMsgFp msgFp = pWrapper->msgFps[TMSG_INDEX(pMsg->msgType)];
if (msgFp == NULL) { if (msgFp == NULL) {
terrno = TSDB_CODE_MSG_NOT_PROCESSED; terrno = TSDB_CODE_MSG_NOT_PROCESSED;
dGError("msg:%p, not processed since no handler", pMsg);
return -1; return -1;
} }
const STraceId *trace = &pMsg->info.traceId;
dGTrace("msg:%p, will be processed by %s", pMsg, pWrapper->name); dGTrace("msg:%p, will be processed by %s", pMsg, pWrapper->name);
pMsg->info.wrapper = pWrapper; pMsg->info.wrapper = pWrapper;
return (*msgFp)(pWrapper->pMgmt, pMsg); return (*msgFp)(pWrapper->pMgmt, pMsg);
...@@ -99,18 +101,23 @@ static void dmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) { ...@@ -99,18 +101,23 @@ static void dmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) {
dmProcessServerStartupStatus(pDnode, pRpc); dmProcessServerStartupStatus(pDnode, pRpc);
return; return;
} else { } else {
terrno = TSDB_CODE_APP_NOT_READY; if (pDnode->status == DND_STAT_INIT) {
terrno = TSDB_CODE_APP_IS_STARTING;
} else {
terrno = TSDB_CODE_APP_IS_STOPPING;
}
goto _OVER; goto _OVER;
} }
} }
if (IsReq(pRpc) && pRpc->pCont == NULL) { if (pRpc->pCont == NULL && (IsReq(pRpc) || pRpc->contLen != 0)) {
dGError("msg:%p, type:%s pCont is NULL", pRpc, TMSG_INFO(pRpc->msgType)); dGError("msg:%p, type:%s pCont is NULL", pRpc, TMSG_INFO(pRpc->msgType));
terrno = TSDB_CODE_INVALID_MSG_LEN; terrno = TSDB_CODE_INVALID_MSG_LEN;
goto _OVER; goto _OVER;
} }
if (pHandle->defaultNtype == NODE_END) { if (pHandle->defaultNtype == NODE_END) {
dGError("msg:%p, type:%s not processed since no handle", pRpc, TMSG_INFO(pRpc->msgType));
terrno = TSDB_CODE_MSG_NOT_PROCESSED; terrno = TSDB_CODE_MSG_NOT_PROCESSED;
goto _OVER; goto _OVER;
} }
...@@ -234,7 +241,8 @@ static inline void dmReleaseHandle(SRpcHandleInfo *pHandle, int8_t type) { rpcRe ...@@ -234,7 +241,8 @@ static inline void dmReleaseHandle(SRpcHandleInfo *pHandle, int8_t type) { rpcRe
static bool rpcRfp(int32_t code, tmsg_t msgType) { static bool rpcRfp(int32_t code, tmsg_t msgType) {
if (code == TSDB_CODE_RPC_REDIRECT || code == TSDB_CODE_RPC_NETWORK_UNAVAIL || code == TSDB_CODE_NODE_NOT_DEPLOYED || if (code == TSDB_CODE_RPC_REDIRECT || code == TSDB_CODE_RPC_NETWORK_UNAVAIL || code == TSDB_CODE_NODE_NOT_DEPLOYED ||
code == TSDB_CODE_SYN_NOT_LEADER || code == TSDB_CODE_APP_NOT_READY || code == TSDB_CODE_RPC_BROKEN_LINK) { code == TSDB_CODE_SYN_NOT_LEADER || code == TSDB_CODE_APP_NOT_READY || code == TSDB_CODE_RPC_BROKEN_LINK ||
code == TSDB_CODE_VND_STOPPED) {
if (msgType == TDMT_SCH_QUERY || msgType == TDMT_SCH_MERGE_QUERY || msgType == TDMT_SCH_FETCH || if (msgType == TDMT_SCH_QUERY || msgType == TDMT_SCH_MERGE_QUERY || msgType == TDMT_SCH_FETCH ||
msgType == TDMT_SCH_MERGE_FETCH) { msgType == TDMT_SCH_MERGE_FETCH) {
return false; return false;
......
...@@ -644,17 +644,6 @@ static int32_t mndCheckMnodeState(SRpcMsg *pMsg) { ...@@ -644,17 +644,6 @@ static int32_t mndCheckMnodeState(SRpcMsg *pMsg) {
return -1; return -1;
} }
static int32_t mndCheckMsgContent(SRpcMsg *pMsg) {
if (!IsReq(pMsg)) return 0;
if (pMsg->contLen != 0 && pMsg->pCont != NULL) return 0;
const STraceId *trace = &pMsg->info.traceId;
mGError("msg:%p, failed to check msg, cont:%p contLen:%d, app:%p type:%s", pMsg, pMsg->pCont, pMsg->contLen,
pMsg->info.ahandle, TMSG_INFO(pMsg->msgType));
terrno = TSDB_CODE_INVALID_MSG_LEN;
return -1;
}
int32_t mndProcessRpcMsg(SRpcMsg *pMsg) { int32_t mndProcessRpcMsg(SRpcMsg *pMsg) {
SMnode *pMnode = pMsg->info.node; SMnode *pMnode = pMsg->info.node;
const STraceId *trace = &pMsg->info.traceId; const STraceId *trace = &pMsg->info.traceId;
...@@ -666,7 +655,6 @@ int32_t mndProcessRpcMsg(SRpcMsg *pMsg) { ...@@ -666,7 +655,6 @@ int32_t mndProcessRpcMsg(SRpcMsg *pMsg) {
return -1; return -1;
} }
if (mndCheckMsgContent(pMsg) != 0) return -1;
if (mndCheckMnodeState(pMsg) != 0) return -1; if (mndCheckMnodeState(pMsg) != 0) return -1;
mGTrace("msg:%p, start to process in mnode, app:%p type:%s", pMsg, pMsg->info.ahandle, TMSG_INFO(pMsg->msgType)); mGTrace("msg:%p, start to process in mnode, app:%p type:%s", pMsg, pMsg->info.ahandle, TMSG_INFO(pMsg->msgType));
......
...@@ -117,7 +117,8 @@ static FORCE_INLINE int64_t tsdbLogicToFileSize(int64_t lSize, int32_t szPage) { ...@@ -117,7 +117,8 @@ static FORCE_INLINE int64_t tsdbLogicToFileSize(int64_t lSize, int32_t szPage) {
void tsdbRowGetColVal(TSDBROW *pRow, STSchema *pTSchema, int32_t iCol, SColVal *pColVal); void tsdbRowGetColVal(TSDBROW *pRow, STSchema *pTSchema, int32_t iCol, SColVal *pColVal);
int32_t tsdbRowCmprFn(const void *p1, const void *p2); int32_t tsdbRowCmprFn(const void *p1, const void *p2);
// STSDBRowIter // STSDBRowIter
void tsdbRowIterInit(STSDBRowIter *pIter, TSDBROW *pRow, STSchema *pTSchema); int32_t tsdbRowIterOpen(STSDBRowIter *pIter, TSDBROW *pRow, STSchema *pTSchema);
void tsdbRowClose(STSDBRowIter *pIter);
SColVal *tsdbRowIterNext(STSDBRowIter *pIter); SColVal *tsdbRowIterNext(STSDBRowIter *pIter);
// SRowMerger // SRowMerger
int32_t tsdbRowMergerInit2(SRowMerger *pMerger, STSchema *pResTSchema, TSDBROW *pRow, STSchema *pTSchema); int32_t tsdbRowMergerInit2(SRowMerger *pMerger, STSchema *pResTSchema, TSDBROW *pRow, STSchema *pTSchema);
...@@ -571,10 +572,14 @@ struct SDFileSet { ...@@ -571,10 +572,14 @@ struct SDFileSet {
}; };
struct STSDBRowIter { struct STSDBRowIter {
TSDBROW *pRow; TSDBROW *pRow;
STSchema *pTSchema; union {
SColVal colVal; SRowIter *pIter;
int32_t i; struct {
int32_t iColData;
SColVal cv;
};
};
}; };
struct SRowMerger { struct SRowMerger {
STSchema *pTSchema; STSchema *pTSchema;
......
...@@ -596,7 +596,7 @@ int32_t tDiskDataAddRow(SDiskDataBuilder *pBuilder, TSDBROW *pRow, STSchema *pTS ...@@ -596,7 +596,7 @@ int32_t tDiskDataAddRow(SDiskDataBuilder *pBuilder, TSDBROW *pRow, STSchema *pTS
if (pBuilder->bi.maxKey < kRow.ts) pBuilder->bi.maxKey = kRow.ts; if (pBuilder->bi.maxKey < kRow.ts) pBuilder->bi.maxKey = kRow.ts;
STSDBRowIter iter = {0}; STSDBRowIter iter = {0};
tsdbRowIterInit(&iter, pRow, pTSchema); tsdbRowIterOpen(&iter, pRow, pTSchema);
SColVal *pColVal = tsdbRowIterNext(&iter); SColVal *pColVal = tsdbRowIterNext(&iter);
for (int32_t iBuilder = 0; iBuilder < pBuilder->nBuilder; iBuilder++) { for (int32_t iBuilder = 0; iBuilder < pBuilder->nBuilder; iBuilder++) {
......
...@@ -194,7 +194,7 @@ static void setComposedBlockFlag(STsdbReader* pReader, bool composed); ...@@ -194,7 +194,7 @@ static void setComposedBlockFlag(STsdbReader* pReader, bool composed);
static bool hasBeenDropped(const SArray* pDelList, int32_t* index, TSDBKEY* pKey, int32_t order, static bool hasBeenDropped(const SArray* pDelList, int32_t* index, TSDBKEY* pKey, int32_t order,
SVersionRange* pVerRange); SVersionRange* pVerRange);
static int32_t doMergeMemTableMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo* pIter, SArray* pDelList, SRow** pTSRow, static int32_t doMergeMemTableMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo* pIter, SArray* pDelList, TSDBROW* pTSRow,
STsdbReader* pReader, bool* freeTSRow); STsdbReader* pReader, bool* freeTSRow);
static int32_t doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScanInfo* pBlockScanInfo, static int32_t doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScanInfo* pBlockScanInfo,
STsdbReader* pReader, SRow** pTSRow); STsdbReader* pReader, SRow** pTSRow);
...@@ -3358,7 +3358,7 @@ int32_t doMergeRowsInLastBlock(SLastBlockReader* pLastBlockReader, STableBlockSc ...@@ -3358,7 +3358,7 @@ int32_t doMergeRowsInLastBlock(SLastBlockReader* pLastBlockReader, STableBlockSc
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t doMergeMemTableMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo* pIter, SArray* pDelList, SRow** pTSRow, int32_t doMergeMemTableMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo* pIter, SArray* pDelList, TSDBROW* pResRow,
STsdbReader* pReader, bool* freeTSRow) { STsdbReader* pReader, bool* freeTSRow) {
TSDBROW* pNextRow = NULL; TSDBROW* pNextRow = NULL;
TSDBROW current = *pRow; TSDBROW current = *pRow;
...@@ -3367,25 +3367,27 @@ int32_t doMergeMemTableMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo* pIter, ...@@ -3367,25 +3367,27 @@ int32_t doMergeMemTableMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo* pIter,
pIter->hasVal = tsdbTbDataIterNext(pIter->iter); pIter->hasVal = tsdbTbDataIterNext(pIter->iter);
if (!pIter->hasVal) { if (!pIter->hasVal) {
*pTSRow = current.pTSRow; *pResRow = *pRow;
*freeTSRow = false; *freeTSRow = false;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} else { // has next point in mem/imem } else { // has next point in mem/imem
pNextRow = getValidMemRow(pIter, pDelList, pReader); pNextRow = getValidMemRow(pIter, pDelList, pReader);
if (pNextRow == NULL) { if (pNextRow == NULL) {
*pTSRow = current.pTSRow; *pResRow = current;
*freeTSRow = false; *freeTSRow = false;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
if (TSDBROW_TS(&current) != TSDBROW_TS(pNextRow)) { if (TSDBROW_TS(&current) != TSDBROW_TS(pNextRow)) {
*pTSRow = current.pTSRow; *pResRow = current;
*freeTSRow = false; *freeTSRow = false;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
} }
} }
#if 0
// todo handle the data block merge
SRowMerger merge = {0}; SRowMerger merge = {0};
// get the correct schema for data in memory // get the correct schema for data in memory
...@@ -3423,6 +3425,8 @@ int32_t doMergeMemTableMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo* pIter, ...@@ -3423,6 +3425,8 @@ int32_t doMergeMemTableMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo* pIter,
tsdbRowMergerClear(&merge); tsdbRowMergerClear(&merge);
*freeTSRow = true; *freeTSRow = true;
#endif
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -3480,7 +3484,7 @@ int32_t doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScanInfo* p ...@@ -3480,7 +3484,7 @@ int32_t doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScanInfo* p
return code; return code;
} }
int32_t tsdbGetNextRowInMem(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader, SRow** pTSRow, int64_t endKey, int32_t tsdbGetNextRowInMem(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader, TSDBROW* pResRow, int64_t endKey,
bool* freeTSRow) { bool* freeTSRow) {
TSDBROW* pRow = getValidMemRow(&pBlockScanInfo->iter, pBlockScanInfo->delSkyline, pReader); TSDBROW* pRow = getValidMemRow(&pBlockScanInfo->iter, pBlockScanInfo->delSkyline, pReader);
TSDBROW* piRow = getValidMemRow(&pBlockScanInfo->iiter, pBlockScanInfo->delSkyline, pReader); TSDBROW* piRow = getValidMemRow(&pBlockScanInfo->iiter, pBlockScanInfo->delSkyline, pReader);
...@@ -3510,13 +3514,14 @@ int32_t tsdbGetNextRowInMem(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pR ...@@ -3510,13 +3514,14 @@ int32_t tsdbGetNextRowInMem(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pR
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
if (ik.ts != k.ts) { if (ik.ts != k.ts) {
if (((ik.ts < k.ts) && asc) || ((ik.ts > k.ts) && (!asc))) { // ik.ts < k.ts if (((ik.ts < k.ts) && asc) || ((ik.ts > k.ts) && (!asc))) { // ik.ts < k.ts
code = doMergeMemTableMultiRows(piRow, uid, &pBlockScanInfo->iiter, pDelList, pTSRow, pReader, freeTSRow); code = doMergeMemTableMultiRows(piRow, uid, &pBlockScanInfo->iiter, pDelList, pResRow, pReader, freeTSRow);
} else if (((k.ts < ik.ts) && asc) || ((k.ts > ik.ts) && (!asc))) { } else if (((k.ts < ik.ts) && asc) || ((k.ts > ik.ts) && (!asc))) {
code = doMergeMemTableMultiRows(pRow, uid, &pBlockScanInfo->iter, pDelList, pTSRow, pReader, freeTSRow); code = doMergeMemTableMultiRows(pRow, uid, &pBlockScanInfo->iter, pDelList, pResRow, pReader, freeTSRow);
} }
} else { // ik.ts == k.ts } else { // ik.ts == k.ts
*freeTSRow = true; *freeTSRow = true;
code = doMergeMemIMemRows(pRow, piRow, pBlockScanInfo, pReader, pTSRow); pResRow->type = TSDBROW_ROW_FMT;
code = doMergeMemIMemRows(pRow, piRow, pBlockScanInfo, pReader, &pResRow->pTSRow);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
} }
...@@ -3526,12 +3531,12 @@ int32_t tsdbGetNextRowInMem(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pR ...@@ -3526,12 +3531,12 @@ int32_t tsdbGetNextRowInMem(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pR
} }
if (pBlockScanInfo->iter.hasVal && pRow != NULL) { if (pBlockScanInfo->iter.hasVal && pRow != NULL) {
return doMergeMemTableMultiRows(pRow, pBlockScanInfo->uid, &pBlockScanInfo->iter, pDelList, pTSRow, pReader, return doMergeMemTableMultiRows(pRow, pBlockScanInfo->uid, &pBlockScanInfo->iter, pDelList, pResRow, pReader,
freeTSRow); freeTSRow);
} }
if (pBlockScanInfo->iiter.hasVal && piRow != NULL) { if (pBlockScanInfo->iiter.hasVal && piRow != NULL) {
return doMergeMemTableMultiRows(piRow, uid, &pBlockScanInfo->iiter, pDelList, pTSRow, pReader, freeTSRow); return doMergeMemTableMultiRows(piRow, uid, &pBlockScanInfo->iiter, pDelList, pResRow, pReader, freeTSRow);
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
...@@ -3636,17 +3641,22 @@ int32_t buildDataBlockFromBufImpl(STableBlockScanInfo* pBlockScanInfo, int64_t e ...@@ -3636,17 +3641,22 @@ int32_t buildDataBlockFromBufImpl(STableBlockScanInfo* pBlockScanInfo, int64_t e
SSDataBlock* pBlock = pReader->pResBlock; SSDataBlock* pBlock = pReader->pResBlock;
do { do {
SRow* pTSRow = NULL; // SRow* pTSRow = NULL;
TSDBROW row = {.type = -1};
bool freeTSRow = false; bool freeTSRow = false;
tsdbGetNextRowInMem(pBlockScanInfo, pReader, &pTSRow, endKey, &freeTSRow); tsdbGetNextRowInMem(pBlockScanInfo, pReader, &row, endKey, &freeTSRow);
if (pTSRow == NULL) { if (row.type == -1) {
break; break;
} }
doAppendRowFromTSRow(pBlock, pReader, pTSRow, pBlockScanInfo); if (row.type == TSDBROW_ROW_FMT) {
doAppendRowFromTSRow(pBlock, pReader, row.pTSRow, pBlockScanInfo);
if (freeTSRow) { if (freeTSRow) {
taosMemoryFree(pTSRow); taosMemoryFree(row.pTSRow);
}
} else {
doAppendRowFromFileBlock(pBlock, pReader, row.pBlockData, row.iRow);
} }
// no data in buffer, return immediately // no data in buffer, return immediately
......
...@@ -594,42 +594,50 @@ int32_t tsdbRowCmprFn(const void *p1, const void *p2) { ...@@ -594,42 +594,50 @@ int32_t tsdbRowCmprFn(const void *p1, const void *p2) {
} }
// STSDBRowIter ====================================================== // STSDBRowIter ======================================================
void tsdbRowIterInit(STSDBRowIter *pIter, TSDBROW *pRow, STSchema *pTSchema) { int32_t tsdbRowIterOpen(STSDBRowIter *pIter, TSDBROW *pRow, STSchema *pTSchema) {
int32_t code = 0;
pIter->pRow = pRow; pIter->pRow = pRow;
if (pRow->type == TSDBROW_ROW_FMT) { if (pRow->type == TSDBROW_ROW_FMT) {
ASSERT(pTSchema); code = tRowIterOpen(pRow->pTSRow, pTSchema, &pIter->pIter);
pIter->pTSchema = pTSchema; if (code) goto _exit;
pIter->i = 1;
} else if (pRow->type == TSDBROW_COL_FMT) { } else if (pRow->type == TSDBROW_COL_FMT) {
pIter->pTSchema = NULL; pIter->iColData = 0;
pIter->i = 0;
} else { } else {
ASSERT(0); ASSERT(0);
} }
_exit:
return code;
} }
SColVal *tsdbRowIterNext(STSDBRowIter *pIter) { void tsdbRowClose(STSDBRowIter *pIter) {
if (pIter->pRow->type == TSDBROW_ROW_FMT) { if (pIter->pRow->type == TSDBROW_ROW_FMT) {
if (pIter->i < pIter->pTSchema->numOfCols) { tRowIterClose(&pIter->pIter);
tRowGet(pIter->pRow->pTSRow, pIter->pTSchema, pIter->i, &pIter->colVal); }
pIter->i++; }
return &pIter->colVal; SColVal *tsdbRowIterNext(STSDBRowIter *pIter) {
} if (pIter->pRow->type == TSDBROW_ROW_FMT) {
return tRowIterNext(pIter->pIter);
} else if (pIter->pRow->type == TSDBROW_COL_FMT) { } else if (pIter->pRow->type == TSDBROW_COL_FMT) {
if (pIter->i < pIter->pRow->pBlockData->nColData) { if (pIter->iColData == 0) {
SColData *pColData = tBlockDataGetColDataByIdx(pIter->pRow->pBlockData, pIter->i); pIter->cv = COL_VAL_VALUE(PRIMARYKEY_TIMESTAMP_COL_ID, TSDB_DATA_TYPE_TIMESTAMP,
(SValue){.val = pIter->pRow->pBlockData->aTSKEY[pIter->pRow->iRow]});
tColDataGetValue(pColData, pIter->pRow->iRow, &pIter->colVal); ++pIter->iColData;
pIter->i++; return &pIter->cv;
}
return &pIter->colVal; if (pIter->iColData < pIter->pRow->pBlockData->nColData) {
tColDataGetValue(&pIter->pRow->pBlockData->aColData[pIter->iColData], pIter->pRow->iRow, &pIter->cv);
++pIter->iColData;
return &pIter->cv;
} else {
return NULL;
} }
} else { } else {
ASSERT(0); ASSERT(0);
} }
return NULL;
} }
// SRowMerger ====================================================== // SRowMerger ======================================================
...@@ -1097,7 +1105,8 @@ void tBlockDataGetColData(SBlockData *pBlockData, int16_t cid, SColData **ppColD ...@@ -1097,7 +1105,8 @@ void tBlockDataGetColData(SBlockData *pBlockData, int16_t cid, SColData **ppColD
int32_t ridx = pBlockData->nColData - 1; int32_t ridx = pBlockData->nColData - 1;
while (lidx <= ridx) { while (lidx <= ridx) {
int32_t midx = (lidx + ridx) >> 2; int32_t midx = lidx + (ridx - lidx) * (cid - pBlockData->aColData[lidx].cid) /
(pBlockData->aColData[ridx].cid - pBlockData->aColData[lidx].cid);
SColData *pColData = tBlockDataGetColDataByIdx(pBlockData, midx); SColData *pColData = tBlockDataGetColDataByIdx(pBlockData, midx);
int32_t c = (pColData->cid == cid) ? 0 : ((pColData->cid > cid) ? 1 : -1); int32_t c = (pColData->cid == cid) ? 0 : ((pColData->cid > cid) ? 1 : -1);
......
...@@ -781,7 +781,7 @@ void setInputDataBlock(SExprSupp* pExprSupp, SSDataBlock* pBlock, int32_t order, ...@@ -781,7 +781,7 @@ void setInputDataBlock(SExprSupp* pExprSupp, SSDataBlock* pBlock, int32_t order,
int32_t checkForQueryBuf(size_t numOfTables); int32_t checkForQueryBuf(size_t numOfTables);
bool isTaskKilled(SExecTaskInfo* pTaskInfo); bool isTaskKilled(SExecTaskInfo* pTaskInfo);
void setTaskKilled(SExecTaskInfo* pTaskInfo); void setTaskKilled(SExecTaskInfo* pTaskInfo, int32_t rspCode);
void doDestroyTask(SExecTaskInfo* pTaskInfo); void doDestroyTask(SExecTaskInfo* pTaskInfo);
void setTaskStatus(SExecTaskInfo* pTaskInfo, int8_t status); void setTaskStatus(SExecTaskInfo* pTaskInfo, int8_t status);
......
...@@ -70,7 +70,7 @@ static void concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SExchangeIn ...@@ -70,7 +70,7 @@ static void concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SExchangeIn
tsem_wait(&pExchangeInfo->ready); tsem_wait(&pExchangeInfo->ready);
if (isTaskKilled(pTaskInfo)) { if (isTaskKilled(pTaskInfo)) {
longjmp(pTaskInfo->env, TSDB_CODE_TSC_QUERY_CANCELLED); longjmp(pTaskInfo->env, pTaskInfo->code);
} }
for (int32_t i = 0; i < totalSources; ++i) { for (int32_t i = 0; i < totalSources; ++i) {
...@@ -573,7 +573,7 @@ int32_t prepareConcurrentlyLoad(SOperatorInfo* pOperator) { ...@@ -573,7 +573,7 @@ int32_t prepareConcurrentlyLoad(SOperatorInfo* pOperator) {
tsem_wait(&pExchangeInfo->ready); tsem_wait(&pExchangeInfo->ready);
if (isTaskKilled(pTaskInfo)) { if (isTaskKilled(pTaskInfo)) {
longjmp(pTaskInfo->env, TSDB_CODE_TSC_QUERY_CANCELLED); longjmp(pTaskInfo->env, pTaskInfo->code);
} }
tsem_post(&pExchangeInfo->ready); tsem_post(&pExchangeInfo->ready);
...@@ -621,7 +621,7 @@ int32_t seqLoadRemoteData(SOperatorInfo* pOperator) { ...@@ -621,7 +621,7 @@ int32_t seqLoadRemoteData(SOperatorInfo* pOperator) {
doSendFetchDataRequest(pExchangeInfo, pTaskInfo, pExchangeInfo->current); doSendFetchDataRequest(pExchangeInfo, pTaskInfo, pExchangeInfo->current);
tsem_wait(&pExchangeInfo->ready); tsem_wait(&pExchangeInfo->ready);
if (isTaskKilled(pTaskInfo)) { if (isTaskKilled(pTaskInfo)) {
longjmp(pTaskInfo->env, TSDB_CODE_TSC_QUERY_CANCELLED); longjmp(pTaskInfo->env, pTaskInfo->code);
} }
SDownstreamSourceNode* pSource = taosArrayGet(pExchangeInfo->pSources, pExchangeInfo->current); SDownstreamSourceNode* pSource = taosArrayGet(pExchangeInfo->pSources, pExchangeInfo->current);
......
...@@ -688,7 +688,7 @@ void qStopTaskOperators(SExecTaskInfo* pTaskInfo) { ...@@ -688,7 +688,7 @@ void qStopTaskOperators(SExecTaskInfo* pTaskInfo) {
taosWUnLockLatch(&pTaskInfo->stopInfo.lock); taosWUnLockLatch(&pTaskInfo->stopInfo.lock);
} }
int32_t qAsyncKillTask(qTaskInfo_t qinfo) { int32_t qAsyncKillTask(qTaskInfo_t qinfo, int32_t rspCode) {
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)qinfo; SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)qinfo;
if (pTaskInfo == NULL) { if (pTaskInfo == NULL) {
...@@ -697,7 +697,7 @@ int32_t qAsyncKillTask(qTaskInfo_t qinfo) { ...@@ -697,7 +697,7 @@ int32_t qAsyncKillTask(qTaskInfo_t qinfo) {
qDebug("%s execTask async killed", GET_TASKID(pTaskInfo)); qDebug("%s execTask async killed", GET_TASKID(pTaskInfo));
setTaskKilled(pTaskInfo); setTaskKilled(pTaskInfo, rspCode);
qStopTaskOperators(pTaskInfo); qStopTaskOperators(pTaskInfo);
......
...@@ -610,21 +610,10 @@ void setBlockSMAInfo(SqlFunctionCtx* pCtx, SExprInfo* pExprInfo, SSDataBlock* pB ...@@ -610,21 +610,10 @@ void setBlockSMAInfo(SqlFunctionCtx* pCtx, SExprInfo* pExprInfo, SSDataBlock* pB
} }
bool isTaskKilled(SExecTaskInfo* pTaskInfo) { bool isTaskKilled(SExecTaskInfo* pTaskInfo) {
// query has been executed more than tsShellActivityTimer, and the retrieve has not arrived return (0 != pTaskInfo->code) ? true : false;
// abort current query execution.
if (pTaskInfo->owner != 0 &&
((taosGetTimestampSec() - pTaskInfo->cost.start / 1000) > 10 * getMaximumIdleDurationSec())
/*(!needBuildResAfterQueryComplete(pTaskInfo))*/) {
assert(pTaskInfo->cost.start != 0);
// qDebug("QInfo:%" PRIu64 " retrieve not arrive beyond %d ms, abort current query execution, start:%" PRId64
// ", current:%d", pQInfo->qId, 1, pQInfo->startExecTs, taosGetTimestampSec());
// return true;
}
return false;
} }
void setTaskKilled(SExecTaskInfo* pTaskInfo) { pTaskInfo->code = TSDB_CODE_TSC_QUERY_CANCELLED; } void setTaskKilled(SExecTaskInfo* pTaskInfo, int32_t rspCode) { pTaskInfo->code = rspCode; }
///////////////////////////////////////////////////////////////////////////////////////////// /////////////////////////////////////////////////////////////////////////////////////////////
STimeWindow getAlignQueryTimeWindow(SInterval* pInterval, int32_t precision, int64_t key) { STimeWindow getAlignQueryTimeWindow(SInterval* pInterval, int32_t precision, int64_t key) {
......
...@@ -629,7 +629,7 @@ static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator) { ...@@ -629,7 +629,7 @@ static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator) {
while (tsdbNextDataBlock(pTableScanInfo->base.dataReader)) { while (tsdbNextDataBlock(pTableScanInfo->base.dataReader)) {
if (isTaskKilled(pTaskInfo)) { if (isTaskKilled(pTaskInfo)) {
T_LONG_JMP(pTaskInfo->env, TSDB_CODE_TSC_QUERY_CANCELLED); T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
} }
// process this data block based on the probabilities // process this data block based on the probabilities
...@@ -2032,7 +2032,7 @@ static SSDataBlock* doRawScan(SOperatorInfo* pOperator) { ...@@ -2032,7 +2032,7 @@ static SSDataBlock* doRawScan(SOperatorInfo* pOperator) {
if (pInfo->dataReader && tsdbNextDataBlock(pInfo->dataReader)) { if (pInfo->dataReader && tsdbNextDataBlock(pInfo->dataReader)) {
if (isTaskKilled(pTaskInfo)) { if (isTaskKilled(pTaskInfo)) {
longjmp(pTaskInfo->env, TSDB_CODE_TSC_QUERY_CANCELLED); longjmp(pTaskInfo->env, pTaskInfo->code);
} }
int32_t rows = 0; int32_t rows = 0;
...@@ -2529,7 +2529,7 @@ static SSDataBlock* getTableDataBlockImpl(void* param) { ...@@ -2529,7 +2529,7 @@ static SSDataBlock* getTableDataBlockImpl(void* param) {
STsdbReader* reader = pInfo->base.dataReader; STsdbReader* reader = pInfo->base.dataReader;
while (tsdbNextDataBlock(reader)) { while (tsdbNextDataBlock(reader)) {
if (isTaskKilled(pTaskInfo)) { if (isTaskKilled(pTaskInfo)) {
T_LONG_JMP(pTaskInfo->env, TSDB_CODE_TSC_QUERY_CANCELLED); T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
} }
// process this data block based on the probabilities // process this data block based on the probabilities
......
...@@ -270,7 +270,7 @@ static void i64VectorSumAVX2(const int64_t* plist, int32_t numOfRows, SAvgRes* p ...@@ -270,7 +270,7 @@ static void i64VectorSumAVX2(const int64_t* plist, int32_t numOfRows, SAvgRes* p
#if __AVX2__ #if __AVX2__
// find the start position that are aligned to 32bytes address in memory // find the start position that are aligned to 32bytes address in memory
int32_t width = (bitWidth>>3u) / sizeof(int64_t); int32_t width = (bitWidth >> 3u) / sizeof(int64_t);
int32_t remainder = numOfRows % width; int32_t remainder = numOfRows % width;
int32_t rounds = numOfRows / width; int32_t rounds = numOfRows / width;
...@@ -286,20 +286,11 @@ static void i64VectorSumAVX2(const int64_t* plist, int32_t numOfRows, SAvgRes* p ...@@ -286,20 +286,11 @@ static void i64VectorSumAVX2(const int64_t* plist, int32_t numOfRows, SAvgRes* p
} }
// let sum up the final results // let sum up the final results
if (type == TSDB_DATA_TYPE_BIGINT) { const int64_t* q = (const int64_t*)&sum;
const int64_t* q = (const int64_t*)&sum; pRes->sum.isum += q[0] + q[1] + q[2] + q[3];
pRes->sum.isum += q[0] + q[1] + q[2] + q[3];
for (int32_t j = 0; j < remainder; ++j) {
pRes->sum.isum += plist[j + rounds * width];
}
} else {
const uint64_t* q = (const uint64_t*)&sum;
pRes->sum.usum += q[0] + q[1] + q[2] + q[3];
for (int32_t j = 0; j < remainder; ++j) { for (int32_t j = 0; j < remainder; ++j) {
pRes->sum.usum += (uint64_t)plist[j + rounds * width]; pRes->sum.isum += plist[j + rounds * width];
}
} }
#endif #endif
...@@ -588,7 +579,7 @@ int32_t avgFunction(SqlFunctionCtx* pCtx) { ...@@ -588,7 +579,7 @@ int32_t avgFunction(SqlFunctionCtx* pCtx) {
const int64_t* plist = (const int64_t*) pCol->pData; const int64_t* plist = (const int64_t*) pCol->pData;
// 1. If the CPU supports AVX, let's employ AVX instructions to speedup this loop // 1. If the CPU supports AVX, let's employ AVX instructions to speedup this loop
if (simdAvailable) { if (simdAvailable && type == TSDB_DATA_TYPE_BIGINT) {
i64VectorSumAVX2(plist, numOfRows, pAvgRes); i64VectorSumAVX2(plist, numOfRows, pAvgRes);
} else { } else {
for (int32_t i = pInput->startRowIndex; i < pInput->numOfRows + pInput->startRowIndex; ++i) { for (int32_t i = pInput->startRowIndex; i < pInput->numOfRows + pInput->startRowIndex; ++i) {
......
...@@ -156,31 +156,95 @@ end: ...@@ -156,31 +156,95 @@ end:
return code; return code;
} }
int32_t convertStmtNcharCol(SMsgBuf* pMsgBuf, SSchema* pSchema, TAOS_MULTI_BIND* src, TAOS_MULTI_BIND* dst) {
int32_t output = 0;
int32_t newBuflen = (pSchema->bytes - VARSTR_HEADER_SIZE) * src->num;
if (dst->buffer_length < newBuflen) {
dst->buffer = taosMemoryRealloc(dst->buffer, newBuflen);
if (NULL == dst->buffer) {
return TSDB_CODE_OUT_OF_MEMORY;
}
}
if (NULL == dst->length) {
dst->length = taosMemoryRealloc(dst->length, sizeof(int32_t) * src->num);
if (NULL == dst->buffer) {
taosMemoryFreeClear(dst->buffer);
return TSDB_CODE_OUT_OF_MEMORY;
}
}
dst->buffer_length = pSchema->bytes - VARSTR_HEADER_SIZE;
for (int32_t i = 0; i < src->num; ++i) {
if (src->is_null && src->is_null[i]) {
continue;
}
if (!taosMbsToUcs4(src->buffer + src->buffer_length * i, src->length[i], (TdUcs4*)(dst->buffer + dst->buffer_length * i), dst->buffer_length, &output)) {
if (errno == E2BIG) {
return generateSyntaxErrMsg(pMsgBuf, TSDB_CODE_PAR_VALUE_TOO_LONG, pSchema->name);
}
char buf[512] = {0};
snprintf(buf, tListLen(buf), "%s", strerror(errno));
return buildSyntaxErrMsg(pMsgBuf, buf, NULL);
}
dst->length[i] = output;
}
dst->buffer_type = src->buffer_type;
dst->is_null = src->is_null;
dst->num = src->num;
return TSDB_CODE_SUCCESS;
}
int32_t qBindStmtColsValue(void* pBlock, TAOS_MULTI_BIND* bind, char* msgBuf, int32_t msgBufLen) { int32_t qBindStmtColsValue(void* pBlock, TAOS_MULTI_BIND* bind, char* msgBuf, int32_t msgBufLen) {
STableDataCxt* pDataBlock = (STableDataCxt*)pBlock; STableDataCxt* pDataBlock = (STableDataCxt*)pBlock;
SSchema* pSchema = getTableColumnSchema(pDataBlock->pMeta); SSchema* pSchema = getTableColumnSchema(pDataBlock->pMeta);
SBoundColInfo* boundInfo = &pDataBlock->boundColsInfo; SBoundColInfo* boundInfo = &pDataBlock->boundColsInfo;
SMsgBuf pBuf = {.buf = msgBuf, .len = msgBufLen}; SMsgBuf pBuf = {.buf = msgBuf, .len = msgBufLen};
int32_t rowNum = bind->num; int32_t rowNum = bind->num;
TAOS_MULTI_BIND ncharBind = {0};
TAOS_MULTI_BIND* pBind = NULL;
int32_t code = 0;
for (int c = 0; c < boundInfo->numOfBound; ++c) { for (int c = 0; c < boundInfo->numOfBound; ++c) {
SSchema* pColSchema = &pSchema[boundInfo->pColIndex[c]]; SSchema* pColSchema = &pSchema[boundInfo->pColIndex[c]];
SColData* pCol = taosArrayGet(pDataBlock->pData->aCol, c); SColData* pCol = taosArrayGet(pDataBlock->pData->aCol, c);
if (bind[c].num != rowNum) { if (bind[c].num != rowNum) {
return buildInvalidOperationMsg(&pBuf, "row number in each bind param should be the same"); code = buildInvalidOperationMsg(&pBuf, "row number in each bind param should be the same");
goto _return;
} }
if (bind[c].buffer_type != pColSchema->type) { if (bind[c].buffer_type != pColSchema->type) {
return buildInvalidOperationMsg(&pBuf, "column type mis-match with buffer type"); code = buildInvalidOperationMsg(&pBuf, "column type mis-match with buffer type");
goto _return;
} }
tColDataAddValueByBind(pCol, bind + c); if (TSDB_DATA_TYPE_NCHAR == pColSchema->type) {
code = convertStmtNcharCol(&pBuf, pColSchema, bind + c, &ncharBind);
if (code) {
goto _return;
}
pBind = &ncharBind;
} else {
pBind = bind + c;
}
tColDataAddValueByBind(pCol, pBind);
} }
qDebug("stmt all %d columns bind %d rows data", boundInfo->numOfBound, rowNum); qDebug("stmt all %d columns bind %d rows data", boundInfo->numOfBound, rowNum);
return TSDB_CODE_SUCCESS; _return:
taosMemoryFree(ncharBind.buffer);
taosMemoryFree(ncharBind.length);
return code;
} }
int32_t qBindStmtSingleColValue(void* pBlock, TAOS_MULTI_BIND* bind, char* msgBuf, int32_t msgBufLen, int32_t colIdx, int32_t qBindStmtSingleColValue(void* pBlock, TAOS_MULTI_BIND* bind, char* msgBuf, int32_t msgBufLen, int32_t colIdx,
...@@ -191,6 +255,9 @@ int32_t qBindStmtSingleColValue(void* pBlock, TAOS_MULTI_BIND* bind, char* msgBu ...@@ -191,6 +255,9 @@ int32_t qBindStmtSingleColValue(void* pBlock, TAOS_MULTI_BIND* bind, char* msgBu
SMsgBuf pBuf = {.buf = msgBuf, .len = msgBufLen}; SMsgBuf pBuf = {.buf = msgBuf, .len = msgBufLen};
SSchema* pColSchema = &pSchema[boundInfo->pColIndex[colIdx]]; SSchema* pColSchema = &pSchema[boundInfo->pColIndex[colIdx]];
SColData* pCol = taosArrayGet(pDataBlock->pData->aCol, colIdx); SColData* pCol = taosArrayGet(pDataBlock->pData->aCol, colIdx);
TAOS_MULTI_BIND ncharBind = {0};
TAOS_MULTI_BIND* pBind = NULL;
int32_t code = 0;
if (bind->num != rowNum) { if (bind->num != rowNum) {
return buildInvalidOperationMsg(&pBuf, "row number in each bind param should be the same"); return buildInvalidOperationMsg(&pBuf, "row number in each bind param should be the same");
...@@ -199,12 +266,27 @@ int32_t qBindStmtSingleColValue(void* pBlock, TAOS_MULTI_BIND* bind, char* msgBu ...@@ -199,12 +266,27 @@ int32_t qBindStmtSingleColValue(void* pBlock, TAOS_MULTI_BIND* bind, char* msgBu
if (bind->buffer_type != pColSchema->type) { if (bind->buffer_type != pColSchema->type) {
return buildInvalidOperationMsg(&pBuf, "column type mis-match with buffer type"); return buildInvalidOperationMsg(&pBuf, "column type mis-match with buffer type");
} }
if (TSDB_DATA_TYPE_NCHAR == pColSchema->type) {
code = convertStmtNcharCol(&pBuf, pColSchema, bind, &ncharBind);
if (code) {
goto _return;
}
pBind = &ncharBind;
} else {
pBind = bind;
}
tColDataAddValueByBind(pCol, bind); tColDataAddValueByBind(pCol, pBind);
qDebug("stmt col %d bind %d rows data", colIdx, rowNum); qDebug("stmt col %d bind %d rows data", colIdx, rowNum);
return TSDB_CODE_SUCCESS; _return:
taosMemoryFree(ncharBind.buffer);
taosMemoryFree(ncharBind.length);
return code;
} }
int32_t buildBoundFields(int32_t numOfBound, int16_t* boundColumns, SSchema* pSchema, int32_t* fieldNum, TAOS_FIELD_E** fields, int32_t buildBoundFields(int32_t numOfBound, int16_t* boundColumns, SSchema* pSchema, int32_t* fieldNum, TAOS_FIELD_E** fields,
......
...@@ -750,8 +750,8 @@ static bool isPrimaryKeyImpl(SNode* pExpr) { ...@@ -750,8 +750,8 @@ static bool isPrimaryKeyImpl(SNode* pExpr) {
return (PRIMARYKEY_TIMESTAMP_COL_ID == ((SColumnNode*)pExpr)->colId); return (PRIMARYKEY_TIMESTAMP_COL_ID == ((SColumnNode*)pExpr)->colId);
} else if (QUERY_NODE_FUNCTION == nodeType(pExpr)) { } else if (QUERY_NODE_FUNCTION == nodeType(pExpr)) {
SFunctionNode* pFunc = (SFunctionNode*)pExpr; SFunctionNode* pFunc = (SFunctionNode*)pExpr;
if (FUNCTION_TYPE_SELECT_VALUE == pFunc->funcType || FUNCTION_TYPE_FIRST == pFunc->funcType || if (FUNCTION_TYPE_SELECT_VALUE == pFunc->funcType || FUNCTION_TYPE_GROUP_KEY == pFunc->funcType ||
FUNCTION_TYPE_LAST == pFunc->funcType) { FUNCTION_TYPE_FIRST == pFunc->funcType || FUNCTION_TYPE_LAST == pFunc->funcType) {
return isPrimaryKeyImpl(nodesListGetNode(pFunc->pParameterList, 0)); return isPrimaryKeyImpl(nodesListGetNode(pFunc->pParameterList, 0));
} else if (FUNCTION_TYPE_WSTART == pFunc->funcType || FUNCTION_TYPE_WEND == pFunc->funcType) { } else if (FUNCTION_TYPE_WSTART == pFunc->funcType || FUNCTION_TYPE_WEND == pFunc->funcType) {
return true; return true;
......
...@@ -363,7 +363,7 @@ int32_t qwAcquireTaskCtx(QW_FPARAMS_DEF, SQWTaskCtx **ctx); ...@@ -363,7 +363,7 @@ int32_t qwAcquireTaskCtx(QW_FPARAMS_DEF, SQWTaskCtx **ctx);
int32_t qwGetTaskCtx(QW_FPARAMS_DEF, SQWTaskCtx **ctx); int32_t qwGetTaskCtx(QW_FPARAMS_DEF, SQWTaskCtx **ctx);
int32_t qwAddAcquireTaskCtx(QW_FPARAMS_DEF, SQWTaskCtx **ctx); int32_t qwAddAcquireTaskCtx(QW_FPARAMS_DEF, SQWTaskCtx **ctx);
void qwReleaseTaskCtx(SQWorker *mgmt, void *ctx); void qwReleaseTaskCtx(SQWorker *mgmt, void *ctx);
int32_t qwKillTaskHandle(SQWTaskCtx *ctx); int32_t qwKillTaskHandle(SQWTaskCtx *ctx, int32_t rspCode);
int32_t qwUpdateTaskStatus(QW_FPARAMS_DEF, int8_t status); int32_t qwUpdateTaskStatus(QW_FPARAMS_DEF, int8_t status);
int32_t qwDropTask(QW_FPARAMS_DEF); int32_t qwDropTask(QW_FPARAMS_DEF);
void qwSaveTbVersionInfo(qTaskInfo_t pTaskInfo, SQWTaskCtx *ctx); void qwSaveTbVersionInfo(qTaskInfo_t pTaskInfo, SQWTaskCtx *ctx);
......
...@@ -279,14 +279,14 @@ void qwFreeTaskHandle(qTaskInfo_t *taskHandle) { ...@@ -279,14 +279,14 @@ void qwFreeTaskHandle(qTaskInfo_t *taskHandle) {
} }
} }
int32_t qwKillTaskHandle(SQWTaskCtx *ctx) { int32_t qwKillTaskHandle(SQWTaskCtx *ctx, int32_t rspCode) {
int32_t code = 0; int32_t code = 0;
// Note: free/kill may in RC // Note: free/kill may in RC
qTaskInfo_t taskHandle = atomic_load_ptr(&ctx->taskHandle); qTaskInfo_t taskHandle = atomic_load_ptr(&ctx->taskHandle);
if (taskHandle && atomic_val_compare_exchange_ptr(&ctx->taskHandle, taskHandle, NULL)) { if (taskHandle && atomic_val_compare_exchange_ptr(&ctx->taskHandle, taskHandle, NULL)) {
qDebug("start to kill task"); qDebug("start to kill task");
code = qAsyncKillTask(taskHandle); code = qAsyncKillTask(taskHandle, rspCode);
atomic_store_ptr(&ctx->taskHandle, taskHandle); atomic_store_ptr(&ctx->taskHandle, taskHandle);
} }
......
...@@ -411,7 +411,7 @@ int32_t qwHandlePrePhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inpu ...@@ -411,7 +411,7 @@ int32_t qwHandlePrePhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inpu
// qwBuildAndSendDropRsp(&ctx->ctrlConnInfo, code); // qwBuildAndSendDropRsp(&ctx->ctrlConnInfo, code);
// QW_TASK_DLOG("drop rsp send, handle:%p, code:%x - %s", ctx->ctrlConnInfo.handle, code, tstrerror(code)); // QW_TASK_DLOG("drop rsp send, handle:%p, code:%x - %s", ctx->ctrlConnInfo.handle, code, tstrerror(code));
QW_ERR_JRET(TSDB_CODE_QRY_TASK_DROPPED); QW_ERR_JRET(ctx->rspCode);
} }
QW_ERR_JRET(qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_EXEC)); QW_ERR_JRET(qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_EXEC));
...@@ -420,7 +420,7 @@ int32_t qwHandlePrePhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inpu ...@@ -420,7 +420,7 @@ int32_t qwHandlePrePhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inpu
case QW_PHASE_PRE_FETCH: { case QW_PHASE_PRE_FETCH: {
if (QW_EVENT_PROCESSED(ctx, QW_EVENT_DROP) || QW_EVENT_RECEIVED(ctx, QW_EVENT_DROP)) { if (QW_EVENT_PROCESSED(ctx, QW_EVENT_DROP) || QW_EVENT_RECEIVED(ctx, QW_EVENT_DROP)) {
QW_TASK_WLOG("task dropping or already dropped, phase:%s", qwPhaseStr(phase)); QW_TASK_WLOG("task dropping or already dropped, phase:%s", qwPhaseStr(phase));
QW_ERR_JRET(TSDB_CODE_QRY_TASK_DROPPED); QW_ERR_JRET(ctx->rspCode);
} }
if (QW_EVENT_RECEIVED(ctx, QW_EVENT_FETCH)) { if (QW_EVENT_RECEIVED(ctx, QW_EVENT_FETCH)) {
...@@ -442,7 +442,7 @@ int32_t qwHandlePrePhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inpu ...@@ -442,7 +442,7 @@ int32_t qwHandlePrePhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inpu
case QW_PHASE_PRE_CQUERY: { case QW_PHASE_PRE_CQUERY: {
if (QW_EVENT_PROCESSED(ctx, QW_EVENT_DROP)) { if (QW_EVENT_PROCESSED(ctx, QW_EVENT_DROP)) {
QW_TASK_WLOG("task already dropped, phase:%s", qwPhaseStr(phase)); QW_TASK_WLOG("task already dropped, phase:%s", qwPhaseStr(phase));
QW_ERR_JRET(TSDB_CODE_QRY_TASK_DROPPED); QW_ERR_JRET(ctx->rspCode);
} }
if (ctx->rspCode) { if (ctx->rspCode) {
...@@ -456,7 +456,7 @@ int32_t qwHandlePrePhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inpu ...@@ -456,7 +456,7 @@ int32_t qwHandlePrePhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inpu
// qwBuildAndSendDropRsp(&ctx->ctrlConnInfo, code); // qwBuildAndSendDropRsp(&ctx->ctrlConnInfo, code);
// QW_TASK_DLOG("drop rsp send, handle:%p, code:%x - %s", ctx->ctrlConnInfo.handle, code, tstrerror(code)); // QW_TASK_DLOG("drop rsp send, handle:%p, code:%x - %s", ctx->ctrlConnInfo.handle, code, tstrerror(code));
QW_ERR_JRET(TSDB_CODE_QRY_TASK_DROPPED); QW_ERR_JRET(ctx->rspCode);
} }
break; break;
...@@ -502,7 +502,7 @@ int32_t qwHandlePostPhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inp ...@@ -502,7 +502,7 @@ int32_t qwHandlePostPhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inp
if (QW_EVENT_PROCESSED(ctx, QW_EVENT_DROP)) { if (QW_EVENT_PROCESSED(ctx, QW_EVENT_DROP)) {
QW_TASK_WLOG("task already dropped, phase:%s", qwPhaseStr(phase)); QW_TASK_WLOG("task already dropped, phase:%s", qwPhaseStr(phase));
QW_ERR_JRET(TSDB_CODE_QRY_TASK_DROPPED); QW_ERR_JRET(ctx->rspCode);
} }
if (QW_EVENT_RECEIVED(ctx, QW_EVENT_DROP)) { if (QW_EVENT_RECEIVED(ctx, QW_EVENT_DROP)) {
...@@ -515,7 +515,7 @@ int32_t qwHandlePostPhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inp ...@@ -515,7 +515,7 @@ int32_t qwHandlePostPhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inp
// QW_TASK_DLOG("drop rsp send, handle:%p, code:%x - %s", ctx->ctrlConnInfo.handle, code, tstrerror(code)); // QW_TASK_DLOG("drop rsp send, handle:%p, code:%x - %s", ctx->ctrlConnInfo.handle, code, tstrerror(code));
QW_ERR_JRET(qwDropTask(QW_FPARAMS())); QW_ERR_JRET(qwDropTask(QW_FPARAMS()));
QW_ERR_JRET(TSDB_CODE_QRY_TASK_DROPPED); QW_ERR_JRET(ctx->rspCode);
} }
if (ctx->rspCode) { if (ctx->rspCode) {
...@@ -861,7 +861,7 @@ int32_t qwProcessDrop(QW_FPARAMS_DEF, SQWMsg *qwMsg) { ...@@ -861,7 +861,7 @@ int32_t qwProcessDrop(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
} }
if (QW_QUERY_RUNNING(ctx)) { if (QW_QUERY_RUNNING(ctx)) {
QW_ERR_JRET(qwKillTaskHandle(ctx)); QW_ERR_JRET(qwKillTaskHandle(ctx, TSDB_CODE_TSC_QUERY_CANCELLED));
qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_DROP); qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_DROP);
} else { } else {
QW_ERR_JRET(qwDropTask(QW_FPARAMS())); QW_ERR_JRET(qwDropTask(QW_FPARAMS()));
...@@ -869,6 +869,7 @@ int32_t qwProcessDrop(QW_FPARAMS_DEF, SQWMsg *qwMsg) { ...@@ -869,6 +869,7 @@ int32_t qwProcessDrop(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
} }
if (!dropped) { if (!dropped) {
QW_UPDATE_RSP_CODE(ctx, TSDB_CODE_TSC_QUERY_CANCELLED);
QW_SET_EVENT_RECEIVED(ctx, QW_EVENT_DROP); QW_SET_EVENT_RECEIVED(ctx, QW_EVENT_DROP);
} }
...@@ -1195,8 +1196,9 @@ void qWorkerStopAllTasks(void *qWorkerMgmt) { ...@@ -1195,8 +1196,9 @@ void qWorkerStopAllTasks(void *qWorkerMgmt) {
} }
if (QW_QUERY_RUNNING(ctx)) { if (QW_QUERY_RUNNING(ctx)) {
qwKillTaskHandle(ctx); qwKillTaskHandle(ctx, TSDB_CODE_VND_STOPPED);
} else if (!QW_EVENT_PROCESSED(ctx, QW_EVENT_DROP)) { } else if (!QW_EVENT_PROCESSED(ctx, QW_EVENT_DROP)) {
QW_UPDATE_RSP_CODE(ctx, TSDB_CODE_VND_STOPPED);
QW_SET_EVENT_RECEIVED(ctx, QW_EVENT_DROP); QW_SET_EVENT_RECEIVED(ctx, QW_EVENT_DROP);
} }
......
...@@ -302,7 +302,7 @@ int32_t qwtExecTask(qTaskInfo_t tinfo, SSDataBlock **pRes, uint64_t *useconds) { ...@@ -302,7 +302,7 @@ int32_t qwtExecTask(qTaskInfo_t tinfo, SSDataBlock **pRes, uint64_t *useconds) {
return 0; return 0;
} }
int32_t qwtKillTask(qTaskInfo_t qinfo) { return 0; } int32_t qwtKillTask(qTaskInfo_t qinfo, int32_t rspCode) { return 0; }
void qwtDestroyTask(qTaskInfo_t qHandle) {} void qwtDestroyTask(qTaskInfo_t qHandle) {}
......
...@@ -391,6 +391,8 @@ int32_t schChkUpdateRedirectCtx(SSchJob *pJob, SSchTask *pTask, SEpSet *pEpSet) ...@@ -391,6 +391,8 @@ int32_t schChkUpdateRedirectCtx(SSchJob *pJob, SSchTask *pTask, SEpSet *pEpSet)
int64_t leftTime = tsMaxRetryWaitTime - lastTime; int64_t leftTime = tsMaxRetryWaitTime - lastTime;
pTask->delayExecMs = leftTime < pCtx->periodMs ? leftTime : pCtx->periodMs; pTask->delayExecMs = leftTime < pCtx->periodMs ? leftTime : pCtx->periodMs;
pCtx->roundTimes = 0;
goto _return; goto _return;
} }
...@@ -407,7 +409,7 @@ _return: ...@@ -407,7 +409,7 @@ _return:
int32_t schDoTaskRedirect(SSchJob *pJob, SSchTask *pTask, SDataBuf *pData, int32_t rspCode) { int32_t schDoTaskRedirect(SSchJob *pJob, SSchTask *pTask, SDataBuf *pData, int32_t rspCode) {
int32_t code = 0; int32_t code = 0;
SCH_TASK_DLOG("task will be redirected now, status:%s", SCH_GET_TASK_STATUS_STR(pTask)); SCH_TASK_DLOG("task will be redirected now, status:%s, code:%s", SCH_GET_TASK_STATUS_STR(pTask), tstrerror(rspCode));
if (NULL == pData) { if (NULL == pData) {
pTask->retryTimes = 0; pTask->retryTimes = 0;
...@@ -430,15 +432,15 @@ int32_t schDoTaskRedirect(SSchJob *pJob, SSchTask *pTask, SDataBuf *pData, int32 ...@@ -430,15 +432,15 @@ int32_t schDoTaskRedirect(SSchJob *pJob, SSchTask *pTask, SDataBuf *pData, int32
if (SCH_IS_DATA_BIND_TASK(pTask)) { if (SCH_IS_DATA_BIND_TASK(pTask)) {
if (pData && pData->pEpSet) { if (pData && pData->pEpSet) {
SCH_ERR_JRET(schUpdateTaskCandidateAddr(pJob, pTask, pData->pEpSet)); SCH_ERR_JRET(schUpdateTaskCandidateAddr(pJob, pTask, pData->pEpSet));
} else if (SYNC_UNKNOWN_LEADER_REDIRECT_ERROR(rspCode)) { } else if (SYNC_SELF_LEADER_REDIRECT_ERROR(rspCode)) {
SQueryNodeAddr *addr = taosArrayGet(pTask->candidateAddrs, pTask->candidateIdx); SQueryNodeAddr *addr = taosArrayGet(pTask->candidateAddrs, pTask->candidateIdx);
SCH_SWITCH_EPSET(addr); SEp *pEp = &addr->epSet.eps[addr->epSet.inUse];
SCH_TASK_DLOG("switch task target node %d epset to %d/%d", addr->nodeId, addr->epSet.inUse, addr->epSet.numOfEps); SCH_TASK_DLOG("task retry node %d current ep, idx:%d/%d,%s:%d, code:%s", addr->nodeId, addr->epSet.inUse,
addr->epSet.numOfEps, pEp->fqdn, pEp->port, tstrerror(rspCode));
} else { } else {
SQueryNodeAddr *addr = taosArrayGet(pTask->candidateAddrs, pTask->candidateIdx); SQueryNodeAddr *addr = taosArrayGet(pTask->candidateAddrs, pTask->candidateIdx);
SEp *pEp = &addr->epSet.eps[addr->epSet.inUse]; SCH_SWITCH_EPSET(addr);
SCH_TASK_DLOG("task retry node %d current ep, idx:%d/%d,%s:%d", addr->nodeId, addr->epSet.inUse, SCH_TASK_DLOG("switch task target node %d epset to %d/%d", addr->nodeId, addr->epSet.inUse, addr->epSet.numOfEps);
addr->epSet.numOfEps, pEp->fqdn, pEp->port);
} }
if (SCH_TASK_NEED_FLOW_CTRL(pJob, pTask)) { if (SCH_TASK_NEED_FLOW_CTRL(pJob, pTask)) {
......
...@@ -1519,7 +1519,7 @@ bool cliGenRetryRule(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) { ...@@ -1519,7 +1519,7 @@ bool cliGenRetryRule(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) {
transFreeMsg(pResp->pCont); transFreeMsg(pResp->pCont);
transUnrefCliHandle(pConn); transUnrefCliHandle(pConn);
} else if (code == TSDB_CODE_SYN_NOT_LEADER || code == TSDB_CODE_SYN_INTERNAL_ERROR || } else if (code == TSDB_CODE_SYN_NOT_LEADER || code == TSDB_CODE_SYN_INTERNAL_ERROR ||
code == TSDB_CODE_SYN_PROPOSE_NOT_READY || code == TSDB_CODE_RPC_REDIRECT) { code == TSDB_CODE_SYN_PROPOSE_NOT_READY || code == TSDB_CODE_RPC_REDIRECT || code == TSDB_CODE_VND_STOPPED) {
tTrace("code str %s, contlen:%d 1", tstrerror(code), pResp->contLen); tTrace("code str %s, contlen:%d 1", tstrerror(code), pResp->contLen);
noDelay = cliResetEpset(pCtx, pResp, true); noDelay = cliResetEpset(pCtx, pResp, true);
transFreeMsg(pResp->pCont); transFreeMsg(pResp->pCont);
......
...@@ -67,6 +67,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_REF_ID_REMOVED, "Ref ID is removed") ...@@ -67,6 +67,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_REF_ID_REMOVED, "Ref ID is removed")
TAOS_DEFINE_ERROR(TSDB_CODE_REF_INVALID_ID, "Invalid Ref ID") TAOS_DEFINE_ERROR(TSDB_CODE_REF_INVALID_ID, "Invalid Ref ID")
TAOS_DEFINE_ERROR(TSDB_CODE_REF_ALREADY_EXIST, "Ref is already there") TAOS_DEFINE_ERROR(TSDB_CODE_REF_ALREADY_EXIST, "Ref is already there")
TAOS_DEFINE_ERROR(TSDB_CODE_REF_NOT_EXIST, "Ref is not there") TAOS_DEFINE_ERROR(TSDB_CODE_REF_NOT_EXIST, "Ref is not there")
TAOS_DEFINE_ERROR(TSDB_CODE_APP_ERROR, "Unexpected generic error") TAOS_DEFINE_ERROR(TSDB_CODE_APP_ERROR, "Unexpected generic error")
TAOS_DEFINE_ERROR(TSDB_CODE_ACTION_IN_PROGRESS, "Action in progress") TAOS_DEFINE_ERROR(TSDB_CODE_ACTION_IN_PROGRESS, "Action in progress")
TAOS_DEFINE_ERROR(TSDB_CODE_OUT_OF_RANGE, "Out of range") TAOS_DEFINE_ERROR(TSDB_CODE_OUT_OF_RANGE, "Out of range")
...@@ -83,6 +84,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_VERSION_NUMBER, "Invalid version numbe ...@@ -83,6 +84,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_VERSION_NUMBER, "Invalid version numbe
TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_VERSION_STRING, "Invalid version string") TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_VERSION_STRING, "Invalid version string")
TAOS_DEFINE_ERROR(TSDB_CODE_VERSION_NOT_COMPATIBLE, "Version not compatible") TAOS_DEFINE_ERROR(TSDB_CODE_VERSION_NOT_COMPATIBLE, "Version not compatible")
TAOS_DEFINE_ERROR(TSDB_CODE_CHECKSUM_ERROR, "Checksum error") TAOS_DEFINE_ERROR(TSDB_CODE_CHECKSUM_ERROR, "Checksum error")
TAOS_DEFINE_ERROR(TSDB_CODE_COMPRESS_ERROR, "Failed to compress msg") TAOS_DEFINE_ERROR(TSDB_CODE_COMPRESS_ERROR, "Failed to compress msg")
TAOS_DEFINE_ERROR(TSDB_CODE_MSG_NOT_PROCESSED, "Message not processed") TAOS_DEFINE_ERROR(TSDB_CODE_MSG_NOT_PROCESSED, "Message not processed")
TAOS_DEFINE_ERROR(TSDB_CODE_CFG_NOT_FOUND, "Config not found") TAOS_DEFINE_ERROR(TSDB_CODE_CFG_NOT_FOUND, "Config not found")
...@@ -97,6 +99,9 @@ TAOS_DEFINE_ERROR(TSDB_CODE_NOT_FOUND, "Not found") ...@@ -97,6 +99,9 @@ TAOS_DEFINE_ERROR(TSDB_CODE_NOT_FOUND, "Not found")
TAOS_DEFINE_ERROR(TSDB_CODE_NO_DISKSPACE, "Out of disk space") TAOS_DEFINE_ERROR(TSDB_CODE_NO_DISKSPACE, "Out of disk space")
TAOS_DEFINE_ERROR(TSDB_CODE_TIMEOUT_ERROR, "Operation timeout") TAOS_DEFINE_ERROR(TSDB_CODE_TIMEOUT_ERROR, "Operation timeout")
TAOS_DEFINE_ERROR(TSDB_CODE_APP_IS_STARTING, "Database is starting up")
TAOS_DEFINE_ERROR(TSDB_CODE_APP_IS_STOPPING, "Database is closing down")
//client //client
TAOS_DEFINE_ERROR(TSDB_CODE_TSC_INVALID_OPERATION, "Invalid operation") TAOS_DEFINE_ERROR(TSDB_CODE_TSC_INVALID_OPERATION, "Invalid operation")
TAOS_DEFINE_ERROR(TSDB_CODE_TSC_INVALID_QHANDLE, "Invalid qhandle") TAOS_DEFINE_ERROR(TSDB_CODE_TSC_INVALID_QHANDLE, "Invalid qhandle")
...@@ -316,6 +321,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_VND_COL_ALREADY_EXISTS, "Table column already ...@@ -316,6 +321,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_VND_COL_ALREADY_EXISTS, "Table column already
TAOS_DEFINE_ERROR(TSDB_CODE_VND_COL_NOT_EXISTS, "Table column not exists") TAOS_DEFINE_ERROR(TSDB_CODE_VND_COL_NOT_EXISTS, "Table column not exists")
TAOS_DEFINE_ERROR(TSDB_CODE_VND_COL_SUBSCRIBED, "Table column is subscribed") TAOS_DEFINE_ERROR(TSDB_CODE_VND_COL_SUBSCRIBED, "Table column is subscribed")
TAOS_DEFINE_ERROR(TSDB_CODE_VND_NO_AVAIL_BUFPOOL, "No availabe buffer pool") TAOS_DEFINE_ERROR(TSDB_CODE_VND_NO_AVAIL_BUFPOOL, "No availabe buffer pool")
TAOS_DEFINE_ERROR(TSDB_CODE_VND_STOPPED, "Vnode stopped")
// tsdb // tsdb
TAOS_DEFINE_ERROR(TSDB_CODE_TDB_INVALID_TABLE_ID, "Invalid table ID") TAOS_DEFINE_ERROR(TSDB_CODE_TDB_INVALID_TABLE_ID, "Invalid table ID")
......
...@@ -219,20 +219,20 @@ typedef struct { ...@@ -219,20 +219,20 @@ typedef struct {
int32_t caseRunNum; // total run case num int32_t caseRunNum; // total run case num
} CaseCtrl; } CaseCtrl;
#if 0 #if 1
CaseCtrl gCaseCtrl = { CaseCtrl gCaseCtrl = {
.precision = TIME_PRECISION_MICRO, .precision = TIME_PRECISION_MICRO,
.bindNullNum = 0, .bindNullNum = 0,
.printCreateTblSql = true, .printCreateTblSql = true,
.printQuerySql = true, .printQuerySql = true,
.printStmtSql = true, .printStmtSql = true,
.printVerbose = false, .printVerbose = true,
.printRes = false, .printRes = true,
.autoCreateTbl = false, .autoCreateTbl = false,
.numericParam = false, .numericParam = false,
.rowNum = 0, .rowNum = 0,
.bindColNum = 0, .bindColNum = 0,
.bindTagNum = 14, .bindTagNum = 0,
.bindRowNum = 0, .bindRowNum = 0,
.bindColTypeNum = 0, .bindColTypeNum = 0,
.bindColTypeList = NULL, .bindColTypeList = NULL,
...@@ -244,7 +244,7 @@ CaseCtrl gCaseCtrl = { ...@@ -244,7 +244,7 @@ CaseCtrl gCaseCtrl = {
.funcIdxList = NULL, .funcIdxList = NULL,
.checkParamNum = false, .checkParamNum = false,
.runTimes = 0, .runTimes = 0,
.caseIdx = 23, .caseIdx = 0,
.caseNum = 1, .caseNum = 1,
.caseRunIdx = -1, .caseRunIdx = -1,
.caseRunNum = -1, .caseRunNum = -1,
...@@ -252,7 +252,7 @@ CaseCtrl gCaseCtrl = { ...@@ -252,7 +252,7 @@ CaseCtrl gCaseCtrl = {
#endif #endif
#if 1 #if 0
CaseCtrl gCaseCtrl = { // default CaseCtrl gCaseCtrl = { // default
.precision = TIME_PRECISION_MILLI, .precision = TIME_PRECISION_MILLI,
.bindNullNum = 0, .bindNullNum = 0,
...@@ -2749,6 +2749,7 @@ void runAll(TAOS *taos) { ...@@ -2749,6 +2749,7 @@ void runAll(TAOS *taos) {
printf("%s Begin\n", gCaseCtrl.caseCatalog); printf("%s Begin\n", gCaseCtrl.caseCatalog);
runCaseList(taos); runCaseList(taos);
#if 0
strcpy(gCaseCtrl.caseCatalog, "Micro DB precision Test"); strcpy(gCaseCtrl.caseCatalog, "Micro DB precision Test");
printf("%s Begin\n", gCaseCtrl.caseCatalog); printf("%s Begin\n", gCaseCtrl.caseCatalog);
gCaseCtrl.precision = TIME_PRECISION_MICRO; gCaseCtrl.precision = TIME_PRECISION_MICRO;
...@@ -2804,6 +2805,7 @@ void runAll(TAOS *taos) { ...@@ -2804,6 +2805,7 @@ void runAll(TAOS *taos) {
gCaseCtrl.bindColNum = 6; gCaseCtrl.bindColNum = 6;
runCaseList(taos); runCaseList(taos);
gCaseCtrl.bindColNum = 0; gCaseCtrl.bindColNum = 0;
#endif
/* /*
strcpy(gCaseCtrl.caseCatalog, "Bind Col Type Test"); strcpy(gCaseCtrl.caseCatalog, "Bind Col Type Test");
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册