提交 a15a1ab8 编写于 作者: H Hongze Cheng

Merge branches 'enh/tsdb_optimize' and '3.0' of...

Merge branches 'enh/tsdb_optimize' and '3.0' of https://github.com/taosdata/TDengine into enh/tsdb_optimize
......@@ -44,7 +44,7 @@ For more details on features, please read through the entire documentation.
## Competitive Advantages
By making full use of [characteristics of time series data](https://tdengine.com/tsdb/characteristics-of-time-series-data/), TDengine differentiates itself from other [time series databases](https://tdengine.com/tsdb), with the following advantages.
By making full use of [characteristics of time series data](https://tdengine.com/tsdb/characteristics-of-time-series-data/), TDengine differentiates itself from other [time series databases](https://tdengine.com/tsdb/), with the following advantages.
- **[High-Performance](https://tdengine.com/tdengine/high-performance-time-series-database/)**: TDengine is the only time-series database to solve the high cardinality issue to support billions of data collection points while out performing other time-series databases for data ingestion, querying and data compression.
......@@ -123,13 +123,12 @@ As a high-performance, scalable and SQL supported time-series database, TDengine
## Comparison with other databases
- [Writing Performance Comparison of TDengine and InfluxDB ](https://tdengine.com/performance-comparison-of-tdengine-and-influxdb/)
- [Query Performance Comparison of TDengine and InfluxDB](https://tdengine.com/query-performance-comparison-test-report-tdengine-vs-influxdb/)
- [TDengine vs OpenTSDB](https://tdengine.com/performance-tdengine-vs-opentsdb/)
- [TDengine vs Cassandra](https://tdengine.com/performance-tdengine-vs-cassandra/)
- [TDengine vs InfluxDB](https://tdengine.com/performance-tdengine-vs-influxdb/)
- [TDengine vs. InfluxDB](https://tdengine.com/tsdb-comparison-influxdb-vs-tdengine/)
- [TDengine vs. TimescaleDB](https://tdengine.com/tsdb-comparison-timescaledb-vs-tdengine/)
- [TDengine vs. OpenTSDB](https://tdengine.com/performance-tdengine-vs-opentsdb/)
- [TDengine vs. Cassandra](https://tdengine.com/performance-tdengine-vs-cassandra/)
## More readings
- [Introduction to Time-Series Database](https://tdengine.com/tsdb/)
- [Introduction to TDengine competitive advantages](https://tdengine.com/tdengine/)
......@@ -6,7 +6,7 @@ description: This document describes how to install TDengine in a Docker contain
This document describes how to install TDengine in a Docker container and perform queries and inserts.
- The easiest way to explore TDengine is through [TDengine Cloud](http://cloud.tdengine.com).
- The easiest way to explore TDengine is through [TDengine Cloud](https://cloud.tdengine.com).
- To get started with TDengine in a non-containerized environment, see [Quick Install from Package](../../get-started/package).
- If you want to view the source code, build TDengine yourself, or contribute to the project, see the [TDengine GitHub repository](https://github.com/taosdata/TDengine).
......
......@@ -10,7 +10,7 @@ import PkgListV3 from "/components/PkgListV3";
This document describes how to install TDengine on Linux/Windows/macOS and perform queries and inserts.
- The easiest way to explore TDengine is through [TDengine Cloud](http://cloud.tdengine.com).
- The easiest way to explore TDengine is through [TDengine Cloud](https://cloud.tdengine.com).
- To get started with TDengine on Docker, see [Quick Install on Docker](../../get-started/docker).
- If you want to view the source code, build TDengine yourself, or contribute to the project, see the [TDengine GitHub repository](https://github.com/taosdata/TDengine).
......
......@@ -288,6 +288,6 @@ Prior to establishing connection, please make sure TDengine is already running a
</Tabs>
:::tip
If the connection fails, in most cases it's caused by improper configuration for FQDN or firewall. Please refer to the section "Unable to establish connection" in [FAQ](https://docs.tdengine.com/train-faq/faq).
If the connection fails, in most cases it's caused by improper configuration for FQDN or firewall. Please refer to the section "Unable to establish connection" in [FAQ](../../train-faq/faq).
:::
......@@ -23,7 +23,7 @@ By subscribing to a topic, a consumer can obtain the latest data in that topic i
To implement these features, TDengine indexes its write-ahead log (WAL) file for fast random access and provides configurable methods for replacing and retaining this file. You can define a retention period and size for this file. For information, see the CREATE DATABASE statement. In this way, the WAL file is transformed into a persistent storage engine that remembers the order in which events occur. However, note that configuring an overly long retention period for your WAL files makes database compression inefficient. TDengine then uses the WAL file instead of the time-series database as its storage engine for queries in the form of topics. TDengine reads the data from the WAL file; uses a unified query engine instance to perform filtering, transformations, and other operations; and finally pushes the data to consumers.
Tips:The default data subscription is to consume data from the wal. If the wal is deleted, the consumed data will be incomplete. At this time, you can set the parameter experimental.snapshot.enable to true to obtain all data from the tsdb, but in this way, the consumption order of the data cannot be guaranteed. Therefore, it is recommended to set a reasonable retention policy for WAL based on your consumption situation to ensure that you can subscribe all data from WAL.
Tips: Data subscription is to consume data from the wal. If some wal files are deleted according to WAL retention policy, the deleted data can't be consumed any more. So you need to set a reasonable value for parameter `WAL_RETENTION_PERIOD` or `WAL_RETENTION_SIZE` when creating the database and make sure your application consume the data in a timely way to make sure there is no data loss. This behavior is similar to Kafka and other widely used message queue products.
## Data Schema and API
......@@ -294,7 +294,6 @@ You configure the following parameters when creating a consumer:
| `auto.offset.reset` | enum | Initial offset for the consumer group | Specify `earliest`, `latest`, or `none`(default) |
| `enable.auto.commit` | boolean | Commit automatically; true: user application doesn't need to explicitly commit; false: user application need to handle commit by itself | Default value is true |
| `auto.commit.interval.ms` | integer | Interval for automatic commits, in milliseconds |
| `experimental.snapshot.enable` | boolean | Specify whether to consume data in TSDB; true: both data in WAL and in TSDB can be consumed; false: only data in WAL can be consumed | default value: false |
| `msg.with.table.name` | boolean | Specify whether to deserialize table names from messages | default value: false
The method of specifying these parameters depends on the language used:
......@@ -312,7 +311,6 @@ tmq_conf_set(conf, "group.id", "cgrpName");
tmq_conf_set(conf, "td.connect.user", "root");
tmq_conf_set(conf, "td.connect.pass", "taosdata");
tmq_conf_set(conf, "auto.offset.reset", "earliest");
tmq_conf_set(conf, "experimental.snapshot.enable", "true");
tmq_conf_set(conf, "msg.with.table.name", "true");
tmq_conf_set_auto_commit_cb(conf, tmq_commit_cb_print, NULL);
......@@ -368,7 +366,6 @@ conf := &tmq.ConfigMap{
"td.connect.port": "6030",
"client.id": "test_tmq_c",
"enable.auto.commit": "false",
"experimental.snapshot.enable": "true",
"msg.with.table.name": "true",
}
consumer, err := NewConsumer(conf)
......@@ -416,7 +413,6 @@ Python programs use the following parameters:
| `enable.auto.commit` | string | Commit automatically | pecify `true` or `false` |
| `auto.commit.interval.ms` | string | Interval for automatic commits, in milliseconds | |
| `auto.offset.reset` | string | Initial offset for the consumer group | Specify `earliest`, `latest`, or `none`(default) |
| `experimental.snapshot.enable` | string | Specify whether it's allowed to consume messages from the WAL or from TSDB | Specify `true` or `false` |
| `enable.heartbeat.background` | string | Backend heartbeat; if enabled, the consumer does not go offline even if it has not polled for a long time | Specify `true` or `false` |
</TabItem>
......
......@@ -55,7 +55,7 @@ window_clause: {
| INTERVAL(interval_val [, interval_offset]) [SLIDING (sliding_val)] [WATERMARK(watermark_val)] [FILL(fill_mod_and_val)]
interp_clause:
RANGE(ts_val, ts_val), EVERY(every_val), FILL(fill_mod_and_val)
RANGE(ts_val, ts_val) EVERY(every_val) FILL(fill_mod_and_val)
partition_by_clause:
PARTITION BY expr [, expr] ...
......
......@@ -886,7 +886,7 @@ INTERP(expr)
- The output time range of `INTERP` is specified by `RANGE(timestamp1,timestamp2)` parameter, with timestamp1 <= timestamp2. timestamp1 is the starting point of the output time range and must be specified. timestamp2 is the ending point of the output time range and must be specified.
- The number of rows in the result set of `INTERP` is determined by the parameter `EVERY(time_unit)`. Starting from timestamp1, one interpolation is performed for every time interval specified `time_unit` parameter. The parameter `time_unit` must be an integer, with no quotes, with a time unit of: a(millisecond)), s(second), m(minute), h(hour), d(day), or w(week). For example, `EVERY(500a)` will interpolate every 500 milliseconds.
- Interpolation is performed based on `FILL` parameter. For more information about FILL clause, see [FILL Clause](../distinguished/#fill-clause).
- `INTERP` can only be used to interpolate in single timeline. So it must be used with `partition by tbname` when it's used on a STable.
- `INTERP` can be applied to supertable by interpolating primary key sorted data of all its childtables. It can also be used with `partition by tbname` when applied to supertable to generate interpolation on each single timeline.
- Pseudocolumn `_irowts` can be used along with `INTERP` to return the timestamps associated with interpolation points(support after version 3.0.2.0).
- Pseudocolumn `_isfilled` can be used along with `INTERP` to indicate whether the results are original records or data points generated by interpolation algorithm(support after version 3.0.3.0).
......
......@@ -129,6 +129,14 @@ SHOW QNODES;
Shows information about qnodes in the system.
## SHOW QUERIES
```sql
SHOW QUERIES;
```
Shows the queries in progress in the system.
## SHOW SCORES
```sql
......
......@@ -440,6 +440,7 @@ TDengine has significantly improved the bind APIs to support data writing (INSER
- JDBC REST connections do not currently support bind interface
- The following sample code is based on taos-jdbcdriver-3.2.1
- The setString method should be called for binary type data, and the setNString method should be called for nchar type data
- Do not use `db.?` in prepareStatement when specify the database with the table name, should directly use `?`, then specify the database in setTableName, for example: `prepareStatement.setTableName("db.t1")`.
<Tabs defaultValue="native">
<TabItem value="native" label="native connection">
......
......@@ -12,8 +12,8 @@ After TDengine starts, it automatically writes many metrics in specific interval
To deploy TDinsight, we need
- a single-node TDengine server or a multi-node TDengine cluster and a [Grafana] server are required. This dashboard requires TDengine 3.0.1.0 and above, with the monitoring feature enabled. For detailed configuration, please refer to [TDengine monitoring configuration](../config/#monitoring-parameters).
- taosAdapter has been instaleld and running, please refer to [taosAdapter](../taosadapter).
- taosKeeper has been installed and running, please refer to [taosKeeper](../taoskeeper).
- taosAdapter has been installed and running, please refer to [taosAdapter](../taosadapter).
- taosKeeper has been installed and running, please refer to [taosKeeper](../taosKeeper).
Please record
- The endpoint of taosAdapter REST service, for example `http://tdengine.local:6041`
......
......@@ -35,7 +35,7 @@ Please refer to the [official documentation](https://grafana.com/grafana/downloa
### TDengine
Download the latest TDengine-server from the [Downloads](http://tdengine.com/en/all-downloads/) page on the TAOSData website and install it.
Download and install the [latest version of TDengine](https://docs.tdengine.com/releases/tdengine/).
## Data Connection Setup
......
......@@ -38,7 +38,7 @@ Please refer to the [official documentation](https://grafana.com/grafana/downloa
### Install TDengine
Download the latest TDengine-server from the [Downloads](http://tdengine.com/en/all-downloads/) page on the TAOSData website and install it.
Download and install the [latest version of TDengine](https://docs.tdengine.com/releases/tdengine/).
## Data Connection Setup
......
......@@ -32,7 +32,7 @@ TDengine 3.0 is not compatible with the configuration and data files from previo
2. Run `sudo rm -rf /var/log/taos/` to delete your log files.
3. Run `sudo rm -rf /var/lib/taos/` to delete your data files.
4. Install TDengine 3.0.
5. For assistance in migrating data to TDengine 3.0, contact [TDengine Support](https://tdengine.com/support).
5. For assistance in migrating data to TDengine 3.0, contact [TDengine Support](https://tdengine.com/support/).
### 2. How can I resolve the "Unable to establish connection" error?
......
......@@ -25,7 +25,8 @@ import CDemo from "./_sub_c.mdx";
本文档不对消息队列本身的基础知识做介绍,如果需要了解,请自行搜索。
注意:默认是从wal消费数据,如果wal被删除,消费到的数据会不全,此时可以将参数 experimental.snapshot.enable 设置为true,从tsdb获取全部数据,但是这样的话就不能保证数据的消费顺序。所以建议根据自己的消费情况合理的设置wal的保留策略,保证可以从wal里订阅到全部数据。
注意:数据订阅是从 WAL 消费数据,如果一些 WAL 文件被基于 WAL 保留策略删除,则已经删除的 WAL 文件中的数据就无法再消费到。需要根据业务需要在创建数据库时合理设置 `WAL_RETENTION_PERIOD` 或 `WAL_RETENTION_SIZE` ,并确保应用及时消费数据,这样才不会产生数据丢失的现象。数据订阅的行为与 Kafka 等广泛使用的消息队列类产品的行为相似。
## 主要数据结构和 API
不同语言下, TMQ 订阅相关的 API 及数据结构如下:
......@@ -293,7 +294,6 @@ CREATE TOPIC topic_name AS DATABASE db_name;
| `auto.offset.reset` | enum | 消费组订阅的初始位置 | <br />`earliest`: default;从头开始订阅; <br/>`latest`: 仅从最新数据开始订阅; <br/>`none`: 没有提交的 offset 无法订阅 |
| `enable.auto.commit` | boolean | 是否启用消费位点自动提交,true: 自动提交,客户端应用无需commit;false:客户端应用需要自行commit | 默认值为 true |
| `auto.commit.interval.ms` | integer | 消费记录自动提交消费位点时间间隔,单位为毫秒 | 默认值为 5000 |
| `experimental.snapshot.enable` | boolean | 是否允许从 TSDB 消费数据。当其关闭时,只能消费依据 WAL 保留策略仍然在WAL中的数据;当其打开时,除WAL中的数据以外,也能够消费已经从WAL中删除但落盘到TSDB中的数据 | 实验功能,默认关闭 |
| `msg.with.table.name` | boolean | 是否允许从消息中解析表名, 不适用于列订阅(列订阅时可将 tbname 作为列写入 subquery 语句) |默认关闭 |
对于不同编程语言,其设置方式如下:
......@@ -311,7 +311,6 @@ tmq_conf_set(conf, "group.id", "cgrpName");
tmq_conf_set(conf, "td.connect.user", "root");
tmq_conf_set(conf, "td.connect.pass", "taosdata");
tmq_conf_set(conf, "auto.offset.reset", "earliest");
tmq_conf_set(conf, "experimental.snapshot.enable", "true");
tmq_conf_set(conf, "msg.with.table.name", "true");
tmq_conf_set_auto_commit_cb(conf, tmq_commit_cb_print, NULL);
......@@ -367,7 +366,6 @@ conf := &tmq.ConfigMap{
"td.connect.port": "6030",
"client.id": "test_tmq_c",
"enable.auto.commit": "false",
"experimental.snapshot.enable": "true",
"msg.with.table.name": "true",
}
consumer, err := NewConsumer(conf)
......@@ -417,7 +415,6 @@ consumer = Consumer({"group.id": "local", "td.connect.ip": "127.0.0.1"})
| `enable.auto.commit` | string | 启用自动提交 | 合法值:`true`, `false` |
| `auto.commit.interval.ms` | string | 以毫秒为单位的自动提交时间间隔 | 默认值:5000 ms |
| `auto.offset.reset` | string | 消费组订阅的初始位置 | 可选:`earliest`(default), `latest`, `none` |
| `experimental.snapshot.enable` | string | 是否允许从 TSDB 消费数据 | 合法值:`true`, `false` |
</TabItem>
......
......@@ -54,7 +54,7 @@ REST 连接支持所有能运行 Java 的平台。
**注**:REST 连接中增加 `batchfetch` 参数并设置为 true,将开启 WebSocket 连接。
### 处理异常
## 处理异常
在报错后,通过 SQLException 可以获取到错误的信息和错误码:
......@@ -443,6 +443,7 @@ TDengine 的 JDBC 原生连接实现大幅改进了参数绑定方式对数据
- JDBC REST 连接目前不支持参数绑定
- 以下示例代码基于 taos-jdbcdriver-3.2.1
- binary 类型数据需要调用 setString 方法,nchar 类型数据需要调用 setNString 方法
- 预处理语句中指定数据库与子表名称不要使用 `db.?`,应直接使用 `?`,然后在 setTableName 中指定数据库,如:`prepareStatement.setTableName("db.t1")`。
<Tabs defaultValue="native">
<TabItem value="native" label="原生连接">
......@@ -864,7 +865,7 @@ public class ParameterBindingDemo {
用于设定 TAGS 取值的方法总共有:
````java
```java
public void setTagNull(int index, int type)
public void setTagBoolean(int index, boolean value)
public void setTagInt(int index, int value)
......
......@@ -55,7 +55,7 @@ window_clause: {
| INTERVAL(interval_val [, interval_offset]) [SLIDING (sliding_val)] [WATERMARK(watermark_val)] [FILL(fill_mod_and_val)]
interp_clause:
RANGE(ts_val, ts_val), EVERY(every_val), FILL(fill_mod_and_val)
RANGE(ts_val, ts_val) EVERY(every_val) FILL(fill_mod_and_val)
partition_by_clause:
PARTITION BY expr [, expr] ...
......
......@@ -888,7 +888,7 @@ INTERP(expr)
- INTERP 的输出时间范围根据 RANGE(timestamp1,timestamp2)字段来指定,需满足 timestamp1 <= timestamp2。其中 timestamp1(必选值)为输出时间范围的起始值,即如果 timestamp1 时刻符合插值条件则 timestamp1 为输出的第一条记录,timestamp2(必选值)为输出时间范围的结束值,即输出的最后一条记录的 timestamp 不能大于 timestamp2。
- INTERP 根据 EVERY(time_unit) 字段来确定输出时间范围内的结果条数,即从 timestamp1 开始每隔固定长度的时间(time_unit 值)进行插值,time_unit 可取值时间单位:1a(毫秒),1s(秒),1m(分),1h(小时),1d(天),1w(周)。例如 EVERY(500a) 将对于指定数据每500毫秒间隔进行一次插值.
- INTERP 根据 FILL 字段来决定在每个符合输出条件的时刻如何进行插值。关于 FILL 子句如何使用请参考 [FILL 子句](../distinguished/#fill-子句)
- INTERP 只能在一个时间序列内进行插值,因此当作用于超级表时必须跟 partition by tbname 一起使用
- INTERP 作用于超级表时, 会将该超级表下的所有子表数据按照主键列排序后进行插值计算,也可以搭配 PARTITION BY tbname 使用,将结果强制规约到单个时间线
- INTERP 可以与伪列 _irowts 一起使用,返回插值点所对应的时间戳(3.0.2.0版本以后支持)。
- INTERP 可以与伪列 _isfilled 一起使用,显示返回结果是否为原始记录或插值算法产生的数据(3.0.3.0版本以后支持)。
......
......@@ -129,6 +129,14 @@ SHOW QNODES;
显示当前系统中 QNODE (查询节点)的信息。
## SHOW QUERIES
```sql
SHOW QUERIES;
```
显示当前系统中正在进行的查询。
## SHOW SCORES
```sql
......
......@@ -735,7 +735,6 @@ charset 的有效值是 UTF-8。
| 16 | maxTmrCtrl | 是 | 否 | 3.0 行为未知 |
| 17 | monitorReplica | 是 | 否 | 由 RAFT 协议管理多副本 |
| 18 | smlTagNullName | 是 | 否 | 3.0 行为未知 |
| 19 | keepColumnName | 是 | 否 | 3.0 行为未知 |
| 20 | ratioOfQueryCores | 是 | 否 | 由 线程池 相关配置参数决定 |
| 21 | maxStreamCompDelay | 是 | 否 | 3.0 行为未知 |
| 22 | maxFirstStreamCompDelay | 是 | 否 | 3.0 行为未知 |
......
......@@ -26,6 +26,10 @@ extern "C" {
#include "tgrantCfg.h"
#endif
#ifndef GRANTS_COL_MAX_LEN
#define GRANTS_COL_MAX_LEN 196
#endif
typedef enum {
TSDB_GRANT_ALL,
TSDB_GRANT_TIME,
......@@ -47,23 +51,49 @@ typedef enum {
int32_t grantCheck(EGrantType grant);
#ifndef GRANTS_CFG
#define GRANTS_SCHEMA \
static const SSysDbTableSchema grantsSchema[] = { \
{.name = "version", .bytes = 9 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \
{.name = "expire_time", .bytes = 19 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \
{.name = "expired", .bytes = 5 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \
{.name = "storage", .bytes = 21 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \
{.name = "timeseries", .bytes = 21 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \
{.name = "databases", .bytes = 10 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \
{.name = "users", .bytes = 10 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \
{.name = "accounts", .bytes = 10 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \
{.name = "dnodes", .bytes = 10 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \
{.name = "connections", .bytes = 11 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \
{.name = "streams", .bytes = 9 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \
{.name = "cpu_cores", .bytes = 9 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \
{.name = "speed", .bytes = 9 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \
{.name = "querytime", .bytes = 9 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \
#ifdef TD_ENTERPRISE
#define GRANTS_SCHEMA \
static const SSysDbTableSchema grantsSchema[] = { \
{.name = "version", .bytes = 9 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \
{.name = "expire_time", .bytes = 19 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \
{.name = "expired", .bytes = 5 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \
{.name = "storage", .bytes = 21 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \
{.name = "timeseries", .bytes = 21 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \
{.name = "databases", .bytes = 10 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \
{.name = "users", .bytes = 10 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \
{.name = "accounts", .bytes = 10 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \
{.name = "dnodes", .bytes = 10 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \
{.name = "connections", .bytes = 11 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \
{.name = "streams", .bytes = 9 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \
{.name = "cpu_cores", .bytes = 9 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \
{.name = "speed", .bytes = 9 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \
{.name = "querytime", .bytes = 9 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \
{.name = "opc_da", .bytes = GRANTS_COL_MAX_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \
{.name = "opc_ua", .bytes = GRANTS_COL_MAX_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \
{.name = "pi", .bytes = GRANTS_COL_MAX_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \
{.name = "kafka", .bytes = GRANTS_COL_MAX_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \
{.name = "influxdb", .bytes = GRANTS_COL_MAX_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \
{.name = "mqtt", .bytes = GRANTS_COL_MAX_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \
}
#else
#define GRANTS_SCHEMA \
static const SSysDbTableSchema grantsSchema[] = { \
{.name = "version", .bytes = 9 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \
{.name = "expire_time", .bytes = 19 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \
{.name = "expired", .bytes = 5 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \
{.name = "storage", .bytes = 21 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \
{.name = "timeseries", .bytes = 21 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \
{.name = "databases", .bytes = 10 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \
{.name = "users", .bytes = 10 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \
{.name = "accounts", .bytes = 10 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \
{.name = "dnodes", .bytes = 10 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \
{.name = "connections", .bytes = 11 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \
{.name = "streams", .bytes = 9 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \
{.name = "cpu_cores", .bytes = 9 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \
{.name = "speed", .bytes = 9 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \
{.name = "querytime", .bytes = 9 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \
}
#endif
#define GRANT_CFG_ADD
#define GRANT_CFG_SET
#define GRANT_CFG_GET
......
......@@ -1232,6 +1232,14 @@ typedef struct {
SEp ep;
} SDnodeEp;
typedef struct {
int32_t id;
int8_t isMnode;
SEp ep;
char active[TSDB_ACTIVE_KEY_LEN];
char connActive[TSDB_CONN_ACTIVE_KEY_LEN];
} SDnodeInfo;
typedef struct {
int64_t dnodeVer;
SDnodeCfg dnodeCfg;
......@@ -1625,6 +1633,21 @@ typedef struct {
int32_t tSerializeSDropDnodeReq(void* buf, int32_t bufLen, SDropDnodeReq* pReq);
int32_t tDeserializeSDropDnodeReq(void* buf, int32_t bufLen, SDropDnodeReq* pReq);
enum {
RESTORE_TYPE__ALL = 1,
RESTORE_TYPE__MNODE,
RESTORE_TYPE__VNODE,
RESTORE_TYPE__QNODE,
};
typedef struct {
int32_t dnodeId;
int8_t restoreType;
} SRestoreDnodeReq;
int32_t tSerializeSRestoreDnodeReq(void* buf, int32_t bufLen, SRestoreDnodeReq* pReq);
int32_t tDeserializeSRestoreDnodeReq(void* buf, int32_t bufLen, SRestoreDnodeReq* pReq);
typedef struct {
int32_t dnodeId;
char config[TSDB_DNODE_CONFIG_LEN];
......
......@@ -178,6 +178,7 @@ enum {
// TD_DEF_MSG_TYPE(TDMT_MND_STREAM_BEGIN_CHECKPOINT, "stream-begin-checkpoint", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_MAX_MSG, "mnd-max", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_BALANCE_VGROUP_LEADER, "balance-vgroup-leader", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_RESTORE_DNODE, "restore-dnode", NULL, NULL)
TD_NEW_MSG_SEG(TDMT_VND_MSG)
TD_DEF_MSG_TYPE(TDMT_VND_SUBMIT, "submit", SSubmitReq, SSubmitRsp)
......
此差异已折叠。
......@@ -350,6 +350,11 @@ typedef struct SDropComponentNodeStmt {
int32_t dnodeId;
} SDropComponentNodeStmt;
typedef struct SRestoreComponentNodeStmt {
ENodeType type;
int32_t dnodeId;
} SRestoreComponentNodeStmt;
typedef struct SCreateTopicStmt {
ENodeType type;
char topicName[TSDB_TABLE_NAME_LEN];
......
......@@ -211,6 +211,10 @@ typedef enum ENodeType {
QUERY_NODE_SHOW_DB_ALIVE_STMT,
QUERY_NODE_SHOW_CLUSTER_ALIVE_STMT,
QUERY_NODE_BALANCE_VGROUP_LEADER_STMT,
QUERY_NODE_RESTORE_DNODE_STMT,
QUERY_NODE_RESTORE_QNODE_STMT,
QUERY_NODE_RESTORE_MNODE_STMT,
QUERY_NODE_RESTORE_VNODE_STMT,
// logic plan node
QUERY_NODE_LOGIC_PLAN_SCAN = 1000,
......
......@@ -133,6 +133,16 @@ int32_t tfsMkdirAt(STfs *pTfs, const char *rname, SDiskID diskId);
*/
int32_t tfsMkdirRecurAt(STfs *pTfs, const char *rname, SDiskID diskId);
/**
* @brief check directories exist in tfs.
*
* @param pTfs The fs object.
* @param rname The rel name of directory.
* @param diskId The disk ID.
* @return true for exist, false for not exist.
*/
bool tfsDirExistAt(STfs *pTfs, const char *rname, SDiskID diskId);
/**
* @brief Remove directory at all levels in tfs.
*
......
......@@ -406,6 +406,7 @@ int32_t* taosGetErrno();
#define TSDB_CODE_SNODE_NOT_DEPLOYED TAOS_DEF_ERROR_CODE(0, 0x0411)
#define TSDB_CODE_MNODE_NOT_CATCH_UP TAOS_DEF_ERROR_CODE(0, 0x0412) // internal
#define TSDB_CODE_MNODE_ALREADY_IS_VOTER TAOS_DEF_ERROR_CODE(0, 0x0413) // internal
#define TSDB_CODE_MNODE_ONLY_TWO_MNODE TAOS_DEF_ERROR_CODE(0, 0x0414) // internal
// vnode
// #define TSDB_CODE_VND_ACTION_IN_PROGRESS TAOS_DEF_ERROR_CODE(0, 0x0500) // 2.x
......@@ -442,6 +443,7 @@ int32_t* taosGetErrno();
#define TSDB_CODE_VND_QUERY_BUSY TAOS_DEF_ERROR_CODE(0, 0x0531)
#define TSDB_CODE_VND_NOT_CATCH_UP TAOS_DEF_ERROR_CODE(0, 0x0532) // internal
#define TSDB_CODE_VND_ALREADY_IS_VOTER TAOS_DEF_ERROR_CODE(0, 0x0533) // internal
#define TSDB_CODE_VND_DIR_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x0534)
// tsdb
#define TSDB_CODE_TDB_INVALID_TABLE_ID TAOS_DEF_ERROR_CODE(0, 0x0600)
......
......@@ -267,6 +267,9 @@ typedef enum ELogicConditionType {
#define TSDB_DNODE_CONFIG_LEN 128
#define TSDB_DNODE_VALUE_LEN 256
#define TSDB_ACTIVE_KEY_LEN 109 // history 109:?
#define TSDB_CONN_ACTIVE_KEY_LEN 257 // history 257:?
#define TSDB_DEFAULT_PKT_SIZE 65480 // same as RPC_MAX_UDP_SIZE
#define TSDB_PAYLOAD_SIZE TSDB_DEFAULT_PKT_SIZE
......
......@@ -2517,6 +2517,31 @@ int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_a
*numOfAssignment = num;
}
for (int32_t j = 0; j < (*numOfAssignment); ++j) {
tmq_topic_assignment* p = &(*assignment)[j];
for(int32_t i = 0; i < taosArrayGetSize(pTopic->vgs); ++i) {
SMqClientVg* pClientVg = taosArrayGet(pTopic->vgs, i);
if (pClientVg->vgId != p->vgId) {
continue;
}
SVgOffsetInfo* pOffsetInfo = &pClientVg->offsetInfo;
pOffsetInfo->currentOffset.type = TMQ_OFFSET__LOG;
char offsetBuf[80] = {0};
tFormatOffset(offsetBuf, tListLen(offsetBuf), &pOffsetInfo->currentOffset);
tscDebug("vgId:%d offset is update to:%s", p->vgId, offsetBuf);
pOffsetInfo->walVerBegin = p->begin;
pOffsetInfo->walVerEnd = p->end;
pOffsetInfo->currentOffset.version = p->currentOffset;
pOffsetInfo->committedOffset.version = p->currentOffset;
}
}
destroyCommonInfo(pCommon);
return code;
} else {
......@@ -2564,7 +2589,8 @@ int32_t tmq_offset_seek(tmq_t* tmq, const char* pTopicName, int32_t vgId, int64_
}
if (offset < pOffsetInfo->walVerBegin || offset > pOffsetInfo->walVerEnd) {
tscError("consumer:0x%" PRIx64 " invalid seek params, offset:%" PRId64, tmq->consumerId, offset);
tscError("consumer:0x%" PRIx64 " invalid seek params, offset:%" PRId64 ", valid range:[%" PRId64 ", %" PRId64 "]",
tmq->consumerId, offset, pOffsetInfo->walVerBegin, pOffsetInfo->walVerEnd);
return TSDB_CODE_INVALID_PARA;
}
......
......@@ -1122,6 +1122,8 @@ TEST(clientCase, sub_tb_test) {
return;
}
tmq_offset_seek(tmq, "topic_t1", pAssign[0].vgId, 0);
while (1) {
TAOS_RES* pRes = tmq_consumer_poll(tmq, timeout);
if (pRes != NULL) {
......
......@@ -35,6 +35,10 @@ static const SSysDbTableSchema dnodesSchema[] = {
{.name = "create_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP, .sysInfo = true},
{.name = "reboot_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP, .sysInfo = true},
{.name = "note", .bytes = 256 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true},
#ifdef TD_ENTERPRISE
{.name = "active_code", .bytes = TSDB_ACTIVE_KEY_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true},
{.name = "c_active_code", .bytes = TSDB_CONN_ACTIVE_KEY_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true},
#endif
};
static const SSysDbTableSchema mnodesSchema[] = {
......
......@@ -1720,6 +1720,33 @@ int32_t tDeserializeSDropDnodeReq(void *buf, int32_t bufLen, SDropDnodeReq *pReq
return 0;
}
int32_t tSerializeSRestoreDnodeReq(void *buf, int32_t bufLen, SRestoreDnodeReq *pReq) {
SEncoder encoder = {0};
tEncoderInit(&encoder, buf, bufLen);
if (tStartEncode(&encoder) < 0) return -1;
if (tEncodeI32(&encoder, pReq->dnodeId) < 0) return -1;
if (tEncodeI8(&encoder, pReq->restoreType) < 0) return -1;
tEndEncode(&encoder);
int32_t tlen = encoder.pos;
tEncoderClear(&encoder);
return tlen;
}
int32_t tDeserializeSRestoreDnodeReq(void *buf, int32_t bufLen, SRestoreDnodeReq *pReq) {
SDecoder decoder = {0};
tDecoderInit(&decoder, buf, bufLen);
if (tStartDecode(&decoder) < 0) return -1;
if (tDecodeI32(&decoder, &pReq->dnodeId) < 0) return -1;
if (tDecodeI8(&decoder, &pReq->restoreType) < 0) return -1;
tEndDecode(&decoder);
tDecoderClear(&decoder);
return 0;
}
int32_t tSerializeSMCfgDnodeReq(void *buf, int32_t bufLen, SMCfgDnodeReq *pReq) {
SEncoder encoder = {0};
tEncoderInit(&encoder, buf, bufLen);
......
......@@ -174,6 +174,7 @@ SArray *mmGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_MND_SERVER_VERSION, mmPutMsgToReadQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_CREATE_INDEX, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_DROP_INDEX, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_RESTORE_DNODE, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_SCH_QUERY, mmPutMsgToQueryQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_SCH_MERGE_QUERY, mmPutMsgToQueryQueue, 1) == NULL) goto _OVER;
......
......@@ -255,7 +255,7 @@ int32_t vmProcessCreateVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
SVnodeObj *pVnode = vmAcquireVnode(pMgmt, req.vgId);
if (pVnode != NULL) {
dInfo("vgId:%d, already exist", req.vgId);
dError("vgId:%d, already exist", req.vgId);
tFreeSCreateVnodeReq(&req);
vmReleaseVnode(pMgmt, pVnode);
terrno = TSDB_CODE_VND_ALREADY_EXIST;
......@@ -264,7 +264,22 @@ int32_t vmProcessCreateVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
}
snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, vnodeCfg.vgId);
if (vnodeCreate(path, &vnodeCfg, pMgmt->pTfs) < 0) {
if (pMgmt->pTfs) {
if (tfsDirExistAt(pMgmt->pTfs, path, (SDiskID){0})) {
terrno = TSDB_CODE_VND_DIR_ALREADY_EXIST;
dError("vgId:%d, failed to restore vnode since %s", req.vgId, terrstr());
return -1;
}
} else {
if (taosDirExist(path)) {
terrno = TSDB_CODE_VND_DIR_ALREADY_EXIST;
dError("vgId:%d, failed to restore vnode since %s", req.vgId, terrstr());
return -1;
}
}
if (vnodeCreate(path, &vnodeCfg, pMgmt->pTfs) < 0) {
tFreeSCreateVnodeReq(&req);
dError("vgId:%d, failed to create vnode since %s", req.vgId, terrstr());
code = terrno;
......@@ -344,6 +359,7 @@ int32_t vmProcessAlterVnodeTypeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
ESyncRole role = vnodeGetRole(pVnode->pImpl);
dInfo("vgId:%d, checking node role:%d", req.vgId, role);
if(role == TAOS_SYNC_ROLE_VOTER){
dError("vgId:%d, failed to alter vnode type since node already is role:%d", req.vgId, role);
terrno = TSDB_CODE_VND_ALREADY_IS_VOTER;
vmReleaseVnode(pMgmt, pVnode);
return -1;
......
......@@ -218,6 +218,7 @@ static int32_t dmProcessAlterNodeTypeReq(EDndNodeType ntype, SRpcMsg *pMsg) {
ESyncRole role = (*pWrapper->func.nodeRoleFp)(pWrapper->pMgmt);
dInfo("node:%s, checking node role:%d", pWrapper->name, role);
if(role == TAOS_SYNC_ROLE_VOTER){
dError("node:%s, failed to alter node type since node already is role:%d", pWrapper->name, role);
terrno = TSDB_CODE_MNODE_ALREADY_IS_VOTER;
return -1;
}
......
......@@ -6,6 +6,7 @@ IF (TD_ENTERPRISE)
LIST(APPEND MNODE_SRC ${TD_ENTERPRISE_DIR}/src/plugins/privilege/src/privilege.c)
LIST(APPEND MNODE_SRC ${TD_ENTERPRISE_DIR}/src/plugins/mnode/src/mndDb.c)
LIST(APPEND MNODE_SRC ${TD_ENTERPRISE_DIR}/src/plugins/mnode/src/mndVgroup.c)
LIST(APPEND MNODE_SRC ${TD_ENTERPRISE_DIR}/src/plugins/mnode/src/mndDnode.c)
ENDIF ()
add_library(mnode STATIC ${MNODE_SRC})
......
......@@ -206,6 +206,8 @@ typedef struct {
uint16_t port;
char fqdn[TSDB_FQDN_LEN];
char ep[TSDB_EP_LEN];
char active[TSDB_ACTIVE_KEY_LEN];
char connActive[TSDB_CONN_ACTIVE_KEY_LEN];
} SDnodeObj;
typedef struct {
......
......@@ -29,7 +29,7 @@ void mndReleaseDnode(SMnode *pMnode, SDnodeObj *pDnode);
SEpSet mndGetDnodeEpset(SDnodeObj *pDnode);
int32_t mndGetDnodeSize(SMnode *pMnode);
bool mndIsDnodeOnline(SDnodeObj *pDnode, int64_t curMs);
void mndGetDnodeData(SMnode *pMnode, SArray *pDnodeEps);
void mndGetDnodeData(SMnode *pMnode, SArray *pDnodeInfo);
#ifdef __cplusplus
}
......
......@@ -29,6 +29,10 @@ void mndReleaseMnode(SMnode *pMnode, SMnodeObj *pObj);
bool mndIsMnode(SMnode *pMnode, int32_t dnodeId);
void mndGetMnodeEpSet(SMnode *pMnode, SEpSet *pEpSet);
int32_t mndSetDropMnodeInfoToTrans(SMnode *pMnode, STrans *pTrans, SMnodeObj *pObj, bool force);
int32_t mndSetRestoreCreateMnodeRedoActions(SMnode *pMnode, STrans *pTrans, SDnodeObj *pDnode, SMnodeObj *pObj);
int32_t mndSetCreateMnodeCommitLogs(SMnode *pMnode, STrans *pTrans, SMnodeObj *pObj);
int32_t mndSetRestoreAlterMnodeTypeRedoActions(SMnode *pMnode, STrans *pTrans, SDnodeObj *pDnode, SMnodeObj *pObj);
int32_t mndSetRestoreCreateMnodeRedoLogs(SMnode *pMnode, STrans *pTrans, SMnodeObj *pObj);
#ifdef __cplusplus
}
......
......@@ -30,6 +30,9 @@ SQnodeObj *mndAcquireQnode(SMnode *pMnode, int32_t qnodeId);
void mndReleaseQnode(SMnode *pMnode, SQnodeObj *pObj);
int32_t mndCreateQnodeList(SMnode *pMnode, SArray **pList, int32_t limit);
int32_t mndSetDropQnodeInfoToTrans(SMnode *pMnode, STrans *pTrans, SQnodeObj *pObj, bool force);
bool mndQnodeInDnode(SQnodeObj *pQnode, int32_t dnodeId);
int32_t mndSetCreateQnodeCommitLogs(STrans *pTrans, SQnodeObj *pObj);
int32_t mndSetCreateQnodeRedoActions(STrans *pTrans, SDnodeObj *pDnode, SQnodeObj *pObj);
#ifdef __cplusplus
}
......
......@@ -49,6 +49,9 @@ int32_t mndBuildCompactVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb,
void *mndBuildCreateVnodeReq(SMnode *, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup, int32_t *pContLen);
void *mndBuildDropVnodeReq(SMnode *, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup, int32_t *pContLen);
bool mndVgroupInDb(SVgObj *pVgroup, int64_t dbUid);
bool mndVgroupInDnode(SVgObj *pVgroup, int32_t dnodeId);
int32_t mndBuildRestoreAlterVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj *db, SVgObj *pVgroup,
SDnodeObj *pDnode);
int32_t mndSplitVgroup(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SVgObj *pVgroup);
......
......@@ -27,7 +27,7 @@
#include "tmisce.h"
#include "mndCluster.h"
#define TSDB_DNODE_VER_NUMBER 1
#define TSDB_DNODE_VER_NUMBER 2
#define TSDB_DNODE_RESERVE_SIZE 64
static const char *offlineReason[] = {
......@@ -58,6 +58,7 @@ static int32_t mndProcessDropDnodeReq(SRpcMsg *pReq);
static int32_t mndProcessConfigDnodeReq(SRpcMsg *pReq);
static int32_t mndProcessConfigDnodeRsp(SRpcMsg *pRsp);
static int32_t mndProcessStatusReq(SRpcMsg *pReq);
static int32_t mndProcessRestoreDnodeReq(SRpcMsg *pReq);
static int32_t mndRetrieveConfigs(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
static void mndCancelGetNextConfig(SMnode *pMnode, void *pIter);
......@@ -83,6 +84,7 @@ int32_t mndInitDnode(SMnode *pMnode) {
mndSetMsgHandle(pMnode, TDMT_MND_STATUS, mndProcessStatusReq);
mndSetMsgHandle(pMnode, TDMT_MND_DNODE_LIST, mndProcessDnodeListReq);
mndSetMsgHandle(pMnode, TDMT_MND_SHOW_VARIABLES, mndProcessShowVariablesReq);
mndSetMsgHandle(pMnode, TDMT_MND_RESTORE_DNODE, mndProcessRestoreDnodeReq);
mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_CONFIGS, mndRetrieveConfigs);
mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_CONFIGS, mndCancelGetNextConfig);
......@@ -139,6 +141,10 @@ static SSdbRaw *mndDnodeActionEncode(SDnodeObj *pDnode) {
SDB_SET_INT16(pRaw, dataPos, pDnode->port, _OVER)
SDB_SET_BINARY(pRaw, dataPos, pDnode->fqdn, TSDB_FQDN_LEN, _OVER)
SDB_SET_RESERVE(pRaw, dataPos, TSDB_DNODE_RESERVE_SIZE, _OVER)
SDB_SET_INT16(pRaw, dataPos, TSDB_ACTIVE_KEY_LEN, _OVER)
SDB_SET_BINARY(pRaw, dataPos, pDnode->active, TSDB_ACTIVE_KEY_LEN, _OVER)
SDB_SET_INT16(pRaw, dataPos, TSDB_CONN_ACTIVE_KEY_LEN, _OVER)
SDB_SET_BINARY(pRaw, dataPos, pDnode->connActive, TSDB_CONN_ACTIVE_KEY_LEN, _OVER)
SDB_SET_DATALEN(pRaw, dataPos, _OVER);
terrno = 0;
......@@ -161,7 +167,7 @@ static SSdbRow *mndDnodeActionDecode(SSdbRaw *pRaw) {
int8_t sver = 0;
if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto _OVER;
if (sver != TSDB_DNODE_VER_NUMBER) {
if (sver < 1 || sver > TSDB_DNODE_VER_NUMBER) {
terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
goto _OVER;
}
......@@ -179,6 +185,13 @@ static SSdbRow *mndDnodeActionDecode(SSdbRaw *pRaw) {
SDB_GET_INT16(pRaw, dataPos, &pDnode->port, _OVER)
SDB_GET_BINARY(pRaw, dataPos, pDnode->fqdn, TSDB_FQDN_LEN, _OVER)
SDB_GET_RESERVE(pRaw, dataPos, TSDB_DNODE_RESERVE_SIZE, _OVER)
if (sver > 1) {
int16_t keyLen = 0;
SDB_GET_INT16(pRaw, dataPos, &keyLen, _OVER)
SDB_GET_BINARY(pRaw, dataPos, pDnode->active, keyLen, _OVER)
SDB_GET_INT16(pRaw, dataPos, &keyLen, _OVER)
SDB_GET_BINARY(pRaw, dataPos, pDnode->connActive, keyLen, _OVER)
}
terrno = 0;
if (tmsgUpdateDnodeInfo(&pDnode->id, NULL, pDnode->fqdn, &pDnode->port)) {
......@@ -294,6 +307,11 @@ int32_t mndGetDnodeSize(SMnode *pMnode) {
return sdbGetSize(pSdb, SDB_DNODE);
}
int32_t mndGetDbSize(SMnode *pMnode) {
SSdb *pSdb = pMnode->pSdb;
return sdbGetSize(pSdb, SDB_DB);
}
bool mndIsDnodeOnline(SDnodeObj *pDnode, int64_t curMs) {
int64_t interval = TABS(pDnode->lastAccessTime - curMs);
if (interval > 5000 * (int64_t)tsStatusInterval) {
......@@ -305,7 +323,7 @@ bool mndIsDnodeOnline(SDnodeObj *pDnode, int64_t curMs) {
return true;
}
void mndGetDnodeData(SMnode *pMnode, SArray *pDnodeEps) {
static void mndGetDnodeEps(SMnode *pMnode, SArray *pDnodeEps) {
SSdb *pSdb = pMnode->pSdb;
int32_t numOfEps = 0;
......@@ -330,6 +348,34 @@ void mndGetDnodeData(SMnode *pMnode, SArray *pDnodeEps) {
}
}
void mndGetDnodeData(SMnode *pMnode, SArray *pDnodeInfo) {
SSdb *pSdb = pMnode->pSdb;
int32_t numOfEps = 0;
void *pIter = NULL;
while (1) {
SDnodeObj *pDnode = NULL;
ESdbStatus objStatus = 0;
pIter = sdbFetchAll(pSdb, SDB_DNODE, pIter, (void **)&pDnode, &objStatus, true);
if (pIter == NULL) break;
SDnodeInfo dInfo;
dInfo.id = pDnode->id;
dInfo.ep.port = pDnode->port;
tstrncpy(dInfo.ep.fqdn, pDnode->fqdn, TSDB_FQDN_LEN);
tstrncpy(dInfo.active, pDnode->active, TSDB_ACTIVE_KEY_LEN);
tstrncpy(dInfo.connActive, pDnode->connActive, TSDB_CONN_ACTIVE_KEY_LEN);
sdbRelease(pSdb, pDnode);
if (mndIsMnode(pMnode, pDnode->id)) {
dInfo.isMnode = 1;
} else {
dInfo.isMnode = 0;
}
taosArrayPush(pDnodeInfo, &dInfo);
}
}
static int32_t mndCheckClusterCfgPara(SMnode *pMnode, SDnodeObj *pDnode, const SClusterCfg *pCfg) {
if (pCfg->statusInterval != tsStatusInterval) {
mError("dnode:%d, statusInterval:%d inconsistent with cluster:%d", pDnode->id, pCfg->statusInterval,
......@@ -536,7 +582,7 @@ static int32_t mndProcessStatusReq(SRpcMsg *pReq) {
goto _OVER;
}
mndGetDnodeData(pMnode, statusRsp.pDnodeEps);
mndGetDnodeEps(pMnode, statusRsp.pDnodeEps);
int32_t contLen = tSerializeSStatusRsp(NULL, 0, &statusRsp);
void *pHead = rpcMallocCont(contLen);
......@@ -745,6 +791,18 @@ _OVER:
return code;
}
extern int32_t mndProcessRestoreDnodeReqImpl(SRpcMsg *pReq);
int32_t mndProcessRestoreDnodeReq(SRpcMsg *pReq){
return mndProcessRestoreDnodeReqImpl(pReq);
}
#ifndef TD_ENTERPRISE
int32_t mndProcessRestoreDnodeReqImpl(SRpcMsg *pReq){
return 0;
}
#endif
static int32_t mndDropDnode(SMnode *pMnode, SRpcMsg *pReq, SDnodeObj *pDnode, SMnodeObj *pMObj, SQnodeObj *pQObj,
SSnodeObj *pSObj, int32_t numOfVnodes, bool force) {
int32_t code = -1;
......@@ -1041,6 +1099,7 @@ static int32_t mndRetrieveDnodes(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pB
ESdbStatus objStatus = 0;
SDnodeObj *pDnode = NULL;
int64_t curMs = taosGetTimestampMs();
char buf[TSDB_CONN_ACTIVE_KEY_LEN + VARSTR_HEADER_SIZE]; // make sure TSDB_CONN_ACTIVE_KEY_LEN >= TSDB_EP_LEN
while (numOfRows < rows) {
pShow->pIter = sdbFetchAll(pSdb, SDB_DNODE, pShow->pIter, (void **)&pDnode, &objStatus, true);
......@@ -1052,7 +1111,6 @@ static int32_t mndRetrieveDnodes(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pB
SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataSetVal(pColInfo, numOfRows, (const char *)&pDnode->id, false);
char buf[tListLen(pDnode->ep) + VARSTR_HEADER_SIZE] = {0};
STR_WITH_MAXSIZE_TO_VARSTR(buf, pDnode->ep, pShow->pMeta->pSchemas[cols].bytes);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
......@@ -1077,10 +1135,9 @@ static int32_t mndRetrieveDnodes(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pB
status = "offline";
}
char b1[16] = {0};
STR_TO_VARSTR(b1, status);
STR_TO_VARSTR(buf, status);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataSetVal(pColInfo, numOfRows, b1, false);
colDataSetVal(pColInfo, numOfRows, buf, false);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataSetVal(pColInfo, numOfRows, (const char *)&pDnode->createdTime, false);
......@@ -1095,6 +1152,16 @@ static int32_t mndRetrieveDnodes(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pB
colDataSetVal(pColInfo, numOfRows, b, false);
taosMemoryFreeClear(b);
#ifdef TD_ENTERPRISE
STR_TO_VARSTR(buf, pDnode->active);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataSetVal(pColInfo, numOfRows, buf, false);
STR_TO_VARSTR(buf, pDnode->connActive);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataSetVal(pColInfo, numOfRows, buf, false);
#endif
numOfRows++;
sdbRelease(pSdb, pDnode);
}
......
......@@ -275,6 +275,14 @@ static int32_t mndSetCreateMnodeRedoLogs(SMnode *pMnode, STrans *pTrans, SMnodeO
return 0;
}
int32_t mndSetRestoreCreateMnodeRedoLogs(SMnode *pMnode, STrans *pTrans, SMnodeObj *pObj) {
SSdbRaw *pRedoRaw = mndMnodeActionEncode(pObj);
if (pRedoRaw == NULL) return -1;
if (mndTransAppendRedolog(pTrans, pRedoRaw) != 0) return -1;
if (sdbSetRawStatus(pRedoRaw, SDB_STATUS_READY) != 0) return -1;
return 0;
}
static int32_t mndSetCreateMnodeUndoLogs(SMnode *pMnode, STrans *pTrans, SMnodeObj *pObj) {
SSdbRaw *pUndoRaw = mndMnodeActionEncode(pObj);
if (pUndoRaw == NULL) return -1;
......@@ -283,7 +291,7 @@ static int32_t mndSetCreateMnodeUndoLogs(SMnode *pMnode, STrans *pTrans, SMnodeO
return 0;
}
static int32_t mndSetCreateMnodeCommitLogs(SMnode *pMnode, STrans *pTrans, SMnodeObj *pObj) {
int32_t mndSetCreateMnodeCommitLogs(SMnode *pMnode, STrans *pTrans, SMnodeObj *pObj) {
SSdbRaw *pCommitRaw = mndMnodeActionEncode(pObj);
if (pCommitRaw == NULL) return -1;
if (mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) return -1;
......@@ -421,6 +429,55 @@ static int32_t mndSetCreateMnodeRedoActions(SMnode *pMnode, STrans *pTrans, SDno
return 0;
}
int32_t mndSetRestoreCreateMnodeRedoActions(SMnode *pMnode, STrans *pTrans, SDnodeObj *pDnode, SMnodeObj *pObj) {
SSdb *pSdb = pMnode->pSdb;
void *pIter = NULL;
SDCreateMnodeReq createReq = {0};
SEpSet createEpset = {0};
while (1) {
SMnodeObj *pMObj = NULL;
pIter = sdbFetch(pSdb, SDB_MNODE, pIter, (void **)&pMObj);
if (pIter == NULL) break;
if(pMObj->id == pDnode->id) {
sdbRelease(pSdb, pMObj);
continue;
}
if(pMObj->role == TAOS_SYNC_ROLE_VOTER){
createReq.replicas[createReq.replica].id = pMObj->id;
createReq.replicas[createReq.replica].port = pMObj->pDnode->port;
memcpy(createReq.replicas[createReq.replica].fqdn, pMObj->pDnode->fqdn, TSDB_FQDN_LEN);
createReq.replica++;
}
else{
createReq.learnerReplicas[createReq.learnerReplica].id = pMObj->id;
createReq.learnerReplicas[createReq.learnerReplica].port = pMObj->pDnode->port;
memcpy(createReq.learnerReplicas[createReq.learnerReplica].fqdn, pMObj->pDnode->fqdn, TSDB_FQDN_LEN);
createReq.learnerReplica++;
}
sdbRelease(pSdb, pMObj);
}
createReq.learnerReplicas[createReq.learnerReplica].id = pDnode->id;
createReq.learnerReplicas[createReq.learnerReplica].port = pDnode->port;
memcpy(createReq.learnerReplicas[createReq.learnerReplica].fqdn, pDnode->fqdn, TSDB_FQDN_LEN);
createReq.learnerReplica++;
createReq.lastIndex = pObj->lastIndex;
createEpset.inUse = 0;
createEpset.numOfEps = 1;
createEpset.eps[0].port = pDnode->port;
memcpy(createEpset.eps[0].fqdn, pDnode->fqdn, TSDB_FQDN_LEN);
if (mndBuildCreateMnodeRedoAction(pTrans, &createReq, &createEpset) != 0) return -1;
return 0;
}
static int32_t mndSetAlterMnodeTypeRedoActions(SMnode *pMnode, STrans *pTrans, SDnodeObj *pDnode, SMnodeObj *pObj) {
SSdb *pSdb = pMnode->pSdb;
void *pIter = NULL;
......@@ -465,6 +522,55 @@ static int32_t mndSetAlterMnodeTypeRedoActions(SMnode *pMnode, STrans *pTrans, S
return 0;
}
int32_t mndSetRestoreAlterMnodeTypeRedoActions(SMnode *pMnode, STrans *pTrans, SDnodeObj *pDnode, SMnodeObj *pObj) {
SSdb *pSdb = pMnode->pSdb;
void *pIter = NULL;
SDAlterMnodeTypeReq alterReq = {0};
SEpSet createEpset = {0};
while (1) {
SMnodeObj *pMObj = NULL;
pIter = sdbFetch(pSdb, SDB_MNODE, pIter, (void **)&pMObj);
if (pIter == NULL) break;
if(pMObj->id == pDnode->id) {
sdbRelease(pSdb, pMObj);
continue;
}
if(pMObj->role == TAOS_SYNC_ROLE_VOTER){
alterReq.replicas[alterReq.replica].id = pMObj->id;
alterReq.replicas[alterReq.replica].port = pMObj->pDnode->port;
memcpy(alterReq.replicas[alterReq.replica].fqdn, pMObj->pDnode->fqdn, TSDB_FQDN_LEN);
alterReq.replica++;
}
else{
alterReq.learnerReplicas[alterReq.learnerReplica].id = pMObj->id;
alterReq.learnerReplicas[alterReq.learnerReplica].port = pMObj->pDnode->port;
memcpy(alterReq.learnerReplicas[alterReq.learnerReplica].fqdn, pMObj->pDnode->fqdn, TSDB_FQDN_LEN);
alterReq.learnerReplica++;
}
sdbRelease(pSdb, pMObj);
}
alterReq.replicas[alterReq.replica].id = pDnode->id;
alterReq.replicas[alterReq.replica].port = pDnode->port;
memcpy(alterReq.replicas[alterReq.replica].fqdn, pDnode->fqdn, TSDB_FQDN_LEN);
alterReq.replica++;
alterReq.lastIndex = pObj->lastIndex;
createEpset.inUse = 0;
createEpset.numOfEps = 1;
createEpset.eps[0].port = pDnode->port;
memcpy(createEpset.eps[0].fqdn, pDnode->fqdn, TSDB_FQDN_LEN);
if (mndBuildAlterMnodeTypeRedoAction(pTrans, &alterReq, &createEpset) != 0) return -1;
return 0;
}
static int32_t mndCreateMnode(SMnode *pMnode, SRpcMsg *pReq, SDnodeObj *pDnode, SMCreateMnodeReq *pCreate) {
int32_t code = -1;
......
......@@ -180,7 +180,7 @@ static int32_t mndSetCreateQnodeUndoLogs(STrans *pTrans, SQnodeObj *pObj) {
return 0;
}
static int32_t mndSetCreateQnodeCommitLogs(STrans *pTrans, SQnodeObj *pObj) {
int32_t mndSetCreateQnodeCommitLogs(STrans *pTrans, SQnodeObj *pObj) {
SSdbRaw *pCommitRaw = mndQnodeActionEncode(pObj);
if (pCommitRaw == NULL) return -1;
if (mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) return -1;
......@@ -188,7 +188,11 @@ static int32_t mndSetCreateQnodeCommitLogs(STrans *pTrans, SQnodeObj *pObj) {
return 0;
}
static int32_t mndSetCreateQnodeRedoActions(STrans *pTrans, SDnodeObj *pDnode, SQnodeObj *pObj) {
bool mndQnodeInDnode(SQnodeObj *pQnode, int32_t dnodeId) {
return pQnode->pDnode->id == dnodeId;
}
int32_t mndSetCreateQnodeRedoActions(STrans *pTrans, SDnodeObj *pDnode, SQnodeObj *pObj) {
SDCreateQnodeReq createReq = {0};
createReq.dnodeId = pDnode->id;
......
......@@ -1155,6 +1155,28 @@ int32_t mndAddCreateVnodeAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVg
return 0;
}
int32_t mndRestoreAddCreateVnodeAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, SDnodeObj *pDnode) {
STransAction action = {0};
action.epSet = mndGetDnodeEpset(pDnode);
int32_t contLen = 0;
void *pReq = mndBuildCreateVnodeReq(pMnode, pDnode, pDb, pVgroup, &contLen);
if (pReq == NULL) return -1;
action.pCont = pReq;
action.contLen = contLen;
action.msgType = TDMT_DND_CREATE_VNODE;
action.acceptableCode = TSDB_CODE_VND_ALREADY_EXIST;
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
taosMemoryFree(pReq);
return -1;
}
return 0;
}
int32_t mndAddAlterVnodeConfirmAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup) {
STransAction action = {0};
action.epSet = mndGetVgroupEpset(pMnode, pVgroup);
......@@ -1274,6 +1296,29 @@ int32_t mndAddAlterVnodeTypeAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb,
return 0;
}
int32_t mndRestoreAddAlterVnodeTypeAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup,
SDnodeObj *pDnode) {
STransAction action = {0};
action.epSet = mndGetDnodeEpset(pDnode);
int32_t contLen = 0;
void *pReq = mndBuildAlterVnodeReplicaReq(pMnode, pDb, pVgroup, pDnode->id, &contLen);
if (pReq == NULL) return -1;
action.pCont = pReq;
action.contLen = contLen;
action.msgType = TDMT_DND_ALTER_VNODE_TYPE;
action.acceptableCode = TSDB_CODE_VND_ALREADY_IS_VOTER;
action.retryCode = TSDB_CODE_VND_NOT_CATCH_UP;
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
taosMemoryFree(pReq);
return -1;
}
return 0;
}
static int32_t mndAddDisableVnodeWriteAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup,
int32_t dnodeId) {
SDnodeObj *pDnode = mndAcquireDnode(pMnode, dnodeId);
......@@ -2113,6 +2158,55 @@ int32_t mndBuildAlterVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj *pOldDb
return 0;
}
int32_t mndBuildRestoreAlterVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj *db, SVgObj *pVgroup,
SDnodeObj *pDnode) {
SVgObj newVgroup = {0};
memcpy(&newVgroup, pVgroup, sizeof(SVgObj));
mInfo("db:%s, vgId:%d, restore vnodes, vn:0 dnode:%d", pVgroup->dbName, pVgroup->vgId,
pVgroup->vnodeGid[0].dnodeId);
if(newVgroup.replica == 1){
int selected = 0;
for(int i = 0; i < newVgroup.replica; i++){
newVgroup.vnodeGid[i].nodeRole = TAOS_SYNC_ROLE_VOTER;
if(newVgroup.vnodeGid[i].dnodeId == pDnode->id){
selected = i;
}
}
if (mndAddCreateVnodeAction(pMnode, pTrans, db, &newVgroup, &newVgroup.vnodeGid[selected]) != 0) return -1;
}
else if(newVgroup.replica == 3){
for(int i = 0; i < newVgroup.replica; i++){
if(newVgroup.vnodeGid[i].dnodeId == pDnode->id){
newVgroup.vnodeGid[i].nodeRole = TAOS_SYNC_ROLE_LEARNER;
}
else{
newVgroup.vnodeGid[i].nodeRole = TAOS_SYNC_ROLE_VOTER;
}
}
if (mndRestoreAddCreateVnodeAction(pMnode, pTrans, db, &newVgroup, pDnode) != 0) return -1;
for(int i = 0; i < newVgroup.replica; i++){
newVgroup.vnodeGid[i].nodeRole = TAOS_SYNC_ROLE_VOTER;
if(newVgroup.vnodeGid[i].dnodeId == pDnode->id){
}
}
if (mndRestoreAddAlterVnodeTypeAction(pMnode, pTrans, db, &newVgroup, pDnode) != 0)
return -1;
}
SSdbRaw *pVgRaw = mndVgroupActionEncode(&newVgroup);
if (pVgRaw == NULL) return -1;
if (mndTransAppendCommitlog(pTrans, pVgRaw) != 0) {
sdbFreeRaw(pVgRaw);
return -1;
}
(void)sdbSetRawStatus(pVgRaw, SDB_STATUS_READY);
return 0;
}
static int32_t mndAddAdjustVnodeHashRangeAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup) {
return 0;
}
......@@ -2437,6 +2531,13 @@ _OVER:
bool mndVgroupInDb(SVgObj *pVgroup, int64_t dbUid) { return !pVgroup->isTsma && pVgroup->dbUid == dbUid; }
bool mndVgroupInDnode(SVgObj *pVgroup, int32_t dnodeId) {
for(int i = 0; i < pVgroup->replica; i++){
if(pVgroup->vnodeGid[i].dnodeId == dnodeId) return true;
}
return false;
}
static void *mndBuildCompactVnodeReq(SMnode *pMnode, SDbObj *pDb, SVgObj *pVgroup, int32_t *pContLen, int64_t compactTs,
STimeWindow tw) {
SCompactVnodeReq compactReq = {0};
......
......@@ -301,8 +301,12 @@ int32_t tqProcessSeekReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen)
}
// save the new offset value
tqDebug("vgId:%d sub:%s seek to %" PRId64 " prev offset:%" PRId64, vgId, pOffset->subKey, pOffset->val.version,
pSavedOffset->val.version);
if (pSavedOffset != NULL) {
tqDebug("vgId:%d sub:%s seek to:%" PRId64 " prev offset:%" PRId64, vgId, pOffset->subKey, pOffset->val.version,
pSavedOffset->val.version);
} else {
tqDebug("vgId:%d sub:%s seek to:%"PRId64" not saved yet", vgId, pOffset->subKey, pOffset->val.version);
}
if (tqOffsetWrite(pTq->pOffsetStore, pOffset) < 0) {
tqError("failed to save offset, vgId:%d sub:%s seek to %" PRId64, vgId, pOffset->subKey, pOffset->val.version);
......
......@@ -180,11 +180,12 @@ void tsdbCacheSerialize(SLastCol *pLastCol, char **value, size_t *size) {
*(SLastCol *)(*value) = *pLastCol;
if (IS_VAR_DATA_TYPE(pColVal->type)) {
uint8_t *pVal = pColVal->value.pData;
pColVal->value.pData = *value + sizeof(*pLastCol);
SColVal *pDColVal = &((SLastCol *)(*value))->colVal;
pDColVal->value.pData = *value + sizeof(*pLastCol);
if (pColVal->value.nData > 0) {
memcpy(pColVal->value.pData, pVal, pColVal->value.nData);
memcpy(pDColVal->value.pData, pVal, pColVal->value.nData);
} else {
pColVal->value.pData = NULL;
pDColVal->value.pData = NULL;
}
}
*size = length;
......@@ -340,6 +341,16 @@ _exit:
return code;
}
static void reallocVarData(SColVal *pColVal) {
if (IS_VAR_DATA_TYPE(pColVal->type)) {
uint8_t *pVal = pColVal->value.pData;
pColVal->value.pData = taosMemoryMalloc(pColVal->value.nData);
if (pColVal->value.nData) {
memcpy(pColVal->value.pData, pVal, pColVal->value.nData);
}
}
}
static int32_t mergeLastCid(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray, SCacheRowsReader *pr, int16_t *aCols,
int nCols, int16_t *slotIds);
......@@ -347,9 +358,10 @@ static int32_t mergeLastRowCid(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray,
int nCols, int16_t *slotIds);
int32_t tsdbCacheGet(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCacheRowsReader *pr, int32_t ltype) {
static char const *alstring[2] = {"last_row", "last"};
char const *lstring = alstring[ltype];
int32_t code = 0;
static char const *alstring[2] = {"last_row", "last"};
char const *lstring = alstring[ltype];
rocksdb_writebatch_t *wb = NULL;
int32_t code = 0;
SArray *pCidList = pr->pCidList;
int num_keys = TARRAY_SIZE(pCidList);
......@@ -387,14 +399,7 @@ int32_t tsdbCacheGet(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCacheRowsR
int16_t cid = *(int16_t *)taosArrayGet(pCidList, i);
SLastCol noneCol = {.ts = TSKEY_MIN, .colVal = COL_VAL_NONE(cid, pr->pSchema->columns[pr->pSlotIds[i]].type)};
if (pLastCol) {
SColVal *pColVal = &pLastCol->colVal;
if (IS_VAR_DATA_TYPE(pColVal->type)) {
uint8_t *pVal = pColVal->value.pData;
pColVal->value.pData = taosMemoryMalloc(pColVal->value.nData);
if (pColVal->value.nData) {
memcpy(pColVal->value.pData, pVal, pColVal->value.nData);
}
}
reallocVarData(&pLastCol->colVal);
} else {
taosThreadMutexLock(&pTsdb->rCache.rMutex);
......@@ -423,22 +428,28 @@ int32_t tsdbCacheGet(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCacheRowsR
}
// store result back to rocks cache
rocksdb_writebatch_t *wb = pTsdb->rCache.writebatch;
char *value = NULL;
size_t vlen = 0;
wb = pTsdb->rCache.writebatch;
char *value = NULL;
size_t vlen = 0;
tsdbCacheSerialize(pLastCol, &value, &vlen);
char key[ROCKS_KEY_LEN];
size_t klen = snprintf(key, ROCKS_KEY_LEN, "%" PRIi64 ":%" PRIi16 ":%s", uid, pLastCol->colVal.cid, lstring);
rocksdb_writebatch_put(wb, key, klen, value, vlen);
taosMemoryFree(value);
} else {
reallocVarData(&pLastCol->colVal);
}
if (wb) {
char *err = NULL;
rocksdb_write(pTsdb->rCache.db, pTsdb->rCache.writeoptions, wb, &err);
if (NULL != err) {
tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, err);
rocksdb_free(err);
}
rocksdb_writebatch_clear(wb);
}
taosThreadMutexUnlock(&pTsdb->rCache.rMutex);
......@@ -1381,7 +1392,12 @@ static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow, bool *pIgnoreEarlie
*pIgnoreEarlierTs = false;
tBlockDataReset(state->pBlockData);
TABLEID tid = {.suid = state->suid, .uid = state->uid};
code = tBlockDataInit(state->pBlockData, &tid, state->pTSchema, aCols, nCols);
int nTmpCols = nCols;
if (aCols[0] == PRIMARYKEY_TIMESTAMP_COL_ID && nCols == 1) {
nTmpCols = 0;
skipBlock = false;
}
code = tBlockDataInit(state->pBlockData, &tid, state->pTSchema, aCols, nTmpCols);
if (code) goto _err;
code = tsdbReadDataBlock(*state->pDataFReader, &block, state->pBlockData);
......
......@@ -257,6 +257,14 @@ const char* nodesNodeName(ENodeType type) {
return "DeleteStmt";
case QUERY_NODE_INSERT_STMT:
return "InsertStmt";
case QUERY_NODE_RESTORE_DNODE_STMT:
return "RestoreDnodeStmt";
case QUERY_NODE_RESTORE_QNODE_STMT:
return "RestoreQnodeStmt";
case QUERY_NODE_RESTORE_MNODE_STMT:
return "RestoreMnodeStmt";
case QUERY_NODE_RESTORE_VNODE_STMT:
return "RestoreVnodeStmt";
case QUERY_NODE_LOGIC_PLAN_SCAN:
return "LogicScan";
case QUERY_NODE_LOGIC_PLAN_JOIN:
......@@ -5533,6 +5541,35 @@ static int32_t jsonToDropDnodeStmt(const SJson* pJson, void* pObj) {
return code;
}
static const char* jkRestoreComponentNodeStmtDnodeId = "DnodeId";
static int32_t restoreComponentNodeStmtToJson(const void* pObj, SJson* pJson) {
const SRestoreComponentNodeStmt* pNode = (const SRestoreComponentNodeStmt*)pObj;
return tjsonAddIntegerToObject(pJson, jkRestoreComponentNodeStmtDnodeId, pNode->dnodeId);
}
static int32_t jsonToRestoreComponentNodeStmt(const SJson* pJson, void* pObj) {
SRestoreComponentNodeStmt* pNode = (SRestoreComponentNodeStmt*)pObj;
return tjsonGetIntValue(pJson, jkRestoreComponentNodeStmtDnodeId, &pNode->dnodeId);
}
static int32_t jsonToRestoreDnodeStmt(const SJson* pJson, void* pObj) {
return jsonToRestoreComponentNodeStmt(pJson, pObj);
}
static int32_t jsonToRestoreQnodeStmt(const SJson* pJson, void* pObj) {
return jsonToRestoreComponentNodeStmt(pJson, pObj);
}
static int32_t jsonToRestoreMnodeStmt(const SJson* pJson, void* pObj) {
return jsonToRestoreComponentNodeStmt(pJson, pObj);
}
static int32_t jsonToRestoreVnodeStmt(const SJson* pJson, void* pObj) {
return jsonToRestoreComponentNodeStmt(pJson, pObj);
}
static const char* jkCreateTopicStmtTopicName = "TopicName";
static const char* jkCreateTopicStmtSubscribeDbName = "SubscribeDbName";
static const char* jkCreateTopicStmtIgnoreExists = "IgnoreExists";
......@@ -6820,6 +6857,14 @@ static int32_t jsonToSpecificNode(const SJson* pJson, void* pObj) {
return jsonToDeleteStmt(pJson, pObj);
case QUERY_NODE_INSERT_STMT:
return jsonToInsertStmt(pJson, pObj);
case QUERY_NODE_RESTORE_DNODE_STMT:
return jsonToRestoreDnodeStmt(pJson, pObj);
case QUERY_NODE_RESTORE_QNODE_STMT:
return jsonToRestoreQnodeStmt(pJson, pObj);
case QUERY_NODE_RESTORE_MNODE_STMT:
return jsonToRestoreMnodeStmt(pJson, pObj);
case QUERY_NODE_RESTORE_VNODE_STMT:
return jsonToRestoreVnodeStmt(pJson, pObj);
case QUERY_NODE_LOGIC_PLAN_SCAN:
return jsonToLogicScanNode(pJson, pObj);
case QUERY_NODE_LOGIC_PLAN_JOIN:
......
......@@ -455,6 +455,11 @@ SNode* nodesMakeNode(ENodeType type) {
return makeNode(type, sizeof(SInsertStmt));
case QUERY_NODE_QUERY:
return makeNode(type, sizeof(SQuery));
case QUERY_NODE_RESTORE_DNODE_STMT:
case QUERY_NODE_RESTORE_QNODE_STMT:
case QUERY_NODE_RESTORE_MNODE_STMT:
case QUERY_NODE_RESTORE_VNODE_STMT:
return makeNode(type, sizeof(SRestoreComponentNodeStmt));
case QUERY_NODE_LOGIC_PLAN_SCAN:
return makeNode(type, sizeof(SScanLogicNode));
case QUERY_NODE_LOGIC_PLAN_JOIN:
......@@ -1046,6 +1051,11 @@ void nodesDestroyNode(SNode* pNode) {
nodesDestroyNode(pQuery->pPrepareRoot);
break;
}
case QUERY_NODE_RESTORE_DNODE_STMT: // no pointer field
case QUERY_NODE_RESTORE_QNODE_STMT: // no pointer field
case QUERY_NODE_RESTORE_MNODE_STMT: // no pointer field
case QUERY_NODE_RESTORE_VNODE_STMT: // no pointer field
break;
case QUERY_NODE_LOGIC_PLAN_SCAN: {
SScanLogicNode* pLogicNode = (SScanLogicNode*)pNode;
destroyLogicNode((SLogicNode*)pLogicNode);
......
......@@ -202,6 +202,7 @@ SNode* createIndexOption(SAstCreateContext* pCxt, SNodeList* pFuncs, SNode* pInt
SNode* createDropIndexStmt(SAstCreateContext* pCxt, bool ignoreNotExists, SNode* pIndexName);
SNode* createCreateComponentNodeStmt(SAstCreateContext* pCxt, ENodeType type, const SToken* pDnodeId);
SNode* createDropComponentNodeStmt(SAstCreateContext* pCxt, ENodeType type, const SToken* pDnodeId);
SNode* createRestoreComponentNodeStmt(SAstCreateContext* pCxt, ENodeType type, const SToken* pDnodeId);
SNode* createCreateTopicStmtUseQuery(SAstCreateContext* pCxt, bool ignoreExists, SToken* pTopicName, SNode* pQuery);
SNode* createCreateTopicStmtUseDb(SAstCreateContext* pCxt, bool ignoreExists, SToken* pTopicName, SToken* pSubDbName,
bool withMeta);
......
......@@ -123,7 +123,7 @@ priv_level(A) ::= topic_name(B).
with_opt(A) ::= . { A = NULL; }
with_opt(A) ::= WITH search_condition(B). { A = B; }
/************************************************ create/drop/alter dnode *********************************************/
/************************************************ create/drop/alter/restore dnode *********************************************/
cmd ::= CREATE DNODE dnode_endpoint(A). { pCxt->pRootNode = createCreateDnodeStmt(pCxt, &A, NULL); }
cmd ::= CREATE DNODE dnode_endpoint(A) PORT NK_INTEGER(B). { pCxt->pRootNode = createCreateDnodeStmt(pCxt, &A, &B); }
cmd ::= DROP DNODE NK_INTEGER(A) force_opt(B). { pCxt->pRootNode = createDropDnodeStmt(pCxt, &A, B); }
......@@ -132,6 +132,7 @@ cmd ::= ALTER DNODE NK_INTEGER(A) NK_STRING(B).
cmd ::= ALTER DNODE NK_INTEGER(A) NK_STRING(B) NK_STRING(C). { pCxt->pRootNode = createAlterDnodeStmt(pCxt, &A, &B, &C); }
cmd ::= ALTER ALL DNODES NK_STRING(A). { pCxt->pRootNode = createAlterDnodeStmt(pCxt, NULL, &A, NULL); }
cmd ::= ALTER ALL DNODES NK_STRING(A) NK_STRING(B). { pCxt->pRootNode = createAlterDnodeStmt(pCxt, NULL, &A, &B); }
cmd ::= RESTORE DNODE NK_INTEGER(A). { pCxt->pRootNode = createRestoreComponentNodeStmt(pCxt, QUERY_NODE_RESTORE_DNODE_STMT, &A); }
%type dnode_endpoint { SToken }
%destructor dnode_endpoint { }
......@@ -148,9 +149,10 @@ force_opt(A) ::= FORCE.
cmd ::= ALTER LOCAL NK_STRING(A). { pCxt->pRootNode = createAlterLocalStmt(pCxt, &A, NULL); }
cmd ::= ALTER LOCAL NK_STRING(A) NK_STRING(B). { pCxt->pRootNode = createAlterLocalStmt(pCxt, &A, &B); }
/************************************************ create/drop qnode ***************************************************/
/************************************************ create/drop/restore qnode ***************************************************/
cmd ::= CREATE QNODE ON DNODE NK_INTEGER(A). { pCxt->pRootNode = createCreateComponentNodeStmt(pCxt, QUERY_NODE_CREATE_QNODE_STMT, &A); }
cmd ::= DROP QNODE ON DNODE NK_INTEGER(A). { pCxt->pRootNode = createDropComponentNodeStmt(pCxt, QUERY_NODE_DROP_QNODE_STMT, &A); }
cmd ::= RESTORE QNODE ON DNODE NK_INTEGER(A). { pCxt->pRootNode = createRestoreComponentNodeStmt(pCxt, QUERY_NODE_RESTORE_QNODE_STMT, &A); }
/************************************************ create/drop bnode ***************************************************/
cmd ::= CREATE BNODE ON DNODE NK_INTEGER(A). { pCxt->pRootNode = createCreateComponentNodeStmt(pCxt, QUERY_NODE_CREATE_BNODE_STMT, &A); }
......@@ -160,9 +162,13 @@ cmd ::= DROP BNODE ON DNODE NK_INTEGER(A).
cmd ::= CREATE SNODE ON DNODE NK_INTEGER(A). { pCxt->pRootNode = createCreateComponentNodeStmt(pCxt, QUERY_NODE_CREATE_SNODE_STMT, &A); }
cmd ::= DROP SNODE ON DNODE NK_INTEGER(A). { pCxt->pRootNode = createDropComponentNodeStmt(pCxt, QUERY_NODE_DROP_SNODE_STMT, &A); }
/************************************************ create/drop mnode ***************************************************/
/************************************************ create/drop/restore mnode ***************************************************/
cmd ::= CREATE MNODE ON DNODE NK_INTEGER(A). { pCxt->pRootNode = createCreateComponentNodeStmt(pCxt, QUERY_NODE_CREATE_MNODE_STMT, &A); }
cmd ::= DROP MNODE ON DNODE NK_INTEGER(A). { pCxt->pRootNode = createDropComponentNodeStmt(pCxt, QUERY_NODE_DROP_MNODE_STMT, &A); }
cmd ::= RESTORE MNODE ON DNODE NK_INTEGER(A). { pCxt->pRootNode = createRestoreComponentNodeStmt(pCxt, QUERY_NODE_RESTORE_MNODE_STMT, &A); }
/************************************************ restore vnode ***************************************************/
cmd ::= RESTORE VNODE ON DNODE NK_INTEGER(A). { pCxt->pRootNode = createRestoreComponentNodeStmt(pCxt, QUERY_NODE_RESTORE_VNODE_STMT, &A); }
/************************************************ create/drop/use database ********************************************/
cmd ::= CREATE DATABASE not_exists_opt(A) db_name(B) db_options(C). { pCxt->pRootNode = createCreateDatabaseStmt(pCxt, A, &B, C); }
......
......@@ -1658,6 +1658,14 @@ SNode* createDropComponentNodeStmt(SAstCreateContext* pCxt, ENodeType type, cons
return (SNode*)pStmt;
}
SNode* createRestoreComponentNodeStmt(SAstCreateContext* pCxt, ENodeType type, const SToken* pDnodeId) {
CHECK_PARSER_STATUS(pCxt);
SRestoreComponentNodeStmt* pStmt = (SRestoreComponentNodeStmt*)nodesMakeNode(type);
CHECK_OUT_OF_MEM(pStmt);
pStmt->dnodeId = taosStr2Int32(pDnodeId->z, NULL, 10);
return (SNode*)pStmt;
}
SNode* createCreateTopicStmtUseQuery(SAstCreateContext* pCxt, bool ignoreExists, SToken* pTopicName, SNode* pQuery) {
CHECK_PARSER_STATUS(pCxt);
if (!checkTopicName(pCxt, pTopicName)) {
......
......@@ -181,6 +181,7 @@ static SKeyword keywordTable[] = {
{"REPLACE", TK_REPLACE},
{"REPLICA", TK_REPLICA},
{"RESET", TK_RESET},
{"RESTORE", TK_RESTORE},
{"RETENTIONS", TK_RETENTIONS},
{"REVOKE", TK_REVOKE},
{"ROLLUP", TK_ROLLUP},
......@@ -252,6 +253,7 @@ static SKeyword keywordTable[] = {
{"VERBOSE", TK_VERBOSE},
{"VGROUP", TK_VGROUP},
{"VGROUPS", TK_VGROUPS},
{"VNODE", TK_VNODE},
{"VNODES", TK_VNODES},
{"WAL_FSYNC_PERIOD", TK_WAL_FSYNC_PERIOD},
{"WAL_LEVEL", TK_WAL_LEVEL},
......
......@@ -1518,9 +1518,7 @@ static int32_t translateInterpFunc(STranslateContext* pCxt, SFunctionNode* pFunc
SSelectStmt* pSelect = (SSelectStmt*)pCxt->pCurrStmt;
SNode* pTable = pSelect->pFromTable;
if ((NULL != pTable && (QUERY_NODE_REAL_TABLE != nodeType(pTable) ||
(TSDB_CHILD_TABLE != ((SRealTableNode*)pTable)->pMeta->tableType &&
TSDB_NORMAL_TABLE != ((SRealTableNode*)pTable)->pMeta->tableType)))) {
if ((NULL != pTable && QUERY_NODE_REAL_TABLE != nodeType(pTable))) {
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_ONLY_SUPPORT_SINGLE_TABLE,
"%s is only supported in single table query", pFunc->functionName);
}
......@@ -5431,6 +5429,29 @@ static int32_t translateAlterDnode(STranslateContext* pCxt, SAlterDnodeStmt* pSt
return buildCmdMsg(pCxt, TDMT_MND_CONFIG_DNODE, (FSerializeFunc)tSerializeSMCfgDnodeReq, &cfgReq);
}
static int32_t translateRestoreDnode(STranslateContext* pCxt, SRestoreComponentNodeStmt* pStmt) {
SRestoreDnodeReq restoreReq = {0};
restoreReq.dnodeId = pStmt->dnodeId;
switch (nodeType((SNode*)pStmt)) {
case QUERY_NODE_RESTORE_DNODE_STMT:
restoreReq.restoreType = RESTORE_TYPE__ALL;
break;
case QUERY_NODE_RESTORE_QNODE_STMT:
restoreReq.restoreType = RESTORE_TYPE__QNODE;
break;
case QUERY_NODE_RESTORE_MNODE_STMT:
restoreReq.restoreType = RESTORE_TYPE__MNODE;
break;
case QUERY_NODE_RESTORE_VNODE_STMT:
restoreReq.restoreType = RESTORE_TYPE__VNODE;
break;
default:
return -1;
}
return buildCmdMsg(pCxt, TDMT_MND_RESTORE_DNODE, (FSerializeFunc)tSerializeSRestoreDnodeReq, &restoreReq);
}
static int32_t getSmaIndexDstVgId(STranslateContext* pCxt, const char* pDbName, const char* pTableName,
int32_t* pVgId) {
SVgroupInfo vg = {0};
......@@ -6916,6 +6937,12 @@ static int32_t translateQuery(STranslateContext* pCxt, SNode* pNode) {
case QUERY_NODE_SHOW_CREATE_STABLE_STMT:
code = translateShowCreateTable(pCxt, (SShowCreateTableStmt*)pNode);
break;
case QUERY_NODE_RESTORE_DNODE_STMT:
case QUERY_NODE_RESTORE_QNODE_STMT:
case QUERY_NODE_RESTORE_MNODE_STMT:
case QUERY_NODE_RESTORE_VNODE_STMT:
code = translateRestoreDnode(pCxt, (SRestoreComponentNodeStmt*)pNode);
break;
default:
break;
}
......
此差异已折叠。
......@@ -875,4 +875,4 @@ TEST_F(ParserInitialATest, balanceVgroupLeader) {
run("BALANCE VGROUP LEADER");
}
} // namespace ParserTest
\ No newline at end of file
} // namespace ParserTest
......@@ -133,6 +133,63 @@ TEST_F(ParserExplainToSyncdbTest, redistributeVgroup) {
run("REDISTRIBUTE VGROUP 5 DNODE 10 DNODE 20 DNODE 30");
}
TEST_F(ParserExplainToSyncdbTest, restoreDnode) {
useDb("root", "test");
SRestoreDnodeReq expect = {0};
auto clearRestoreDnodeReq = [&]() { memset(&expect, 0, sizeof(SRestoreDnodeReq)); };
auto setRestoreDnodeReq = [&](int32_t dnodeId, int8_t type) {
expect.dnodeId = dnodeId;
expect.restoreType = type;
};
setCheckDdlFunc([&](const SQuery* pQuery, ParserStage stage) {
int32_t expectNodeType = 0;
switch (expect.restoreType) {
case RESTORE_TYPE__ALL:
expectNodeType = QUERY_NODE_RESTORE_DNODE_STMT;
break;
case RESTORE_TYPE__MNODE:
expectNodeType = QUERY_NODE_RESTORE_MNODE_STMT;
break;
case RESTORE_TYPE__VNODE:
expectNodeType = QUERY_NODE_RESTORE_VNODE_STMT;
break;
case RESTORE_TYPE__QNODE:
expectNodeType = QUERY_NODE_RESTORE_QNODE_STMT;
break;
default:
break;
}
ASSERT_EQ(nodeType(pQuery->pRoot), expectNodeType);
ASSERT_EQ(pQuery->pCmdMsg->msgType, TDMT_MND_RESTORE_DNODE);
SRestoreDnodeReq req = {0};
ASSERT_EQ(tDeserializeSRestoreDnodeReq(pQuery->pCmdMsg->pMsg, pQuery->pCmdMsg->msgLen, &req), TSDB_CODE_SUCCESS);
ASSERT_EQ(req.dnodeId, expect.dnodeId);
ASSERT_EQ(req.restoreType, expect.restoreType);
});
setRestoreDnodeReq(1, RESTORE_TYPE__ALL);
run("RESTORE DNODE 1");
clearRestoreDnodeReq();
setRestoreDnodeReq(2, RESTORE_TYPE__MNODE);
run("RESTORE MNODE ON DNODE 2");
clearRestoreDnodeReq();
setRestoreDnodeReq(1, RESTORE_TYPE__VNODE);
run("RESTORE VNODE ON DNODE 1");
clearRestoreDnodeReq();
setRestoreDnodeReq(2, RESTORE_TYPE__QNODE);
run("RESTORE QNODE ON DNODE 2");
clearRestoreDnodeReq();
}
// todo reset query cache
TEST_F(ParserExplainToSyncdbTest, revoke) {
......
......@@ -283,6 +283,14 @@ int32_t tfsMkdir(STfs *pTfs, const char *rname) {
return 0;
}
bool tfsDirExistAt(STfs *pTfs, const char *rname, SDiskID diskId) {
STfsDisk *pDisk = TFS_DISK_AT(pTfs, diskId);
char aname[TMPNAME_LEN];
snprintf(aname, TMPNAME_LEN, "%s%s%s", pDisk->path, TD_DIRSEP, rname);
return taosDirExist(aname);
}
int32_t tfsRmdir(STfs *pTfs, const char *rname) {
if (rname[0] == 0) {
return 0;
......
......@@ -320,6 +320,9 @@ TAOS_DEFINE_ERROR(TSDB_CODE_QNODE_NOT_DEPLOYED, "Qnode not deployed")
TAOS_DEFINE_ERROR(TSDB_CODE_SNODE_NOT_FOUND, "Snode not found")
TAOS_DEFINE_ERROR(TSDB_CODE_SNODE_ALREADY_DEPLOYED, "Snode already deployed")
TAOS_DEFINE_ERROR(TSDB_CODE_SNODE_NOT_DEPLOYED, "Snode not deployed")
TAOS_DEFINE_ERROR(TSDB_CODE_MNODE_NOT_CATCH_UP, "Mnode didn't catch the leader")
TAOS_DEFINE_ERROR(TSDB_CODE_MNODE_ALREADY_IS_VOTER, "Mnode already is a leader")
TAOS_DEFINE_ERROR(TSDB_CODE_MNODE_ONLY_TWO_MNODE, "Only two mnodes exist")
// vnode
TAOS_DEFINE_ERROR(TSDB_CODE_VND_INVALID_VGROUP_ID, "Vnode is closed or removed")
......@@ -335,6 +338,9 @@ TAOS_DEFINE_ERROR(TSDB_CODE_VND_NO_AVAIL_BUFPOOL, "No availabe buffer po
TAOS_DEFINE_ERROR(TSDB_CODE_VND_STOPPED, "Vnode stopped")
TAOS_DEFINE_ERROR(TSDB_CODE_VND_DUP_REQUEST, "Duplicate write request")
TAOS_DEFINE_ERROR(TSDB_CODE_VND_QUERY_BUSY, "Query busy")
TAOS_DEFINE_ERROR(TSDB_CODE_VND_NOT_CATCH_UP, "Vnode didn't catch up its leader")
TAOS_DEFINE_ERROR(TSDB_CODE_VND_ALREADY_IS_VOTER, "Vnode already is a voter")
TAOS_DEFINE_ERROR(TSDB_CODE_VND_DIR_ALREADY_EXIST, "Vnode directory already exist")
// tsdb
TAOS_DEFINE_ERROR(TSDB_CODE_TDB_INVALID_TABLE_ID, "Invalid table ID")
......
......@@ -2,7 +2,7 @@ FROM python:3.8
RUN pip3 config set global.index-url https://pypi.tuna.tsinghua.edu.cn/simple
RUN pip3 install pandas psutil fabric2 requests faker simplejson toml pexpect tzlocal distro
RUN apt-get update
RUN apt-get install -y psmisc sudo tree libjansson-dev libsnappy-dev liblzma-dev libz-dev zlib1g pkg-config build-essential valgrind \
RUN apt-get install -y psmisc sudo tree libgeos-dev libjansson-dev libsnappy-dev liblzma-dev libz-dev zlib1g pkg-config build-essential valgrind \
vim libjemalloc-dev openssh-server screen sshpass net-tools dirmngr gnupg apt-transport-https ca-certificates software-properties-common r-base iputils-ping
RUN apt-key adv --keyserver keyserver.ubuntu.com --recv-keys E298A3A825C0D65DFD57CBB651716619E084DAB9
RUN add-apt-repository 'deb https://cloud.r-project.org/bin/linux/ubuntu focal-cran40/'
......@@ -45,4 +45,4 @@ RUN pip3 uninstall -y taostest
COPY repository/TDinternal /home/TDinternal
COPY repository/taos-connector-python /home/taos-connector-python
RUN sh -c "cd /home/taos-connector-python; pip3 install ."
COPY setup.sh /home/setup.sh
\ No newline at end of file
COPY setup.sh /home/setup.sh
此差异已折叠。
......@@ -22,7 +22,8 @@ class TDTestCase:
tdSql.execute("insert into db.ctb using db.stb tags(1) (ts, c1) values (now, 1)")
tdSql.query("select count(*) from information_schema.ins_columns")
tdSql.checkData(0, 0, 277)
# enterprise version: 285, community version: 277
tdSql.checkData(0, 0, 285)
tdSql.query("select * from information_schema.ins_columns where table_name = 'ntb'")
tdSql.checkRows(14)
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册