提交 48bcec46 编写于 作者: S slzhou

Merge branch 'szhou/python-udf' of github.com:taosdata/TDengine into szhou/python-udf

......@@ -130,3 +130,4 @@ tools/COPYING
tools/BUGS
tools/taos-tools
tools/taosws-rs
tags
......@@ -104,6 +104,16 @@ sudo yum install -y zlib-devel zlib-static xz-devel snappy-devel jansson jansson
sudo yum config-manager --set-enabled Powertools
```
#### CentOS + devtoolset
除上述编译依赖包,需要执行以下命令:
```
sudo yum install centos-release-scl
sudo yum install devtoolset-9 devtoolset-9-libatomic-devel
scl enable devtoolset-9 -- bash
```
### macOS
```
......@@ -276,7 +286,7 @@ sudo make install
安装成功后,可以在应用程序中双击 TDengine 图标启动服务,或者在终端中启动 TDengine 服务:
```bash
launchctl start com.tdengine.taosd
sudo launchctl start com.tdengine.taosd
```
用户可以使用 TDengine CLI 来连接 TDengine 服务,在终端中,输入:
......
......@@ -111,6 +111,16 @@ If the PowerTools installation fails, you can try to use:
sudo yum config-manager --set-enabled powertools
```
#### For CentOS + devtoolset
Besides above dependencies, please run following commands:
```
sudo yum install centos-release-scl
sudo yum install devtoolset-9 devtoolset-9-libatomic-devel
scl enable devtoolset-9 -- bash
```
### macOS
```
......@@ -286,7 +296,7 @@ Installing from source code will also configure service management for TDengine.
To start the service after installation, double-click the /applications/TDengine to start the program, or in a terminal, use:
```bash
launchctl start com.tdengine.taosd
sudo launchctl start com.tdengine.taosd
```
Then users can use the TDengine CLI to connect the TDengine server. In a terminal, use:
......
......@@ -2,7 +2,7 @@
# taos-tools
ExternalProject_Add(taos-tools
GIT_REPOSITORY https://github.com/taosdata/taos-tools.git
GIT_TAG 22627d7
GIT_TAG 634399d
SOURCE_DIR "${TD_SOURCE_DIR}/tools/taos-tools"
BINARY_DIR ""
#BUILD_IN_SOURCE TRUE
......
......@@ -30,25 +30,31 @@ Application programs can execute `INSERT` statement through connectors to insert
The below SQL statement is used to insert one row into table "d1001".
```sql
INSERT INTO d1001 VALUES (1538548685000, 10.3, 219, 0.31);
INSERT INTO d1001 VALUES (ts1, 10.3, 219, 0.31);
```
`ts1` is Unix timestamp, the timestamps which is larger than the difference between current time and KEEP in config is only allowed. For further detial, refer to [TDengine SQL insert timestamp section](/taos-sql/insert).
### Insert Multiple Rows
Multiple rows can be inserted in a single SQL statement. The example below inserts 2 rows into table "d1001".
```sql
INSERT INTO d1001 VALUES (1538548684000, 10.2, 220, 0.23) (1538548696650, 10.3, 218, 0.25);
INSERT INTO d1001 VALUES (ts2, 10.2, 220, 0.23) (ts2, 10.3, 218, 0.25);
```
`ts1` and `ts2` is Unix timestamp, the timestamps which is larger than the difference between current time and KEEP in config is only allowed. For further detial, refer to [TDengine SQL insert timestamp section](/taos-sql/insert).
### Insert into Multiple Tables
Data can be inserted into multiple tables in the same SQL statement. The example below inserts 2 rows into table "d1001" and 1 row into table "d1002".
```sql
INSERT INTO d1001 VALUES (1538548685000, 10.3, 219, 0.31) (1538548695000, 12.6, 218, 0.33) d1002 VALUES (1538548696800, 12.3, 221, 0.31);
INSERT INTO d1001 VALUES (ts1, 10.3, 219, 0.31) (ts2, 12.6, 218, 0.33) d1002 VALUES (ts3, 12.3, 221, 0.31);
```
`ts1`, `ts2` and `ts3` is Unix timestamp, the timestamps which is larger than the difference between current time and KEEP in config is only allowed. For further detial, refer to [TDengine SQL insert timestamp section](/taos-sql/insert).
For more details about `INSERT` please refer to [INSERT](/taos-sql/insert).
:::info
......
......@@ -126,6 +126,22 @@ alter_database_option: {
}
```
### ALTER CACHESIZE
The command of changing database configuration parameters is easy to use, but it's hard to determine whether a parameter is proper or not. In this section we will describe how to determine whether cachesize is big enough.
1. How to check cachesize?
You can use `select * from information_schema.ins_databases;` to get the value of cachesize.
2. How to check cacheload?
You can use `show <db_name>.vgroups;` to check the value of cacheload.
3. Determine whether cachesize is big engough
If the value of `cacheload` is very close to the value of `cachesize`, then it's very probably that `cachesize` is too small. If the value of `cacheload` is much smaller than the value of `cachesize`, then `cachesize` is big enough. You can use this simple principle to determine. Depending on how much memory is available in your system, you can choose to double `cachesize` or incrase it by even 5 or more times.
:::note
Other parameters cannot be modified after the database has been created.
......
......@@ -877,8 +877,8 @@ INTERP(expr)
- The number of rows in the result set of `INTERP` is determined by the parameter `EVERY(time_unit)`. Starting from timestamp1, one interpolation is performed for every time interval specified `time_unit` parameter. The parameter `time_unit` must be an integer, with no quotes, with a time unit of: a(millisecond)), s(second), m(minute), h(hour), d(day), or w(week). For example, `EVERY(500a)` will interpolate every 500 milliseconds.
- Interpolation is performed based on `FILL` parameter. For more information about FILL clause, see [FILL Clause](../distinguished/#fill-clause).
- `INTERP` can only be used to interpolate in single timeline. So it must be used with `partition by tbname` when it's used on a STable.
- Pseudocolumn `_irowts` can be used along with `INTERP` to return the timestamps associated with interpolation points(support after version 3.0.1.4).
- Pseudocolumn `_isfilled` can be used along with `INTERP` to indicate whether the results are original records or data points generated by interpolation algorithm(support after version 3.0.2.3).
- Pseudocolumn `_irowts` can be used along with `INTERP` to return the timestamps associated with interpolation points(support after version 3.0.2.0).
- Pseudocolumn `_isfilled` can be used along with `INTERP` to indicate whether the results are original records or data points generated by interpolation algorithm(support after version 3.0.3.0).
### LAST
......
......@@ -41,7 +41,7 @@ CREATE AGGREGATE FUNCTION function_name AS library_path OUTPUTTYPE output_type [
```sql
CREATE AGGREGATE FUNCTION l2norm AS "/home/taos/udf_example/libl2norm.so" OUTPUTTYPE DOUBLE bufsize 8;
```
For more information about user-defined functions, see [User-Defined Functions](../../develop/udf).
For more information about user-defined functions, see [User-Defined Functions](/develop/udf).
## Manage UDF
......
......@@ -32,7 +32,9 @@ Please refer to [version support list](/reference/connector#version-support)
## Supported features
### Native connectors
<Tabs defaultValue="native">
<TabItem value="native" label="Native connector">
1. Connection Management
2. General Query
......@@ -41,12 +43,16 @@ Please refer to [version support list](/reference/connector#version-support)
5. Subscription
6. Schemaless
### REST Connector
</TabItem>
<TabItem value="rest" label="REST connector">
1. Connection Management
2. General Query
3. Continuous Query
</TabItem>
</Tabs>
## Installation Steps
### Pre-installation preparation
......@@ -115,6 +121,9 @@ npm install @tdengine/rest
### Verify
<Tabs defaultValue="native">
<TabItem value="native" label="Native connector">
After installing the TDengine client, use the `nodejsChecker.js` program to verify that the current environment supports Node.js access to TDengine.
Verification in details:
......@@ -131,6 +140,28 @@ node nodejsChecker.js host=localhost
- After executing the above steps, the command-line will output the result of `nodejsChecker.js` connecting to the TDengine instance and performing a simple insert and query.
</TabItem>
<TabItem value="rest" label="REST connector">
After installing the TDengine client, use the `restChecker.js` program to verify that the current environment supports Node.js access to TDengine.
Verification in details:
- Create an installation test folder such as `~/tdengine-test`. Download the [restChecker.js source code](https://github.com/taosdata/TDengine/tree/3.0/docs/examples/node/restexample/restChecker.js) to your local.
- Execute the following command from the command-line.
```bash
npm init -y
npm install @tdengine/rest
node restChecker.js
```
- After executing the above steps, the command-line will output the result of `restChecker.js` connecting to the TDengine instance and performing a simple insert and query.
</TabItem>
</Tabs>
## Establishing a connection
Please choose to use one of the connectors.
......@@ -182,24 +213,69 @@ let cursor = conn.cursor();
#### SQL Write
<Tabs defaultValue="native">
<TabItem value="native" label="native connection">
<NodeInsert />
</TabItem>
<TabItem value="rest" label="REST connection">
```js
{{#include docs/examples/node/restexample/insert_example.js}}
```
</TabItem>
</Tabs>
#### InfluxDB line protocol write
<Tabs defaultValue="native">
<TabItem value="native" label="native connection">
<NodeInfluxLine />
</TabItem>
</Tabs>
#### OpenTSDB Telnet line protocol write
<Tabs defaultValue="native">
<TabItem value="native" label="native connection">
<NodeOpenTSDBTelnet />
</TabItem>
</Tabs>
#### OpenTSDB JSON line protocol write
<Tabs defaultValue="native">
<TabItem value="native" label="native connection">
<NodeOpenTSDBJson />
</TabItem>
</Tabs>
### Querying data
<Tabs defaultValue="native">
<TabItem value="native" label="native connection">
<NodeQuery />
</TabItem>
<TabItem value="rest" label="REST connection">
```js
{{#include docs/examples/node/restexample/query_example.js}}
```
</TabItem>
</Tabs>
## More sample programs
......
......@@ -106,7 +106,7 @@ The parameters described in this document by the effect that they have on the sy
| Applicable | Server only |
| Meaning | The switch for monitoring inside server. The main object of monitoring is to collect information about load on physical nodes, including CPU usage, memory usage, disk usage, and network bandwidth. Monitoring information is sent over HTTP to the taosKeeper service specified by `monitorFqdn` and `monitorProt`.
| Value Range | 0: monitoring disabled, 1: monitoring enabled |
| Default | 1 |
| Default | 0 |
### monitorFqdn
......@@ -179,9 +179,10 @@ The parameters described in this document by the effect that they have on the sy
| Attribute | Description |
| -------- | -------------------------------- |
| Applicable | Server only |
| Meaning | count()/hyperloglog() return value or not if the result data is NULL |
| Meaning | count()/hyperloglog() return value or not if the input data is empty or NULL |
| Vlue Range | 0:Return empty line,1:Return 0 |
| Default | 1 |
| Notes | When this parameter is setting to 1, for queries containing GROUP BY, PARTITION BY and INTERVAL clause, and input data in certain groups or windows is empty or NULL, the corresponding groups or windows have no return values |
### maxNumOfDistinctRes
......
......@@ -24,7 +24,14 @@ You can compile taosKeeper separately and install it. Please refer to the [taosK
taosKeeper needs to be executed on the terminal of the operating system, it supports three configuration methods: [Command-line arguments](#command-line-arguments-in-detail), [environment variable](#environment-variable-in-detail) and [configuration file](#configuration-file-parameters-in-detail). The precedence of those is Command-line, environment variable and configuration file.
**Make sure that the TDengine cluster is running correctly before running taosKeeper. ** Ensure that the monitoring service in TDengine has been started. For more information, see [TDengine Monitoring Configuration](../config/#monitoring).
**Make sure that the TDengine cluster is running correctly before running taosKeeper.** Ensure that the monitoring service in TDengine has been started. At least the values of `monitor` and `monitorFqdn` need to be set in `taos.cfg`.
```shell
monitor 1
monitorFqdn localhost # taoskeeper's FQDN
```
For more information, see [TDengine Monitoring Configuration](../config/#monitoring).
### Command-Line Parameters
......
const { options, connect } = require("@tdengine/rest");
async function sqlInsert() {
options.path = "/rest/sql";
options.host = "localhost";
options.port = 6041;
let conn = connect(options);
let cursor = conn.cursor();
try {
let res = await cursor.query('CREATE DATABASE power');
res = await cursor.query('CREATE STABLE power.meters (ts timestamp, current float, voltage int, phase float) TAGS (location binary(64), groupId int)');
res = await cursor.query('INSERT INTO power.d1001 USING power.meters TAGS ("California.SanFrancisco", 2) VALUES (NOW, 10.2, 219, 0.32)');
console.log("res.getResult()", res.getResult());
} catch (err) {
console.log(err);
}
}
sqlInsert();
const { options, connect } = require("@tdengine/rest");
async function query() {
options.path = "/rest/sql";
options.host = "localhost";
options.port = 6041;
let conn = connect(options);
let cursor = conn.cursor();
try {
let res = await cursor.query('select * from power.meters');
console.log("res.getResult()", res.getResult());
} catch (err) {
console.log(err);
}
}
query();
const { options, connect } = require("@tdengine/rest");
options.path = '/rest/sql/'
options.host = 'localhost';
options.port = 6041;
options.user = "root";
options.passwd = "taosdata";
//optional
// options.url = "http://127.0.0.1:6041";
const db = 'rest_ts_db';
const table = 'rest'
const createDB = `create database if not exists ${db} keep 3650`;
const dropDB = `drop database if exists ${db}`;
const createTB = `create table if not exists ${db}.${table}(ts timestamp,i8 tinyint,i16 smallint,i32 int,i64 bigint,bnr binary(40),nchr nchar(40))`;
const addColumn = `alter table ${db}.${table} add column new_column nchar(40) `;
const dropColumn = `alter table ${db}.${table} drop column new_column`;
const insertSql = `insert into ${db}.${table} values('2022-03-30 18:30:51.567',1,2,3,4,'binary1','nchar1')` +
`('2022-03-30 18:30:51.568',5,6,7,8,'binary2','nchar2')` +
`('2022-03-30 18:30:51.569',9,0,1,2,'binary3','nchar3')`;
const querySql = `select * from ${db}.${table}`;
const errorSql = 'show database';
let conn = connect(options);
let cursor = conn.cursor();
async function execute(sql, pure = true) {
let result = await cursor.query(sql, pure);
// print query result as taos shell
// Get Result object, return Result object.
console.log("result.getResult()",result.getResult());
// Get Meta data, return Meta[]|undefined(when execute failed this is undefined).
console.log("result.getMeta()",result.getMeta());
// Get data,return Array<Array<any>>|undefined(when execute failed this is undefined).
console.log("result.getData()",result.getData());
// Get affect rows,return number|undefined(when execute failed this is undefined).
console.log("result.getAffectRows()",result.getAffectRows());
// Get command,return SQL send to server(need to `query(sql,false)`,set 'pure=false',default true).
console.log("result.getCommand()",result.getCommand());
// Get error code ,return number|undefined(when execute failed this is undefined).
console.log("result.getErrCode()",result.getErrCode());
// Get error string,return string|undefined(when execute failed this is undefined).
console.log("result.getErrStr()",result.getErrStr());
}
(async () => {
// start execute time
let start = new Date().getTime();
await execute(createDB);
console.log("-----------------------------------")
await execute(createTB);
console.log("-----------------------------------")
await execute(addColumn);
console.log("----------------------------------")
await execute(dropColumn);
console.log("-----------------------------------")
await execute(insertSql);
console.log("-----------------------------------")
await execute(querySql);
console.log("-----------------------------------")
await execute(errorSql);
console.log("-----------------------------------")
await execute(dropDB);
// finish time
let end = new Date().getTime();
console.log("total spend time:%d ms",end - start);
})()
......@@ -10,7 +10,7 @@ import PkgListV3 from "/components/PkgListV3";
您可以[用 Docker 立即体验](../../get-started/docker/) TDengine。如果您希望对 TDengine 贡献代码或对内部实现感兴趣,请参考我们的 [TDengine GitHub 主页](https://github.com/taosdata/TDengine) 下载源码构建和安装.
TDengine 完整的软件包包括服务端(taosd)、应用驱动(taosc)、用于与第三方系统对接并提供 RESTful 接口的 taosAdapter、命令行程序(CLI,taos)和一些工具软件。目前 taosdump、TDinsight 仅在 Linux 系统上安装和运行,后续将支持 Windows、macOS 等系统。TDengine 除了提供多种语言的连接器之外,还通过 [taosAdapter](../../reference/taosadapter/) 提供 [RESTful 接口](../../connector/rest-api/)
TDengine 完整的软件包包括服务端(taosd)、应用驱动(taosc)、用于与第三方系统对接并提供 RESTful 接口的 taosAdapter、命令行程序(CLI,taos)和一些工具软件。目前 TDinsight 仅在 Linux 系统上安装和运行,后续将支持 Windows、macOS 等系统。TDengine 除了提供多种语言的连接器之外,还通过 [taosAdapter](../../reference/taosadapter/) 提供 [RESTful 接口](../../connector/rest-api/)
为方便使用,标准的服务端安装包包含了 taosd、taosAdapter、taosc、taos、taosdump、taosBenchmark、TDinsight 安装脚本和示例代码;如果您只需要用到服务端程序和客户端连接的 C/C++ 语言支持,也可以仅下载 Lite 版本的安装包。
......
......@@ -30,25 +30,31 @@ import PhpStmt from "./_php_stmt.mdx";
下面这条 INSERT 就将一条记录写入到表 d1001 中:
```sql
INSERT INTO d1001 VALUES (1538548685000, 10.3, 219, 0.31);
INSERT INTO d1001 VALUES (ts1, 10.3, 219, 0.31);
```
这里的`ts1`为Unix时间戳(Unix timestamp),允许插入的最老记录的时间戳,是相对于当前服务器时间,减去配置的 KEEP 值。时间戳详情规则参考 [TDengine SQL数据写入 关于时间戳一节](/taos-sql/insert)
### 一次写入多条
TDengine 支持一次写入多条记录,比如下面这条命令就将两条记录写入到表 d1001 中:
```sql
INSERT INTO d1001 VALUES (1538548684000, 10.2, 220, 0.23) (1538548696650, 10.3, 218, 0.25);
INSERT INTO d1001 VALUES (ts1, 10.2, 220, 0.23) (ts2, 10.3, 218, 0.25);
```
这里的`ts1`和`ts2`为Unix时间戳(Unix timestamp),允许插入的最老记录的时间戳,是相对于当前服务器时间,减去配置的 KEEP 值。时间戳详情规则参考 [TDengine SQL数据写入 关于时间戳一节](/taos-sql/insert)
### 一次写入多表
TDengine 也支持一次向多个表写入数据,比如下面这条命令就向 d1001 写入两条记录,向 d1002 写入一条记录:
```sql
INSERT INTO d1001 VALUES (1538548685000, 10.3, 219, 0.31) (1538548695000, 12.6, 218, 0.33) d1002 VALUES (1538548696800, 12.3, 221, 0.31);
INSERT INTO d1001 VALUES (ts1, 10.3, 219, 0.31) (ts2, 12.6, 218, 0.33) d1002 VALUES (ts3, 12.3, 221, 0.31);
```
这里的`ts1`、`ts2`和`ts3`为Unix时间戳(Unix timestamp),允许插入的最老记录的时间戳,是相对于当前服务器时间,减去配置的 KEEP 值。时间戳详情规则参考 [TDengine SQL数据写入 关于时间戳一节](/taos-sql/insert)
详细的 SQL INSERT 语法规则参考 [TDengine SQL 的数据写入](/taos-sql/insert)。
:::info
......
......@@ -330,6 +330,7 @@ typedef struct taosField {
:::note
2.0 及以上版本 TDengine 推荐数据库应用的每个线程都建立一个独立的连接,或基于线程建立连接池。而不推荐在应用中将该连接 (TAOS\*) 结构体传递到不同的线程共享使用。基于 TAOS 结构体发出的查询、写入等操作具有多线程安全性,但 “USE statement” 等状态量有可能在线程之间相互干扰。此外,C 语言的连接器可以按照需求动态建立面向数据库的新连接(该过程对用户不可见),同时建议只有在程序最后退出的时候才调用 `taos_close()` 关闭连接。
另一个需要注意的是,在上述同步 API 执行过程中,不能调用类似 pthread_cancel 之类的 API 来强制结束线程,因为涉及一些模块的同步操作,如果强制结束线程有可能造成包括但不限于死锁等异常状况。
:::
......
......@@ -31,7 +31,8 @@ REST 连接器支持所有能运行 Node.js 的平台。
## 支持的功能特性
### 原生连接器
<Tabs defaultValue="native">
<TabItem value="native" label="原生连接器">
1. 连接管理
2. 普通查询
......@@ -40,12 +41,17 @@ REST 连接器支持所有能运行 Node.js 的平台。
5. 订阅功能
6. Schemaless
### REST 连接器
</TabItem>
<TabItem value="rest" label="REST 连接器">
1. 连接管理
2. 普通查询
3. 连续查询
</TabItem>
</Tabs>
## 安装步骤
### 安装前准备
......@@ -114,6 +120,9 @@ npm install @tdengine/rest
### 安装验证
<Tabs defaultValue="native">
<TabItem value="native" label="原生连接器">
在安装好 TDengine 客户端后,使用 nodejsChecker.js 程序能够验证当前环境是否支持 Node.js 方式访问 TDengine。
验证方法:
......@@ -130,11 +139,35 @@ node nodejsChecker.js host=localhost
- 执行以上步骤后,在命令行会输出 nodejsChecker.js 连接 TDengine 实例,并执行简单插入和查询的结果。
</TabItem>
<TabItem value="rest" label="REST 连接器">
在安装好 TDengine 客户端后,使用 nodejsChecker.js 程序能够验证当前环境是否支持 Node.js 方式访问 TDengine。
验证方法:
- 新建安装验证目录,例如:`~/tdengine-test`,下载 GitHub 上 [restChecker.js 源代码](https://github.com/taosdata/TDengine/tree/3.0/docs/examples/node/restexample/restChecker.js)到本地。
- 在命令行中执行以下命令。
```bash
npm init -y
npm install @tdengine/rest
node restChecker.js
```
- 执行以上步骤后,在命令行会输出 restChecker.js 连接 TDengine 实例,并执行简单插入和查询的结果。
</TabItem>
</Tabs>
## 建立连接
请选择使用一种连接器。
<Tabs defaultValue="rest">
<Tabs defaultValue="native">
<TabItem value="native" label="原生连接">
安装并引用 `@tdengine/client` 包。
......@@ -181,24 +214,71 @@ let cursor = conn.cursor();
#### SQL 写入
<Tabs defaultValue="native">
<TabItem value="native" label="原生连接">
<NodeInsert />
</TabItem>
<TabItem value="rest" label="REST 连接">
```js
{{#include docs/examples/node/restexample/insert_example.js}}
```
</TabItem>
</Tabs>
#### InfluxDB 行协议写入
<Tabs defaultValue="native">
<TabItem value="native" label="原生连接">
<NodeInfluxLine />
</TabItem>
</Tabs>
#### OpenTSDB Telnet 行协议写入
<Tabs defaultValue="native">
<TabItem value="native" label="原生连接">
<NodeOpenTSDBTelnet />
</TabItem>
</Tabs>
#### OpenTSDB JSON 行协议写入
<Tabs defaultValue="native">
<TabItem value="native" label="原生连接">
<NodeOpenTSDBJson />
</TabItem>
</Tabs>
### 查询数据
<Tabs defaultValue="native">
<TabItem value="native" label="原生连接">
<NodeQuery />
</TabItem>
<TabItem value="rest" label="REST 连接">
```js
{{#include docs/examples/node/restexample/query_example.js}}
```
</TabItem>
</Tabs>
## 更多示例程序
......
......@@ -126,6 +126,22 @@ alter_database_option: {
}
```
### 修改 CACHESIZE
修改数据库参数的命令使用简单,难的是如何确定是否需要修改以及如何修改。本小节描述如何判断数据库的 cachesize 是否够用。
1. 如何查看 cachesize?
通过 select * from information_schema.ins_databases; 可以查看这些 cachesize 的具体值。
2. 如何查看 cacheload?
通过 show <db_name>.vgroups; 可以查看 cacheload
3. 判断 cachesize 是否够用
如果 cacheload 非常接近 cachesize,则 cachesize 可能过小。 如果 cacheload 明显小于 cachesize 则 cachesize 是够用的。可以根据这个原则判断是否需要修改 cachesize 。具体修改值可以根据系统可用内存情况来决定是加倍或者是提高几倍。
:::note
其它参数在 3.0.0.0 中暂不支持修改
......
......@@ -879,8 +879,8 @@ INTERP(expr)
- INTERP 根据 EVERY(time_unit) 字段来确定输出时间范围内的结果条数,即从 timestamp1 开始每隔固定长度的时间(time_unit 值)进行插值,time_unit 可取值时间单位:1a(毫秒),1s(秒),1m(分),1h(小时),1d(天),1w(周)。例如 EVERY(500a) 将对于指定数据每500毫秒间隔进行一次插值.
- INTERP 根据 FILL 字段来决定在每个符合输出条件的时刻如何进行插值。关于 FILL 子句如何使用请参考 [FILL 子句](../distinguished/#fill-子句)
- INTERP 只能在一个时间序列内进行插值,因此当作用于超级表时必须跟 partition by tbname 一起使用。
- INTERP 可以与伪列 _irowts 一起使用,返回插值点所对应的时间戳(3.0.1.4版本以后支持)。
- INTERP 可以与伪列 _isfilled 一起使用,显示返回结果是否为原始记录或插值算法产生的数据(3.0.2.3版本以后支持)。
- INTERP 可以与伪列 _irowts 一起使用,返回插值点所对应的时间戳(3.0.2.0版本以后支持)。
- INTERP 可以与伪列 _isfilled 一起使用,显示返回结果是否为原始记录或插值算法产生的数据(3.0.3.0版本以后支持)。
### LAST
......
......@@ -106,7 +106,7 @@ taos --dump-config
| 适用范围 | 仅服务端适用 |
| 含义 | 服务器内部的系统监控开关。监控主要负责收集物理节点的负载状况,包括 CPU、内存、硬盘、网络带宽的监控记录,监控信息将通过 HTTP 协议发送给由 `monitorFqdn``monitorProt` 指定的 TaosKeeper 监控服务 |
| 取值范围 | 0:关闭监控服务, 1:激活监控服务。 |
| 缺省值 | 1 |
| 缺省值 | 0 |
### monitorFqdn
......@@ -197,9 +197,10 @@ taos --dump-config
| 属性 | 说明 |
| -------- | -------------------------------- |
| 适用范围 | 仅服务端适用 |
| 含义 | count/hyperloglog函数在数据为空或者NULL的情况下是否返回值 |
| 含义 | count/hyperloglog函数在输入数据为空或者NULL的情况下是否返回值 |
| 取值范围 | 0:返回空行,1:返回 0 |
| 缺省值 | 1 |
| 补充说明 | 该参数设置为 1 时,如果查询中含有 GROUP BY,PARTITION BY 以及 INTERVAL 子句且相应的组或窗口内数据为空或者NULL, 对应的组或窗口将不返回查询结果 |
## 区域相关
......
......@@ -24,7 +24,15 @@ taosKeeper 安装方式:
taosKeeper 需要在操作系统终端执行,该工具支持三种配置方式:[命令行参数](#命令行参数启动)[环境变量](#环境变量启动)[配置文件](#配置文件启动)。优先级为:命令行参数、环境变量、配置文件参数。
**在运行 taosKeeper 之前要确保 TDengine 集群与 taosAdapter 已经在正确运行。** 并且 TDengine 已经开启监控服务,具体请参考:[TDengine 监控配置](../config/#监控相关)
**在运行 taosKeeper 之前要确保 TDengine 集群与 taosAdapter 已经在正确运行。** 并且 TDengine 已经开启监控服务,TDengine 配置文件 `taos.cfg` 中至少需要配置 `monitor``monitorFqdn`
```shell
monitor 1
monitorFqdn localhost # taoskeeper 服务的 FQDN
```
TDengine 监控配置相关,具体请参考:[TDengine 监控配置](../config/#监控相关)
### 命令行参数启动
......
......@@ -29,6 +29,7 @@ extern "C" {
typedef struct SBuffer SBuffer;
typedef struct SSchema SSchema;
typedef struct SSchema2 SSchema2;
typedef struct STColumn STColumn;
typedef struct STSchema STSchema;
typedef struct SValue SValue;
......@@ -205,7 +206,7 @@ struct SColData {
int32_t numOfNull; // # of null
int32_t numOfValue; // # of vale
int32_t nVal;
uint8_t flag;
int8_t flag;
uint8_t *pBitMap;
int32_t *aOffset;
int32_t nData;
......
......@@ -144,6 +144,8 @@ typedef enum _mgmt_table {
#define TSDB_ALTER_TABLE_UPDATE_TAG_BYTES 8
#define TSDB_ALTER_TABLE_UPDATE_OPTIONS 9
#define TSDB_ALTER_TABLE_UPDATE_COLUMN_NAME 10
#define TSDB_ALTER_TABLE_ADD_TAG_INDEX 11
#define TSDB_ALTER_TABLE_DROP_TAG_INDEX 12
#define TSDB_FILL_NONE 0
#define TSDB_FILL_NULL 1
......@@ -294,6 +296,15 @@ struct SSchema {
char name[TSDB_COL_NAME_LEN];
};
struct SSchema2 {
int8_t type;
int8_t flags;
col_id_t colId;
int32_t bytes;
char name[TSDB_COL_NAME_LEN];
char alias[TSDB_COL_NAME_LEN];
};
typedef struct {
char tbName[TSDB_TABLE_NAME_LEN];
char stbName[TSDB_TABLE_NAME_LEN];
......@@ -347,8 +358,19 @@ void tFreeSSubmitRsp(SSubmitRsp* pRsp);
#define COL_CLR_SET(FLG) ((FLG) &= (~(COL_SET_VAL | COL_SET_NULL)))
#define IS_BSMA_ON(s) (((s)->flags & 0x01) == COL_SMA_ON)
#define IS_IDX_ON(s) (((s)->flags & 0x02) == COL_IDX_ON)
#define IS_SET_NULL(s) (((s)->flags & COL_SET_NULL) == COL_SET_NULL)
#define SSCHMEA_SET_IDX_ON(s) \
do { \
(s)->flags |= COL_IDX_ON; \
} while (0)
#define SSCHMEA_SET_IDX_OFF(s) \
do { \
(s)->flags &= (~COL_IDX_ON); \
} while (0)
#define SSCHMEA_TYPE(s) ((s)->type)
#define SSCHMEA_FLAGS(s) ((s)->flags)
#define SSCHMEA_COLID(s) ((s)->colId)
......@@ -1240,6 +1262,17 @@ typedef struct {
int32_t tSerializeSDropVnodeReq(void* buf, int32_t bufLen, SDropVnodeReq* pReq);
int32_t tDeserializeSDropVnodeReq(void* buf, int32_t bufLen, SDropVnodeReq* pReq);
typedef struct {
char colName[TSDB_COL_NAME_LEN];
char stb[TSDB_TABLE_FNAME_LEN];
int64_t stbUid;
int64_t dbUid;
int64_t reserved[8];
} SDropIndexReq;
int32_t tSerializeSDropIdxReq(void* buf, int32_t bufLen, SDropIndexReq* pReq);
int32_t tDeserializeSDropIdxReq(void* buf, int32_t bufLen, SDropIndexReq* pReq);
typedef struct {
int64_t dbUid;
char db[TSDB_DB_FNAME_LEN];
......@@ -2812,6 +2845,22 @@ typedef struct {
int32_t tSerializeSMDropSmaReq(void* buf, int32_t bufLen, SMDropSmaReq* pReq);
int32_t tDeserializeSMDropSmaReq(void* buf, int32_t bufLen, SMDropSmaReq* pReq);
typedef struct {
char dbFName[TSDB_DB_FNAME_LEN];
char stbName[TSDB_TABLE_NAME_LEN];
char colName[TSDB_COL_NAME_LEN];
char idxName[TSDB_COL_NAME_LEN];
int8_t idxType;
} SCreateTagIndexReq;
int32_t tSerializeSCreateTagIdxReq(void* buf, int32_t bufLen, SCreateTagIndexReq* pReq);
int32_t tDeserializeSCreateTagIdxReq(void* buf, int32_t bufLen, SCreateTagIndexReq* pReq);
typedef SMDropSmaReq SDropTagIndexReq;
int32_t tSerializeSDropTagIdxReq(void* buf, int32_t bufLen, SDropTagIndexReq* pReq);
int32_t tDeserializeSDropTagIdxReq(void* buf, int32_t bufLen, SDropTagIndexReq* pReq);
typedef struct {
int8_t version; // for compatibility(default 0)
int8_t intervalUnit; // MACRO: TIME_UNIT_XXX
......@@ -3307,6 +3356,12 @@ typedef struct {
SArray* aSubmitTbData; // SArray<SSubmitTbData>
} SSubmitReq2;
typedef struct {
SMsgHead header;
int64_t version;
char data[]; // SSubmitReq2
} SSubmitReq2Msg;
int32_t tEncodeSSubmitReq2(SEncoder* pCoder, const SSubmitReq2* pReq);
int32_t tDecodeSSubmitReq2(SDecoder* pCoder, SSubmitReq2* pReq);
void tDestroySSubmitTbData(SSubmitTbData* pTbData, int32_t flag);
......@@ -3323,6 +3378,7 @@ void tDestroySSubmitRsp2(SSubmitRsp2* pRsp, int32_t flag);
#define TSDB_MSG_FLG_ENCODE 0x1
#define TSDB_MSG_FLG_DECODE 0x2
#define TSDB_MSG_FLG_CMPT 0x3
typedef struct {
union {
......
......@@ -220,6 +220,9 @@ enum {
TD_DEF_MSG_TYPE(TDMT_VND_DROP_TTL_TABLE, "vnode-drop-ttl-stb", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_TRIM, "vnode-trim", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_COMMIT, "vnode-commit", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_CREATE_INDEX, "vnode-create-index", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_DROP_INDEX, "vnode-drop-index", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_DISABLE_WRITE, "vnode-disable-write", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_MAX_MSG, "vnd-max", NULL, NULL)
......
......@@ -33,6 +33,8 @@ extern "C" {
#endif // if !defined(WINDOWS)
int32_t taosMemoryDbgInit();
int32_t taosMemoryDbgInitRestore();
void *taosMemoryMalloc(int64_t size);
void *taosMemoryCalloc(int64_t num, int64_t size);
void *taosMemoryRealloc(void *ptr, int64_t size);
......
......@@ -253,6 +253,7 @@ int32_t* taosGetErrno();
#define TSDB_CODE_MND_INVALID_STB_OPTION TAOS_DEF_ERROR_CODE(0, 0x036E)
#define TSDB_CODE_MND_INVALID_ROW_BYTES TAOS_DEF_ERROR_CODE(0, 0x036F)
// mnode-func
#define TSDB_CODE_MND_INVALID_FUNC_NAME TAOS_DEF_ERROR_CODE(0, 0x0370)
// #define TSDB_CODE_MND_INVALID_FUNC_LEN TAOS_DEF_ERROR_CODE(0, 0x0371) // 2.x
......@@ -265,6 +266,8 @@ int32_t* taosGetErrno();
#define TSDB_CODE_MND_INVALID_FUNC_COMMENT TAOS_DEF_ERROR_CODE(0, 0x0378)
#define TSDB_CODE_MND_INVALID_FUNC_RETRIEVE TAOS_DEF_ERROR_CODE(0, 0x0379)
// mnode-db
#define TSDB_CODE_MND_DB_NOT_SELECTED TAOS_DEF_ERROR_CODE(0, 0x0380)
#define TSDB_CODE_MND_DB_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x0381) //
......@@ -359,12 +362,18 @@ int32_t* taosGetErrno();
#define TSDB_CODE_MND_STREAM_TASK_DROPPED TAOS_DEF_ERROR_CODE(0, 0x03F4)
#define TSDB_CODE_MND_MULTI_REPLICA_SOURCE_DB TAOS_DEF_ERROR_CODE(0, 0x03F5)
#define TSDB_CODE_MND_TOO_MANY_STREAMS TAOS_DEF_ERROR_CODE(0, 0x03F6)
#define TSDB_CODE_MND_INVALID_TARGET_TABLE TAOS_DEF_ERROR_CODE(0, 0x03F7)
// mnode-sma
#define TSDB_CODE_MND_SMA_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x0480)
#define TSDB_CODE_MND_SMA_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x0481)
#define TSDB_CODE_MND_INVALID_SMA_OPTION TAOS_DEF_ERROR_CODE(0, 0x0482)
// mnode-tag-indxe
#define TSDB_CODE_MND_TAG_INDEX_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x0483)
#define TSDB_CODE_MND_TAG_INDEX_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x0484)
// dnode
// #define TSDB_CODE_DND_MSG_NOT_PROCESSED TAOS_DEF_ERROR_CODE(0, 0x0400) // 2.x
// #define TSDB_CODE_DND_OUT_OF_MEMORY TAOS_DEF_ERROR_CODE(0, 0x0401) // 2.x
......
......@@ -104,6 +104,7 @@ extern const int32_t TYPE_BYTES[16];
#define TSDB_INDEX_TYPE_SMA "SMA"
#define TSDB_INDEX_TYPE_FULLTEXT "FULLTEXT"
#define TSDB_INDEX_TYPE_NORMAL "NORMAL"
#define TSDB_INS_USER_STABLES_DBNAME_COLID 2
......
TDengine is a high-efficient, scalable, high-available distributed time-series database, which makes a lot of optimizations on inserting and querying data, which is far more efficient than normal regular databases. So TDengine can meet the high requirements of IOT and other areas on storing and querying a large amount of data.
TDengine is an open-source, cloud-native time-series database optimized for Internet of Things (IoT), Connected Cars, and Industrial IoT. With its built-in caching, stream processing, and data subscription capabilities, TDengine offers a simplified solution for time-series data processing.
To configure TDengine : edit /etc/taos/taos.cfg
To start service : launchctl start com.tdengine.taosd
To start Taos Adapter : launchctl start com.tdengine.taosadapter
To access TDengine : use taos in shell
If you're experiencing problems installing TDengine, check the file /var/log/taos/tdengine_install.log to help troubleshoot the installation.
TDengine is an open-source, cloud-native time-series database optimized for Internet of Things (IoT), Connected Cars, and Industrial IoT. With its built-in caching, stream processing, and data subscription capabilities, TDengine offers a simplified solution for time-series data processing.
Once it's installed, please take the steps below:
1: open a terminal/shell in Mac
2: if connecting to Cloud Service, follow the instructions on your cloud service account and configure the environment variable
3: if connecting to another TDengine Service, you can also view help information via "taos --help"
4: execute command taos
If you're experiencing problems installing TDengine, check the file /var/log/taos/tdengine_install.log to help troubleshoot the installation.
......@@ -531,13 +531,13 @@ function install_taosadapter_service() {
}
function install_service_on_launchctl() {
${csudouser}launchctl unload -w /Library/LaunchDaemons/com.taosdata.taosd.plist > /dev/null 2>&1 || :
${csudo}launchctl unload -w /Library/LaunchDaemons/com.taosdata.taosd.plist > /dev/null 2>&1 || :
${csudo}cp ${script_dir}/com.taosdata.taosd.plist /Library/LaunchDaemons/com.taosdata.taosd.plist
${csudouser}launchctl load -w /Library/LaunchDaemons/com.taosdata.taosd.plist > /dev/null 2>&1 || :
${csudo}launchctl load -w /Library/LaunchDaemons/com.taosdata.taosd.plist > /dev/null 2>&1 || :
${csudouser}launchctl unload -w /Library/LaunchDaemons/com.taosdata.taosadapter.plist > /dev/null 2>&1 || :
${csudo}launchctl unload -w /Library/LaunchDaemons/com.taosdata.taosadapter.plist > /dev/null 2>&1 || :
${csudo}cp ${script_dir}/com.taosdata.taosadapter.plist /Library/LaunchDaemons/com.taosdata.taosadapter.plist
${csudouser}launchctl load -w /Library/LaunchDaemons/com.taosdata.taosadapter.plist > /dev/null 2>&1 || :
${csudo}launchctl load -w /Library/LaunchDaemons/com.taosdata.taosadapter.plist > /dev/null 2>&1 || :
}
function install_service() {
......
......@@ -48,7 +48,7 @@ fi
data_link_dir="${install_main_dir}/data"
log_link_dir="${install_main_dir}/log"
install_log_path="${log_dir}/taos_install.log"
install_log_path="${log_dir}/tdengine_install.log"
# static directory
cfg_dir="${install_main_dir}/cfg"
......@@ -573,14 +573,14 @@ function install_service_on_systemd() {
function install_service_on_launchctl() {
if [ -f ${install_main_dir}/service/com.taosdata.taosd.plist ]; then
${csudouser}launchctl unload -w /Library/LaunchDaemons/com.taosdata.taosd.plist > /dev/null 2>&1 || :
${csudo}launchctl unload -w /Library/LaunchDaemons/com.taosdata.taosd.plist > /dev/null 2>&1 || :
${csudo}cp ${install_main_dir}/service/com.taosdata.taosd.plist /Library/LaunchDaemons/com.taosdata.taosd.plist || :
${csudouser}launchctl load -w /Library/LaunchDaemons/com.taosdata.taosd.plist || :
${csudo}launchctl load -w /Library/LaunchDaemons/com.taosdata.taosd.plist || :
fi
if [ -f ${install_main_dir}/service/com.taosdata.taosadapter.plist ]; then
${csudouser}launchctl unload -w /Library/LaunchDaemons/com.taosdata.taosadapter.plist > /dev/null 2>&1 || :
${csudo}launchctl unload -w /Library/LaunchDaemons/com.taosdata.taosadapter.plist > /dev/null 2>&1 || :
${csudo}cp ${install_main_dir}/service/com.taosdata.taosadapter.plist /Library/LaunchDaemons/com.taosdata.taosadapter.plist || :
${csudouser}launchctl load -w /Library/LaunchDaemons/com.taosdata.taosadapter.plist || :
${csudo}launchctl load -w /Library/LaunchDaemons/com.taosdata.taosadapter.plist || :
fi
}
......
......@@ -58,9 +58,9 @@ Source: {#MyAppSourceDir}{#MyAppDriverName}; DestDir: "{app}\driver"; Flags: igN
Source: {#MyAppSourceDir}{#MyAppIncludeName}; DestDir: "{app}\include"; Flags: igNoreversion recursesubdirs createallsubdirs
Source: {#MyAppSourceDir}{#MyAppExeName}; DestDir: "{app}"; Excludes: {#MyAppExcludeSource} ; Flags: igNoreversion recursesubdirs createallsubdirs
Source: {#MyAppSourceDir}{#MyAppTaosdemoExeName}; DestDir: "{app}"; Flags: igNoreversion recursesubdirs createallsubdirs
Source: {#MyAppSourceDir}\taos.exe; DestDir: "{app}"; DestName: "{#CusPrompt}.EXE"; Flags: igNoreversion recursesubdirs createallsubdirs
Source: {#MyAppSourceDir}\taosBenchmark.exe; DestDir: "{app}"; DestName: "{#CusPrompt}Benchmark.EXE"; Flags: igNoreversion recursesubdirs createallsubdirs
Source: {#MyAppSourceDir}\taosdump.exe; DestDir: "{app}"; DestName: "{#CusPrompt}dump.EXE"; Flags: igNoreversion recursesubdirs createallsubdirs
Source: {#MyAppSourceDir}\taos.exe; DestDir: "{app}"; DestName: "{#CusPrompt}.exe"; Flags: igNoreversion recursesubdirs createallsubdirs
Source: {#MyAppSourceDir}\taosBenchmark.exe; DestDir: "{app}"; DestName: "{#CusPrompt}Benchmark.exe"; Flags: igNoreversion recursesubdirs createallsubdirs
Source: {#MyAppSourceDir}\taosdump.exe; DestDir: "{app}"; DestName: "{#CusPrompt}dump.exe"; Flags: igNoreversion recursesubdirs createallsubdirs
[run]
......
......@@ -151,6 +151,10 @@ typedef struct STscObj {
SHashObj* pRequests;
} STscObj;
typedef struct STscDbg {
bool memEnable;
} STscDbg;
typedef struct SResultColumn {
union {
char* nullbitmap; // bitmap, one bit for each item in the list
......
......@@ -33,6 +33,7 @@
#define TSC_VAR_NOT_RELEASE 1
#define TSC_VAR_RELEASED 0
STscDbg tscDbg = {0};
SAppInfo appInfo;
int64_t lastClusterId = 0;
int32_t clientReqRefPool = -1;
......@@ -515,6 +516,18 @@ void tscWriteCrashInfo(int signum, void *sigInfo, void *context) {
}
void taos_init_imp(void) {
#if defined(LINUX)
if (tscDbg.memEnable) {
int32_t code = taosMemoryDbgInit();
if (code) {
printf("failed to init memory dbg, error:%s\n", tstrerror(code));
} else {
tsAsyncLog = false;
printf("memory dbg enabled\n");
}
}
#endif
// In the APIs of other program language, taos_cleanup is not available yet.
// So, to make sure taos_cleanup will be invoked to clean up the allocated resource to suppress the valgrind warning.
atexit(taos_cleanup);
......
......@@ -2346,6 +2346,8 @@ TAOS_RES* taosQueryImpl(TAOS* taos, const char* sql, bool validateOnly) {
return NULL;
}
tscDebug("taos_query start with sql:%s", sql);
SSyncQueryParam* param = taosMemoryCalloc(1, sizeof(SSyncQueryParam));
tsem_init(&param->sem, 0, 0);
......@@ -2360,6 +2362,8 @@ TAOS_RES* taosQueryImpl(TAOS* taos, const char* sql, bool validateOnly) {
taosMemoryFree(param);
}
tscDebug("taos_query end with sql:%s", sql);
return pRequest;
}
......
......@@ -884,7 +884,9 @@ void continueInsertFromCsv(SSqlCallbackWrapper *pWrapper, SRequestObj *pRequest)
void taos_query_a(TAOS *taos, const char *sql, __taos_async_fn_t fp, void *param) {
int64_t connId = *(int64_t *)taos;
tscDebug("taos_query_a start with sql:%s", sql);
taosAsyncQueryImpl(connId, sql, fp, param, false);
tscDebug("taos_query_a end with sql:%s", sql);
}
void taos_query_a_with_reqid(TAOS *taos, const char *sql, __taos_async_fn_t fp, void *param, int64_t reqid) {
......
......@@ -25,7 +25,9 @@
#include "tref.h"
#include "ttimer.h"
static tb_uid_t processSuid(tb_uid_t suid, char* db) { return suid + MurmurHash3_32(db, strlen(db)); }
static tb_uid_t processSuid(tb_uid_t suid, char* db){
return suid + MurmurHash3_32(db, strlen(db));
}
static char* buildCreateTableJson(SSchemaWrapper* schemaRow, SSchemaWrapper* schemaTag, char* name, int64_t id,
int8_t t) {
......@@ -1642,6 +1644,7 @@ static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen)
}
if (strcmp(tbName, pCreateReq.name) == 0) {
cloneSVreateTbReq(&pCreateReq, &pCreateReqDst);
pCreateReqDst->ctb.suid = processSuid(pCreateReqDst->ctb.suid, pRequest->pDb);
tDecoderClear(&decoderTmp);
break;
}
......
......@@ -1273,7 +1273,7 @@ static int32_t smlInsertData(SSmlHandle *info) {
}
static void smlPrintStatisticInfo(SSmlHandle *info) {
uError(
uDebug(
"SML:0x%" PRIx64
" smlInsertLines result, code:%d,lineNum:%d,stable num:%d,ctable num:%d,create stable num:%d,alter stable tag num:%d,alter stable col num:%d \
parse cost:%" PRId64 ",schema cost:%" PRId64 ",bind cost:%" PRId64 ",rpc cost:%" PRId64 ",total cost:%" PRId64
......
......@@ -418,7 +418,7 @@ int32_t tmqCommitDone(SMqCommitCbParamSet* pParamSet) {
static void tmqCommitRspCountDown(SMqCommitCbParamSet* pParamSet) {
int32_t waitingRspNum = atomic_sub_fetch_32(&pParamSet->waitingRspNum, 1);
if(ASSERT(waitingRspNum >= 0)){
if (ASSERT(waitingRspNum >= 0)) {
tscError("tmqCommitRspCountDown error:%d", waitingRspNum);
return;
}
......
......@@ -120,6 +120,8 @@ static const SSysDbTableSchema userIdxSchema[] = {
{.name = "table_name", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
{.name = "vgroup_id", .bytes = 4, .type = TSDB_DATA_TYPE_INT, .sysInfo = true},
{.name = "create_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP, .sysInfo = false},
{.name = "column_name", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
{.name = "index_type", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
};
static const SSysDbTableSchema userStbsSchema[] = {
......
......@@ -918,6 +918,65 @@ int32_t tDeserializeSMDropSmaReq(void *buf, int32_t bufLen, SMDropSmaReq *pReq)
tDecoderClear(&decoder);
return 0;
}
int32_t tSerializeSCreateTagIdxReq(void *buf, int32_t bufLen, SCreateTagIndexReq *pReq) {
SEncoder encoder = {0};
tEncoderInit(&encoder, buf, bufLen);
if (tStartEncode(&encoder) < 0) return -1;
if (tEncodeCStr(&encoder, pReq->dbFName) < 0) return -1;
if (tEncodeCStr(&encoder, pReq->stbName) < 0) return -1;
if (tEncodeCStr(&encoder, pReq->colName) < 0) return -1;
if (tEncodeCStr(&encoder, pReq->idxName) < 0) return -1;
if (tEncodeI8(&encoder, pReq->idxType) < 0) return -1;
tEndEncode(&encoder);
int32_t tlen = encoder.pos;
tEncoderClear(&encoder);
return tlen;
}
int32_t tDeserializeSCreateTagIdxReq(void *buf, int32_t bufLen, SCreateTagIndexReq *pReq) {
SDecoder decoder = {0};
tDecoderInit(&decoder, buf, bufLen);
if (tStartDecode(&decoder) < 0) return -1;
if (tDecodeCStrTo(&decoder, pReq->dbFName) < 0) return -1;
if (tDecodeCStrTo(&decoder, pReq->stbName) < 0) return -1;
if (tDecodeCStrTo(&decoder, pReq->colName) < 0) return -1;
if (tDecodeCStrTo(&decoder, pReq->idxName) < 0) return -1;
if (tDecodeI8(&decoder, &pReq->idxType) < 0) return -1;
tEndDecode(&decoder);
tDecoderClear(&decoder);
return 0;
}
int32_t tSerializeSDropTagIdxReq(void *buf, int32_t bufLen, SDropTagIndexReq *pReq) {
SEncoder encoder = {0};
tEncoderInit(&encoder, buf, bufLen);
if (tStartEncode(&encoder) < 0) return -1;
tEndEncode(&encoder);
if (tEncodeCStr(&encoder, pReq->name) < 0) return -1;
if (tEncodeI8(&encoder, pReq->igNotExists) < 0) return -1;
int32_t tlen = encoder.pos;
tEncoderClear(&encoder);
return tlen;
}
int32_t tDeserializeSDropTagIdxReq(void *buf, int32_t bufLen, SDropTagIndexReq *pReq) {
SDecoder decoder = {0};
tDecoderInit(&decoder, buf, bufLen);
if (tStartDecode(&decoder) < 0) return -1;
if (tDecodeCStrTo(&decoder, pReq->name) < 0) return -1;
if (tDecodeI8(&decoder, &pReq->igNotExists) < 0) return -1;
tEndDecode(&decoder);
tDecoderClear(&decoder);
return 0;
}
int32_t tSerializeSMCreateFullTextReq(void *buf, int32_t bufLen, SMCreateFullTextReq *pReq) {
SEncoder encoder = {0};
tEncoderInit(&encoder, buf, bufLen);
......@@ -3984,6 +4043,44 @@ int32_t tDeserializeSDropVnodeReq(void *buf, int32_t bufLen, SDropVnodeReq *pReq
tDecoderClear(&decoder);
return 0;
}
int32_t tSerializeSDropIdxReq(void *buf, int32_t bufLen, SDropIndexReq *pReq) {
SEncoder encoder = {0};
tEncoderInit(&encoder, buf, bufLen);
if (tStartEncode(&encoder) < 0) return -1;
if (tEncodeCStr(&encoder, pReq->colName) < 0) return -1;
if (tEncodeCStr(&encoder, pReq->stb) < 0) return -1;
if (tEncodeI64(&encoder, pReq->stbUid) < 0) return -1;
if (tEncodeI64(&encoder, pReq->dbUid) < 0) return -1;
for (int32_t i = 0; i < 8; ++i) {
if (tEncodeI64(&encoder, pReq->reserved[i]) < 0) return -1;
}
tEndEncode(&encoder);
int32_t tlen = encoder.pos;
tEncoderClear(&encoder);
return tlen;
// TODO
return 0;
}
int32_t tDeserializeSDropIdxReq(void *buf, int32_t bufLen, SDropIndexReq *pReq) {
SDecoder decoder = {0};
tDecoderInit(&decoder, buf, bufLen);
if (tStartDecode(&decoder) < 0) return -1;
if (tDecodeCStrTo(&decoder, pReq->colName) < 0) return -1;
if (tDecodeCStrTo(&decoder, pReq->stb) < 0) return -1;
if (tDecodeI64(&decoder, &pReq->stbUid) < 0) return -1;
if (tDecodeI64(&decoder, &pReq->dbUid) < 0) return -1;
for (int32_t i = 0; i < 8; ++i) {
if (tDecodeI64(&decoder, &pReq->reserved[i]) < 0) return -1;
}
tEndDecode(&decoder);
tDecoderClear(&decoder);
// TODO
return 0;
}
int32_t tSerializeSCompactVnodeReq(void *buf, int32_t bufLen, SCompactVnodeReq *pReq) {
SEncoder encoder = {0};
......@@ -6995,9 +7092,13 @@ void tDestroySSubmitTbData(SSubmitTbData *pTbData, int32_t flag) {
return;
}
if (flag == TSDB_MSG_FLG_ENCODE) {
if (flag == TSDB_MSG_FLG_ENCODE || flag == TSDB_MSG_FLG_CMPT) {
if (pTbData->pCreateTbReq) {
if (flag == TSDB_MSG_FLG_ENCODE) {
tdDestroySVCreateTbReq(pTbData->pCreateTbReq);
} else {
tDestroySVCreateTbReq(pTbData->pCreateTbReq, TSDB_MSG_FLG_DECODE);
}
taosMemoryFree(pTbData->pCreateTbReq);
}
......
......@@ -29,6 +29,7 @@
#define DM_MACHINE_CODE "Get machine code."
#define DM_VERSION "Print program version."
#define DM_EMAIL "<support@taosdata.com>"
#define DM_MEM_DBG "Enable memory debug"
// clang-format on
static struct {
#ifdef WINDOWS
......@@ -37,6 +38,7 @@ static struct {
bool dumpConfig;
bool dumpSdb;
bool generateGrant;
bool memDbg;
bool printAuth;
bool printVersion;
bool printHelp;
......@@ -166,8 +168,10 @@ static int32_t dmParseArgs(int32_t argc, char const *argv[]) {
} else if (strcmp(argv[i], "-e") == 0) {
global.envCmd[cmdEnvIndex] = argv[++i];
cmdEnvIndex++;
} else if (strcmp(argv[i], "-dm") == 0) {
global.memDbg = true;
} else if (strcmp(argv[i], "-h") == 0 || strcmp(argv[i], "--help") == 0 || strcmp(argv[i], "--usage") == 0 ||
strcmp(argv[i], "-?")) {
strcmp(argv[i], "-?") == 0) {
global.printHelp = true;
} else {
}
......@@ -212,6 +216,7 @@ static void dmPrintHelp() {
printf("%s%s%s%s\n", indent, "-e,", indent, DM_ENV_CMD);
printf("%s%s%s%s\n", indent, "-E,", indent, DM_ENV_FILE);
printf("%s%s%s%s\n", indent, "-k,", indent, DM_MACHINE_CODE);
printf("%s%s%s%s\n", indent, "-dm,", indent, DM_MEM_DBG);
printf("%s%s%s%s\n", indent, "-V,", indent, DM_VERSION);
printf("\n\nReport bugs to %s.\n", DM_EMAIL);
......@@ -272,6 +277,18 @@ int mainWindows(int argc, char **argv) {
return 0;
}
#if defined(LINUX)
if (global.memDbg) {
int32_t code = taosMemoryDbgInit();
if (code) {
printf("failed to init memory dbg, error:%s\n", tstrerror(code));
return code;
}
tsAsyncLog = false;
printf("memory dbg enabled\n");
}
#endif
if (dmInitLog() != 0) {
printf("failed to start since init log error\n");
taosCleanupArgs();
......
......@@ -157,6 +157,8 @@ SArray *mmGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_MND_AUTH, mmPutMsgToReadQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_SHOW_VARIABLES, mmPutMsgToReadQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_SERVER_VERSION, mmPutMsgToReadQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_CREATE_INDEX, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_DROP_INDEX, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_SCH_QUERY, mmPutMsgToQueryQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_SCH_MERGE_QUERY, mmPutMsgToQueryQueue, 1) == NULL) goto _OVER;
......@@ -182,6 +184,8 @@ SArray *mmGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_CONFIRM_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_HASHRANGE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_COMPACT_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_CREATE_INDEX_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_INDEX_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_DISABLE_WRITE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_SYNC_TIMEOUT, mmPutMsgToSyncQueue, 1) == NULL) goto _OVER;
......
......@@ -522,6 +522,8 @@ SArray *vmGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_VND_BATCH_DEL, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_COMMIT, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_SCH_QUERY_HEARTBEAT, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_CREATE_INDEX, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_INDEX, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DROP, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DEPLOY, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
......
......@@ -389,6 +389,18 @@ typedef struct {
SSchemaWrapper schemaTag; // for dstVgroup
} SSmaObj;
typedef struct {
char name[TSDB_TABLE_FNAME_LEN];
char stb[TSDB_TABLE_FNAME_LEN];
char db[TSDB_DB_FNAME_LEN];
char dstTbName[TSDB_TABLE_FNAME_LEN];
char colName[TSDB_COL_NAME_LEN];
int64_t createdTime;
int64_t uid;
int64_t stbUid;
int64_t dbUid;
} SIdxObj;
typedef struct {
char name[TSDB_TABLE_FNAME_LEN];
char db[TSDB_DB_FNAME_LEN];
......
#ifndef _TD_MND_IDX_H_
#define _TD_MND_IDX_H_
#include "mndInt.h"
#ifdef __cplusplus
extern "C" {
#endif
int32_t mndInitIdx(SMnode *pMnode);
void mndCleanupIdx(SMnode *pMnode);
SIdxObj *mndAcquireIdx(SMnode *pMnode, char *Name);
void mndReleaseIdx(SMnode *pMnode, SIdxObj *pSma);
int32_t mndDropIdxsByStb(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStbObj *pStb);
int32_t mndDropIdxsByDb(SMnode *pMnode, STrans *pTrans, SDbObj *pDb);
int32_t mndGetIdxsByTagName(SMnode *pMnode, SStbObj *pStb, char *tagName, SIdxObj *pIdx);
int32_t mndGetTableIdx(SMnode *pMnode, char *tbFName, STableIndexRsp *rsp, bool *exist);
int32_t mndRetrieveTagIdx(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
int32_t mndProcessDropTagIdxReq(SRpcMsg *pReq);
int32_t mndSetCreateIdxRedoLogs(SMnode *pMnode, STrans *pTrans, SIdxObj *pIdx);
int32_t mndSetCreateIdxCommitLogs(SMnode *pMnode, STrans *pTrans, SIdxObj *pIdx);
int32_t mndSetDropIdxRedoLogs(SMnode *pMnode, STrans *pTrans, SIdxObj *pIdx);
int32_t mndSetDropIdxCommitLogs(SMnode *pMnode, STrans *pTrans, SIdxObj *pIdx);
int32_t mndSetAlterIdxRedoLogs(SMnode *pMnode, STrans *pTrans, SIdxObj *pIdx);
int32_t mndSetAlterIdxCommitLogs(SMnode *pMnode, STrans *pTrans, SIdxObj *pIdx);
#ifdef __cplusplus
}
#endif
#endif /*_TD_MND_IDX_H_*/
\ No newline at end of file
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_MND_IDX_COMM_H_
#define _TD_MND_IDX_COMM_H_
#include "mndInt.h"
#ifdef __cplusplus
extern "C" {
#endif
typedef struct SSIdx {
int type; // sma or idx
void *pIdx;
} SSIdx;
// retrieve sma index and tag index
typedef struct {
void *pSmaIter;
void *pIdxIter;
} SSmaAndTagIter;
int32_t mndAcquireGlobalIdx(SMnode *pMnode, char *name, int type, SSIdx *idx);
#ifdef __cplusplus
}
#endif
#endif /*_TD_MND_IDX_COMM_H_*/
\ No newline at end of file
......@@ -42,6 +42,11 @@ void mndExtractTbNameFromStbFullName(const char *stbFullName, char *dst, int32_t
const char *mndGetStbStr(const char *src);
int32_t mndAllocStbSchemas(const SStbObj *pOld, SStbObj *pNew);
int32_t mndCheckColAndTagModifiable(SMnode *pMnode, const char *stbFullName, int64_t suid, col_id_t colId);
void *mndBuildVCreateStbReq(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pStb, int32_t *pContLen, void *alterOriData,
int32_t alterOriDataLen);
#ifdef __cplusplus
}
#endif
......
......@@ -17,6 +17,7 @@
#include "mndDb.h"
#include "mndCluster.h"
#include "mndDnode.h"
#include "mndIndex.h"
#include "mndPrivilege.h"
#include "mndShow.h"
#include "mndSma.h"
......@@ -1053,6 +1054,7 @@ static int32_t mndDropDb(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb) {
/*if (mndDropTopicByDB(pMnode, pTrans, pDb) != 0) goto _OVER;*/
if (mndDropStreamByDb(pMnode, pTrans, pDb) != 0) goto _OVER;
if (mndDropSmasByDb(pMnode, pTrans, pDb) != 0) goto _OVER;
if (mndDropIdxsByDb(pMnode, pTrans, pDb) != 0) goto _OVER;
if (mndSetDropDbRedoActions(pMnode, pTrans, pDb) != 0) goto _OVER;
if (mndUserRemoveDb(pMnode, pTrans, pDb->name) != 0) goto _OVER;
......
此差异已折叠。
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "mndIndex.h"
#include "mndIndexComm.h"
#include "mndSma.h"
static void *mndGetIdx(SMnode *pMnode, char *name, int type) {
SSdb *pSdb = pMnode->pSdb;
void *pIdx = sdbAcquire(pSdb, type, name);
if (pIdx == NULL && terrno == TSDB_CODE_SDB_OBJ_NOT_THERE) {
terrno = 0;
}
return pIdx;
}
int mndAcquireGlobalIdx(SMnode *pMnode, char *name, int type, SSIdx *idx) {
SSmaObj *pSma = mndGetIdx(pMnode, name, SDB_SMA);
SIdxObj *pIdx = mndGetIdx(pMnode, name, SDB_IDX);
terrno = 0;
if (pSma == NULL && pIdx == NULL) return 0;
if (pSma != NULL) {
if (type == SDB_SMA) {
idx->type = SDB_SMA;
idx->pIdx = pSma;
} else {
mndReleaseSma(pMnode, pSma);
terrno = TSDB_CODE_MND_SMA_ALREADY_EXIST;
return -1;
}
} else {
if (type == SDB_IDX) {
idx->type = SDB_IDX;
idx->pIdx = pIdx;
} else {
mndReleaseIdx(pMnode, pIdx);
terrno = TSDB_CODE_MND_TAG_INDEX_ALREADY_EXIST;
return -1;
}
}
return 0;
}
......@@ -21,6 +21,7 @@
#include "mndDnode.h"
#include "mndFunc.h"
#include "mndGrant.h"
#include "mndIndex.h"
#include "mndInfoSchema.h"
#include "mndMnode.h"
#include "mndPerfSchema.h"
......@@ -425,6 +426,7 @@ static int32_t mndInitSteps(SMnode *pMnode) {
if (mndAllocStep(pMnode, "mnode-vgroup", mndInitVgroup, mndCleanupVgroup) != 0) return -1;
if (mndAllocStep(pMnode, "mnode-stb", mndInitStb, mndCleanupStb) != 0) return -1;
if (mndAllocStep(pMnode, "mnode-sma", mndInitSma, mndCleanupSma) != 0) return -1;
if (mndAllocStep(pMnode, "mnode-idx", mndInitIdx, mndCleanupIdx) != 0) return -1;
if (mndAllocStep(pMnode, "mnode-infos", mndInitInfos, mndCleanupInfos) != 0) return -1;
if (mndAllocStep(pMnode, "mnode-perfs", mndInitPerfs, mndCleanupPerfs) != 0) return -1;
if (mndAllocStep(pMnode, "mnode-db", mndInitDb, mndCleanupDb) != 0) return -1;
......
......@@ -17,6 +17,8 @@
#include "mndSma.h"
#include "mndDb.h"
#include "mndDnode.h"
#include "mndIndex.h"
#include "mndIndexComm.h"
#include "mndInfoSchema.h"
#include "mndMnode.h"
#include "mndPrivilege.h"
......@@ -43,9 +45,13 @@ static int32_t mndProcessDropSmaReq(SRpcMsg *pReq);
static int32_t mndProcessGetSmaReq(SRpcMsg *pReq);
static int32_t mndProcessGetTbSmaReq(SRpcMsg *pReq);
static int32_t mndRetrieveSma(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
static void mndCancelGetNextSma(SMnode *pMnode, void *pIter);
static void mndDestroySmaObj(SSmaObj *pSmaObj);
// sma and tag index comm func
static int32_t mndProcessDropIdxReq(SRpcMsg *pReq);
static int32_t mndRetrieveIdx(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
static void mndCancelRetrieveIdx(SMnode *pMnode, void *pIter);
int32_t mndInitSma(SMnode *pMnode) {
SSdbTable table = {
.sdbType = SDB_SMA,
......@@ -58,14 +64,14 @@ int32_t mndInitSma(SMnode *pMnode) {
};
mndSetMsgHandle(pMnode, TDMT_MND_CREATE_SMA, mndProcessCreateSmaReq);
mndSetMsgHandle(pMnode, TDMT_MND_DROP_SMA, mndProcessDropSmaReq);
mndSetMsgHandle(pMnode, TDMT_MND_DROP_SMA, mndProcessDropIdxReq);
mndSetMsgHandle(pMnode, TDMT_VND_CREATE_SMA_RSP, mndTransProcessRsp);
mndSetMsgHandle(pMnode, TDMT_VND_DROP_SMA_RSP, mndTransProcessRsp);
mndSetMsgHandle(pMnode, TDMT_MND_GET_INDEX, mndProcessGetSmaReq);
mndSetMsgHandle(pMnode, TDMT_MND_GET_TABLE_INDEX, mndProcessGetTbSmaReq);
mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_INDEX, mndRetrieveSma);
mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_INDEX, mndCancelGetNextSma);
mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_INDEX, mndRetrieveIdx);
mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_INDEX, mndCancelRetrieveIdx);
return sdbSetTable(pMnode->pSdb, table);
}
......@@ -712,8 +718,13 @@ static int32_t mndProcessCreateSmaReq(SRpcMsg *pReq) {
terrno = TSDB_CODE_MND_STREAM_ALREADY_EXIST;
goto _OVER;
}
SSIdx idx = {0};
if (mndAcquireGlobalIdx(pMnode, createReq.name, SDB_SMA, &idx) == 0) {
pSma = idx.pIdx;
} else {
goto _OVER;
}
pSma = mndAcquireSma(pMnode, createReq.name);
if (pSma != NULL) {
if (createReq.igExists) {
mInfo("sma:%s, already exist in sma:%s, ignore exist is set", createReq.name, pSma->name);
......@@ -959,7 +970,12 @@ static int32_t mndProcessDropSmaReq(SRpcMsg *pReq) {
mInfo("sma:%s, start to drop", dropReq.name);
pSma = mndAcquireSma(pMnode, dropReq.name);
SSIdx idx = {0};
if (mndAcquireGlobalIdx(pMnode, dropReq.name, SDB_SMA, &idx) == 0) {
pSma = idx.pIdx;
} else {
goto _OVER;
}
if (pSma == NULL) {
if (dropReq.igNotExists) {
mInfo("sma:%s, not exist, ignore not exist is set", dropReq.name);
......@@ -998,7 +1014,14 @@ static int32_t mndGetSma(SMnode *pMnode, SUserIndexReq *indexReq, SUserIndexRsp
int32_t code = -1;
SSmaObj *pSma = NULL;
pSma = mndAcquireSma(pMnode, indexReq->indexFName);
SSIdx idx = {0};
if (0 == mndAcquireGlobalIdx(pMnode, indexReq->indexFName, SDB_SMA, &idx)) {
pSma = idx.pIdx;
} else {
*exist = false;
return 0;
}
if (pSma == NULL) {
*exist = false;
return 0;
......@@ -1207,10 +1230,10 @@ static int32_t mndRetrieveSma(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBloc
pDb = mndAcquireDb(pMnode, pShow->db);
if (pDb == NULL) return 0;
}
SSmaAndTagIter *pIter = pShow->pIter;
while (numOfRows < rows) {
pShow->pIter = sdbFetch(pSdb, SDB_SMA, pShow->pIter, (void **)&pSma);
if (pShow->pIter == NULL) break;
pIter->pSmaIter = sdbFetch(pSdb, SDB_SMA, pIter->pSmaIter, (void **)&pSma);
if (pIter->pSmaIter == NULL) break;
if (NULL != pDb && pSma->dbUid != pDb->uid) {
sdbRelease(pSdb, pSma);
......@@ -1247,6 +1270,18 @@ static int32_t mndRetrieveSma(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBloc
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)&pSma->createdTime, false);
char col[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
STR_TO_VARSTR(col, (char *)"");
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)col, false);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
char tag[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
STR_TO_VARSTR(tag, (char *)"sma_index");
colDataAppend(pColInfo, numOfRows, (const char *)tag, false);
numOfRows++;
sdbRelease(pSdb, pSma);
}
......@@ -1256,7 +1291,30 @@ static int32_t mndRetrieveSma(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBloc
return numOfRows;
}
static void mndCancelGetNextSma(SMnode *pMnode, void *pIter) {
// sma and tag index comm func
static int32_t mndProcessDropIdxReq(SRpcMsg *pReq) {
int ret = mndProcessDropSmaReq(pReq);
if (terrno == TSDB_CODE_MND_TAG_INDEX_ALREADY_EXIST) {
terrno = 0;
ret = mndProcessDropTagIdxReq(pReq);
}
return ret;
}
static int32_t mndRetrieveIdx(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
if (pShow->pIter == NULL) {
pShow->pIter = taosMemoryCalloc(1, sizeof(SSmaAndTagIter));
}
int32_t read = mndRetrieveSma(pReq, pShow, pBlock, rows);
if (read < rows) read += mndRetrieveTagIdx(pReq, pShow, pBlock, rows - read);
return read;
}
static void mndCancelRetrieveIdx(SMnode *pMnode, void *pIter) {
SSmaAndTagIter *p = pIter;
if (p != NULL) {
SSdb *pSdb = pMnode->pSdb;
sdbCancelFetch(pSdb, pIter);
sdbCancelFetch(pSdb, p->pSmaIter);
sdbCancelFetch(pSdb, p->pIdxIter);
}
taosMemoryFree(p);
}
......@@ -31,7 +31,7 @@
#define MND_STREAM_VER_NUMBER 2
#define MND_STREAM_RESERVE_SIZE 64
#define MND_STREAM_MAX_NUM 10
#define MND_STREAM_MAX_NUM 60
static int32_t mndStreamActionInsert(SSdb *pSdb, SStreamObj *pStream);
static int32_t mndStreamActionDelete(SSdb *pSdb, SStreamObj *pStream);
......@@ -680,7 +680,7 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
pIter = sdbFetch(pMnode->pSdb, SDB_STREAM, pIter, (void **)&pStream);
if (pIter == NULL) {
if (numOfStream > MND_STREAM_MAX_NUM) {
mError("too many streams, no more than 10 for each database");
mError("too many streams, no more than %d for each database", MND_STREAM_MAX_NUM);
terrno = TSDB_CODE_MND_TOO_MANY_STREAMS;
goto _OVER;
}
......@@ -692,10 +692,16 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
}
sdbRelease(pMnode->pSdb, pStream);
if (numOfStream > MND_STREAM_MAX_NUM) {
mError("too many streams, no more than 10 for each database");
mError("too many streams, no more than %d for each database", MND_STREAM_MAX_NUM);
terrno = TSDB_CODE_MND_TOO_MANY_STREAMS;
goto _OVER;
}
if (pStream->targetStbUid == streamObj.targetStbUid) {
mError("Cannot write the same stable as other stream:%s", pStream->name);
terrno = TSDB_CODE_MND_INVALID_TARGET_TABLE;
goto _OVER;
}
}
}
......
......@@ -1008,6 +1008,12 @@ static void mndTransSendRpcRsp(SMnode *pMnode, STrans *pTrans) {
if (0 == mndBuildSMCreateStbRsp(pMnode, pTrans->dbname, pTrans->stbname, &pCont, &contLen) != 0) {
mndTransSetRpcRsp(pTrans, pCont, contLen);
}
} else if (pTrans->originRpcType == TDMT_MND_CREATE_INDEX) {
void *pCont = NULL;
int32_t contLen = 0;
if (0 == mndBuildSMCreateStbRsp(pMnode, pTrans->dbname, pTrans->stbname, &pCont, &contLen) != 0) {
mndTransSetRpcRsp(pTrans, pCont, contLen);
}
}
if (pTrans->rpcRspLen != 0) {
......
......@@ -147,7 +147,8 @@ typedef enum {
SDB_STB = 18,
SDB_DB = 19,
SDB_FUNC = 20,
SDB_MAX = 21
SDB_IDX = 21,
SDB_MAX = 22
} ESdbType;
typedef struct SSdbRaw {
......
......@@ -60,6 +60,8 @@ const char *sdbTableName(ESdbType type) {
return "db";
case SDB_FUNC:
return "func";
case SDB_IDX:
return "idx";
default:
return "undefine";
}
......
......@@ -59,7 +59,7 @@ void vnodePostClose(SVnode *pVnode);
void vnodeSyncCheckTimeout(SVnode *pVnode);
void vnodeClose(SVnode *pVnode);
int32_t vnodeSyncCommit(SVnode *pVnode);
int32_t vnodeBegin(SVnode* pVnode);
int32_t vnodeBegin(SVnode *pVnode);
int32_t vnodeStart(SVnode *pVnode);
void vnodeStop(SVnode *pVnode);
......@@ -137,6 +137,7 @@ typedef struct SMetaFltParam {
int16_t type;
void *val;
bool reverse;
bool equal;
int (*filterFunc)(void *a, void *b, int16_t type);
} SMetaFltParam;
......@@ -270,8 +271,8 @@ int32_t tqReaderSetSubmitReq2(STqReader *pReader, void *msgStr, int32_t msgLen,
// int32_t tqReaderSetDataMsg(STqReader *pReader, const SSubmitReq *pMsg, int64_t ver);
bool tqNextDataBlock2(STqReader *pReader);
bool tqNextDataBlockFilterOut2(STqReader *pReader, SHashObj *filterOutUids);
int32_t tqRetrieveDataBlock2(SSDataBlock *pBlock, STqReader *pReader, SSubmitTbData** pSubmitTbDataRet);
int32_t tqRetrieveTaosxBlock2(STqReader *pReader, SArray *blocks, SArray *schemas, SSubmitTbData** pSubmitTbDataRet);
int32_t tqRetrieveDataBlock2(SSDataBlock *pBlock, STqReader *pReader, SSubmitTbData **pSubmitTbDataRet);
int32_t tqRetrieveTaosxBlock2(STqReader *pReader, SArray *blocks, SArray *schemas, SSubmitTbData **pSubmitTbDataRet);
// int32_t tqRetrieveDataBlock(SSDataBlock *pBlock, STqReader *pReader);
// int32_t tqRetrieveTaosxBlock(STqReader *pReader, SArray *blocks, SArray *schemas);
......
......@@ -142,6 +142,9 @@ int32_t metaGetTbTSchemaEx(SMeta* pMeta, tb_uid_t suid, tb_uid_t uid, in
int metaGetTableEntryByName(SMetaReader* pReader, const char* name);
int metaAlterCache(SMeta* pMeta, int32_t nPage);
int metaAddIndexToSTable(SMeta* pMeta, int64_t version, SVCreateStbReq* pReq);
int metaDropIndexFromSTable(SMeta* pMeta, int64_t version, SDropIndexReq* pReq);
int64_t metaGetTimeSeriesNum(SMeta* pMeta);
SMCtbCursor* metaOpenCtbCursor(SMeta* pMeta, tb_uid_t uid, int lock);
void metaCloseCtbCursor(SMCtbCursor* pCtbCur, int lock);
......
......@@ -1108,26 +1108,30 @@ int32_t metaFilterCreateTime(SMeta *pMeta, SMetaFltParam *param, SArray *pUids)
}
int32_t valid = 0;
while (1) {
int32_t count = 0;
static const int8_t TRY_ERROR_LIMIT = 1;
do {
void *entryKey = NULL;
int32_t nEntryKey = -1;
valid = tdbTbcGet(pCursor->pCur, (const void **)&entryKey, &nEntryKey, NULL, NULL);
if (valid < 0) break;
SCtimeIdxKey *p = entryKey;
if (count > TRY_ERROR_LIMIT) break;
int32_t cmp = (*param->filterFunc)((void *)&p->ctime, (void *)&pCtimeKey->ctime, param->type);
if (cmp == 0) taosArrayPush(pUids, &p->uid);
if (param->reverse == false) {
if (cmp == -1) break;
} else if (param->reverse) {
if (cmp == 1) break;
if (cmp == 0)
taosArrayPush(pUids, &p->uid);
else {
if (param->equal == true) {
if (count > TRY_ERROR_LIMIT) break;
count++;
}
}
valid = param->reverse ? tdbTbcMoveToPrev(pCursor->pCur) : tdbTbcMoveToNext(pCursor->pCur);
if (valid < 0) break;
}
} while (1);
END:
if (pCursor->pMeta) metaULock(pCursor->pMeta);
......@@ -1162,29 +1166,34 @@ int32_t metaFilterTableName(SMeta *pMeta, SMetaFltParam *param, SArray *pUids) {
if (tdbTbcMoveTo(pCursor->pCur, pName, strlen(pName) + 1, &cmp) < 0) {
goto END;
}
bool first = true;
int32_t valid = 0;
while (1) {
int32_t count = 0;
int32_t TRY_ERROR_LIMIT = 1;
do {
void *pEntryKey = NULL, *pEntryVal = NULL;
int32_t nEntryKey = -1, nEntryVal = 0;
valid = tdbTbcGet(pCursor->pCur, (const void **)pEntryKey, &nEntryKey, (const void **)&pEntryVal, &nEntryVal);
if (valid < 0) break;
if (count > TRY_ERROR_LIMIT) break;
char *pTableKey = (char *)pEntryKey;
cmp = (*param->filterFunc)(pTableKey, pName, pCursor->type);
if (cmp == 0) {
tb_uid_t tuid = *(tb_uid_t *)pEntryVal;
taosArrayPush(pUids, &tuid);
} else if (cmp == 1) {
// next
} else {
break;
if (param->equal == true) {
if (count > TRY_ERROR_LIMIT) break;
count++;
}
}
valid = param->reverse ? tdbTbcMoveToPrev(pCursor->pCur) : tdbTbcMoveToNext(pCursor->pCur);
if (valid < 0) {
break;
}
}
} while (1);
END:
if (pCursor->pMeta) metaULock(pCursor->pMeta);
......@@ -1240,7 +1249,7 @@ int32_t metaFilterTableIds(SMeta *pMeta, SMetaFltParam *param, SArray *pUids) {
pCursor->type = param->type;
metaRLock(pMeta);
ret = tdbTbcOpen(pMeta->pCtimeIdx, &pCursor->pCur, NULL);
ret = tdbTbcOpen(pMeta->pTagIdx, &pCursor->pCur, NULL);
if (ret < 0) {
goto END;
}
......@@ -1284,32 +1293,40 @@ int32_t metaFilterTableIds(SMeta *pMeta, SMetaFltParam *param, SArray *pUids) {
goto END;
}
bool first = true;
int count = 0;
int32_t valid = 0;
while (1) {
bool found = false;
static const int8_t TRY_ERROR_LIMIT = 1;
/// src: [[suid, cid1, type1]....[suid, cid2, type2]....[suid, cid3, type3]...]
/// target: [suid, cid2, type2]
do {
void *entryKey = NULL, *entryVal = NULL;
int32_t nEntryKey, nEntryVal;
valid = tdbTbcGet(pCursor->pCur, (const void **)&entryKey, &nEntryKey, (const void **)&entryVal, &nEntryVal);
if (valid < 0) {
tdbFree(entryVal);
break;
}
if (count > TRY_ERROR_LIMIT) {
break;
}
STagIdxKey *p = entryKey;
if (p == NULL) break;
if (p->type != pCursor->type) {
if (first) {
if (p->type != pCursor->type || p->suid != pCursor->suid || p->cid != pCursor->cid) {
if (found == true) break;
count++;
valid = param->reverse ? tdbTbcMoveToPrev(pCursor->pCur) : tdbTbcMoveToNext(pCursor->pCur);
if (valid < 0) break;
continue;
} else {
if (valid < 0) {
break;
} else {
continue;
}
}
if (p->suid != pKey->suid) {
break;
}
first = false;
int32_t cmp = (*param->filterFunc)(p->data, pKey->data, pKey->type);
if (cmp == 0) {
// match
......@@ -1320,17 +1337,18 @@ int32_t metaFilterTableIds(SMeta *pMeta, SMetaFltParam *param, SArray *pUids) {
tuid = *(tb_uid_t *)(p->data + tDataTypes[pCursor->type].bytes);
}
taosArrayPush(pUids, &tuid);
} else if (cmp == 1) {
// not match but should continue to iter
found = true;
} else {
// not match and no more result
break;
if (param->equal == true) {
if (count > TRY_ERROR_LIMIT) break;
count++;
}
}
valid = param->reverse ? tdbTbcMoveToPrev(pCursor->pCur) : tdbTbcMoveToNext(pCursor->pCur);
if (valid < 0) {
break;
}
}
} while (1);
END:
if (pCursor->pMeta) metaULock(pCursor->pMeta);
......@@ -1358,7 +1376,7 @@ static int32_t metaGetTableTagByUid(SMeta *pMeta, int64_t suid, int64_t uid, voi
return ret;
}
int32_t metaGetTableTagsByUids(SMeta *pMeta, int64_t suid, SArray *uidList, SHashObj *tags) {
const int32_t LIMIT = 128;
const int32_t LIMIT = 4096;
int32_t isLock = false;
int32_t sz = uidList ? taosArrayGetSize(uidList) : 0;
......@@ -1401,6 +1419,7 @@ int32_t metaGetTableTags(SMeta *pMeta, uint64_t suid, SArray *uidList, SHashObj
taosHashPut(uHash, uid, sizeof(int64_t), &i, sizeof(i));
}
}
while (1) {
tb_uid_t id = metaCtbCursorNext(pCur);
if (id == 0) {
......
......@@ -394,6 +394,301 @@ int metaAlterSTable(SMeta *pMeta, int64_t version, SVCreateStbReq *pReq) {
tdbTbcClose(pUidIdxc);
return 0;
}
int metaAddIndexToSTable(SMeta *pMeta, int64_t version, SVCreateStbReq *pReq) {
SMetaEntry oStbEntry = {0};
SMetaEntry nStbEntry = {0};
STbDbKey tbDbKey = {0};
TBC *pUidIdxc = NULL;
TBC *pTbDbc = NULL;
void *pData = NULL;
int nData = 0;
int64_t oversion;
SDecoder dc = {0};
int32_t ret;
int32_t c = -2;
tb_uid_t suid = pReq->suid;
// get super table
if (tdbTbGet(pMeta->pUidIdx, &suid, sizeof(tb_uid_t), &pData, &nData) != 0) {
ret = -1;
goto _err;
}
tbDbKey.uid = suid;
tbDbKey.version = ((SUidIdxVal *)pData)[0].version;
tdbTbGet(pMeta->pTbDb, &tbDbKey, sizeof(tbDbKey), &pData, &nData);
tDecoderInit(&dc, pData, nData);
ret = metaDecodeEntry(&dc, &oStbEntry);
if (ret < 0) {
goto _err;
}
if (oStbEntry.stbEntry.schemaTag.pSchema == NULL || oStbEntry.stbEntry.schemaTag.pSchema == NULL) {
goto _err;
}
if (oStbEntry.stbEntry.schemaTag.version == pReq->schemaTag.version) {
goto _err;
}
if (oStbEntry.stbEntry.schemaTag.nCols != pReq->schemaTag.nCols) {
goto _err;
}
int diffIdx = -1;
for (int i = 0; i < pReq->schemaTag.nCols; i++) {
SSchema *pNew = pReq->schemaTag.pSchema + i;
SSchema *pOld = oStbEntry.stbEntry.schemaTag.pSchema + i;
if (pNew->type != pOld->type || pNew->colId != pOld->colId || pNew->bytes != pOld->bytes ||
strncmp(pOld->name, pNew->name, sizeof(pNew->name))) {
goto _err;
}
if (IS_IDX_ON(pNew) && !IS_IDX_ON(pOld)) {
if (diffIdx != -1) goto _err;
diffIdx = i;
}
}
if (diffIdx == -1 && diffIdx == 0) {
goto _err;
}
// Get target schema info
SSchemaWrapper *pTagSchema = &pReq->schemaTag;
if (pTagSchema->nCols == 1 && pTagSchema->pSchema[0].type == TSDB_DATA_TYPE_JSON) {
terrno = TSDB_CODE_VND_COL_ALREADY_EXISTS;
goto _err;
}
SSchema *pCol = pTagSchema->pSchema + diffIdx;
/*
* iterator all pTdDbc by uid and version
*/
TBC *pCtbIdxc = NULL;
tdbTbcOpen(pMeta->pCtbIdx, &pCtbIdxc, NULL);
int rc = tdbTbcMoveTo(pCtbIdxc, &(SCtbIdxKey){.suid = suid, .uid = INT64_MIN}, sizeof(SCtbIdxKey), &c);
if (rc < 0) {
tdbTbcClose(pCtbIdxc);
goto _err;
}
for (;;) {
void *pKey = NULL, *pVal = NULL;
int nKey = 0, nVal = 0;
rc = tdbTbcNext(pCtbIdxc, &pKey, &nKey, &pVal, &nVal);
if (rc < 0) {
tdbFree(pKey);
tdbFree(pVal);
tdbTbcClose(pCtbIdxc);
pCtbIdxc = NULL;
break;
}
if (((SCtbIdxKey *)pKey)->suid != suid) {
tdbFree(pKey);
tdbFree(pVal);
continue;
}
STagIdxKey *pTagIdxKey = NULL;
int32_t nTagIdxKey;
const void *pTagData = NULL;
int32_t nTagData = 0;
SCtbIdxKey *table = (SCtbIdxKey *)pKey;
STagVal tagVal = {.cid = pCol->colId};
tTagGet((const STag *)pVal, &tagVal);
if (IS_VAR_DATA_TYPE(pCol->type)) {
pTagData = tagVal.pData;
nTagData = (int32_t)tagVal.nData;
} else {
pTagData = &(tagVal.i64);
nTagData = tDataTypes[pCol->type].bytes;
}
rc = metaCreateTagIdxKey(suid, pCol->colId, pTagData, nTagData, pCol->type, table->uid, &pTagIdxKey, &nTagIdxKey);
tdbFree(pKey);
tdbFree(pVal);
if (rc < 0) {
metaDestroyTagIdxKey(pTagIdxKey);
tdbTbcClose(pCtbIdxc);
goto _err;
}
metaWLock(pMeta);
tdbTbUpsert(pMeta->pTagIdx, pTagIdxKey, nTagIdxKey, NULL, 0, pMeta->txn);
metaULock(pMeta);
metaDestroyTagIdxKey(pTagIdxKey);
}
nStbEntry.version = version;
nStbEntry.type = TSDB_SUPER_TABLE;
nStbEntry.uid = pReq->suid;
nStbEntry.name = pReq->name;
nStbEntry.stbEntry.schemaRow = pReq->schemaRow;
nStbEntry.stbEntry.schemaTag = pReq->schemaTag;
metaWLock(pMeta);
// update table.db
metaSaveToTbDb(pMeta, &nStbEntry);
// update uid index
metaUpdateUidIdx(pMeta, &nStbEntry);
metaULock(pMeta);
if (oStbEntry.pBuf) taosMemoryFree(oStbEntry.pBuf);
tDecoderClear(&dc);
tdbFree(pData);
tdbTbcClose(pCtbIdxc);
return TSDB_CODE_SUCCESS;
_err:
if (oStbEntry.pBuf) taosMemoryFree(oStbEntry.pBuf);
tDecoderClear(&dc);
tdbFree(pData);
return TSDB_CODE_VND_COL_ALREADY_EXISTS;
}
int metaDropIndexFromSTable(SMeta *pMeta, int64_t version, SDropIndexReq *pReq) {
SMetaEntry oStbEntry = {0};
SMetaEntry nStbEntry = {0};
STbDbKey tbDbKey = {0};
TBC *pUidIdxc = NULL;
TBC *pTbDbc = NULL;
int ret = 0;
int c = -2;
void *pData = NULL;
int nData = 0;
int64_t oversion;
SDecoder dc = {0};
tb_uid_t suid = pReq->stbUid;
if (tdbTbGet(pMeta->pUidIdx, &suid, sizeof(tb_uid_t), &pData, &nData) != 0) {
ret = -1;
goto _err;
}
tbDbKey.uid = suid;
tbDbKey.version = ((SUidIdxVal *)pData)[0].version;
tdbTbGet(pMeta->pTbDb, &tbDbKey, sizeof(tbDbKey), &pData, &nData);
tDecoderInit(&dc, pData, nData);
ret = metaDecodeEntry(&dc, &oStbEntry);
if (ret < 0) {
goto _err;
}
SSchema *pCol = NULL;
int32_t colId = -1;
for (int i = 0; i < oStbEntry.stbEntry.schemaTag.nCols; i++) {
SSchema *schema = oStbEntry.stbEntry.schemaTag.pSchema + i;
if (0 == strncmp(schema->name, pReq->colName, sizeof(pReq->colName))) {
if (i != 0 || IS_IDX_ON(schema)) {
pCol = schema;
}
break;
}
}
if (pCol == NULL) {
goto _err;
}
/*
* iterator all pTdDbc by uid and version
*/
TBC *pCtbIdxc = NULL;
tdbTbcOpen(pMeta->pCtbIdx, &pCtbIdxc, NULL);
int rc = tdbTbcMoveTo(pCtbIdxc, &(SCtbIdxKey){.suid = suid, .uid = INT64_MIN}, sizeof(SCtbIdxKey), &c);
if (rc < 0) {
tdbTbcClose(pCtbIdxc);
goto _err;
}
for (;;) {
void *pKey = NULL, *pVal = NULL;
int nKey = 0, nVal = 0;
rc = tdbTbcNext(pCtbIdxc, &pKey, &nKey, &pVal, &nVal);
if (rc < 0) {
tdbFree(pKey);
tdbFree(pVal);
tdbTbcClose(pCtbIdxc);
pCtbIdxc = NULL;
break;
}
if (((SCtbIdxKey *)pKey)->suid != suid) {
tdbFree(pKey);
tdbFree(pVal);
continue;
}
STagIdxKey *pTagIdxKey = NULL;
int32_t nTagIdxKey;
const void *pTagData = NULL;
int32_t nTagData = 0;
SCtbIdxKey *table = (SCtbIdxKey *)pKey;
STagVal tagVal = {.cid = pCol->colId};
tTagGet((const STag *)pVal, &tagVal);
if (IS_VAR_DATA_TYPE(pCol->type)) {
pTagData = tagVal.pData;
nTagData = (int32_t)tagVal.nData;
} else {
pTagData = &(tagVal.i64);
nTagData = tDataTypes[pCol->type].bytes;
}
rc = metaCreateTagIdxKey(suid, pCol->colId, pTagData, nTagData, pCol->type, table->uid, &pTagIdxKey, &nTagIdxKey);
tdbFree(pKey);
tdbFree(pVal);
if (rc < 0) {
metaDestroyTagIdxKey(pTagIdxKey);
tdbTbcClose(pCtbIdxc);
goto _err;
}
metaWLock(pMeta);
tdbTbDelete(pMeta->pTagIdx, pTagIdxKey, nTagIdxKey, pMeta->txn);
metaULock(pMeta);
metaDestroyTagIdxKey(pTagIdxKey);
}
// clear idx flag
SSCHMEA_SET_IDX_OFF(pCol);
nStbEntry.version = version;
nStbEntry.type = TSDB_SUPER_TABLE;
nStbEntry.uid = oStbEntry.uid;
nStbEntry.name = oStbEntry.name;
SSchemaWrapper *row = tCloneSSchemaWrapper(&oStbEntry.stbEntry.schemaRow);
SSchemaWrapper *tag = tCloneSSchemaWrapper(&oStbEntry.stbEntry.schemaTag);
nStbEntry.stbEntry.schemaRow = *row;
nStbEntry.stbEntry.schemaTag = *tag;
nStbEntry.stbEntry.rsmaParam = oStbEntry.stbEntry.rsmaParam;
metaWLock(pMeta);
// update table.db
metaSaveToTbDb(pMeta, &nStbEntry);
// update uid index
metaUpdateUidIdx(pMeta, &nStbEntry);
metaULock(pMeta);
tDeleteSSchemaWrapper(tag);
tDeleteSSchemaWrapper(row);
if (oStbEntry.pBuf) taosMemoryFree(oStbEntry.pBuf);
tDecoderClear(&dc);
tdbFree(pData);
tdbTbcClose(pCtbIdxc);
return TSDB_CODE_SUCCESS;
_err:
if (oStbEntry.pBuf) taosMemoryFree(oStbEntry.pBuf);
tDecoderClear(&dc);
tdbFree(pData);
return -1;
}
int metaCreateTable(SMeta *pMeta, int64_t version, SVCreateTbReq *pReq, STableMetaRsp **pMetaRsp) {
SMetaEntry me = {0};
......@@ -649,10 +944,16 @@ static int metaDropTableByUid(SMeta *pMeta, tb_uid_t uid, int *type) {
tDecoderInit(&tdc, tData, tLen);
metaDecodeEntry(&tdc, &stbEntry);
const SSchema *pTagColumn = &stbEntry.stbEntry.schemaTag.pSchema[0];
if (pTagColumn->type == TSDB_DATA_TYPE_JSON) {
SSchema *pTagColumn = NULL;
SSchemaWrapper *pTagSchema = &stbEntry.stbEntry.schemaTag;
if (pTagSchema->nCols == 1 && pTagSchema->pSchema[0].type == TSDB_DATA_TYPE_JSON) {
pTagColumn = &stbEntry.stbEntry.schemaTag.pSchema[0];
metaDelJsonVarFromIdx(pMeta, &e, pTagColumn);
} else {
for (int i = 0; i < pTagSchema->nCols; i++) {
pTagColumn = &stbEntry.stbEntry.schemaTag.pSchema[i];
if (!IS_IDX_ON(pTagColumn)) continue;
STagIdxKey *pTagIdxKey = NULL;
int32_t nTagIdxKey;
......@@ -675,6 +976,7 @@ static int metaDropTableByUid(SMeta *pMeta, tb_uid_t uid, int *type) {
}
metaDestroyTagIdxKey(pTagIdxKey);
}
}
tDecoderClear(&tdc);
}
tdbFree(tData);
......@@ -931,12 +1233,11 @@ static int metaAlterTableColumn(SMeta *pMeta, int64_t version, SVAlterTbReq *pAl
entry.version = version;
metaDeleteNcolIdx(pMeta, &oldEntry);
metaUpdateNcolIdx(pMeta, &entry);
// do actual write
metaWLock(pMeta);
metaDeleteNcolIdx(pMeta, &oldEntry);
metaUpdateNcolIdx(pMeta, &entry);
// save to table db
metaSaveToTbDb(pMeta, &entry);
......@@ -1117,9 +1418,7 @@ static int metaUpdateTableTagVal(SMeta *pMeta, int64_t version, SVAlterTbReq *pA
// save to uid.idx
metaUpdateUidIdx(pMeta, &ctbEntry);
if (iCol == 0) {
metaUpdateTagIdx(pMeta, &ctbEntry);
}
if (NULL == ctbEntry.ctbEntry.pTags) {
metaError("meta/table: null tags, update tag val failed.");
......@@ -1254,6 +1553,243 @@ static int metaUpdateTableOptions(SMeta *pMeta, int64_t version, SVAlterTbReq *p
return 0;
}
static int metaAddTagIndex(SMeta *pMeta, int64_t version, SVAlterTbReq *pAlterTbReq) {
SMetaEntry stbEntry = {0};
void *pVal = NULL;
int nVal = 0;
int ret;
int c;
tb_uid_t uid, suid;
int64_t oversion;
const void *pData = NULL;
int nData = 0;
SDecoder dc = {0};
if (pAlterTbReq->tagName == NULL) {
terrno = TSDB_CODE_INVALID_MSG;
return -1;
}
// search name index
ret = tdbTbGet(pMeta->pNameIdx, pAlterTbReq->tbName, strlen(pAlterTbReq->tbName) + 1, &pVal, &nVal);
if (ret < 0) {
terrno = TSDB_CODE_TDB_TABLE_NOT_EXIST;
return -1;
}
uid = *(tb_uid_t *)pVal;
tdbFree(pVal);
pVal = NULL;
if (tdbTbGet(pMeta->pUidIdx, &uid, sizeof(tb_uid_t), &pVal, &nVal) == -1) {
ret = -1;
goto _err;
}
suid = ((SUidIdxVal *)pVal)[0].suid;
STbDbKey tbDbKey = {0};
tbDbKey.uid = suid;
tbDbKey.version = ((SUidIdxVal *)pVal)[0].version;
tdbTbGet(pMeta->pTbDb, &tbDbKey, sizeof(tbDbKey), &pVal, &nVal);
tDecoderInit(&dc, pVal, nVal);
ret = metaDecodeEntry(&dc, &stbEntry);
if (ret < 0) {
goto _err;
}
// Get target schema info
SSchemaWrapper *pTagSchema = &stbEntry.stbEntry.schemaTag;
if (pTagSchema->nCols == 1 && pTagSchema->pSchema[0].type == TSDB_DATA_TYPE_JSON) {
terrno = TSDB_CODE_VND_COL_ALREADY_EXISTS;
goto _err;
}
SSchema *pCol = NULL;
int32_t iCol = 0;
for (;;) {
pCol = NULL;
if (iCol >= pTagSchema->nCols) break;
pCol = &pTagSchema->pSchema[iCol];
if (strcmp(pCol->name, pAlterTbReq->tagName) == 0) break;
iCol++;
}
if (iCol == 0) {
terrno = TSDB_CODE_VND_COL_ALREADY_EXISTS;
goto _err;
}
if (pCol == NULL) {
terrno = TSDB_CODE_VND_COL_NOT_EXISTS;
goto _err;
}
/*
* iterator all pTdDbc by uid and version
*/
TBC *pCtbIdxc = NULL;
tdbTbcOpen(pMeta->pCtbIdx, &pCtbIdxc, NULL);
int rc = tdbTbcMoveTo(pCtbIdxc, &(SCtbIdxKey){.suid = suid, .uid = INT64_MIN}, sizeof(SCtbIdxKey), &c);
if (rc < 0) {
tdbTbcClose(pCtbIdxc);
goto _err;
}
for (;;) {
void *pKey, *pVal;
int nKey, nVal;
rc = tdbTbcNext(pCtbIdxc, &pKey, &nKey, &pVal, &nVal);
if (rc < 0) break;
if (((SCtbIdxKey *)pKey)->suid != uid) {
tdbFree(pKey);
tdbFree(pVal);
continue;
}
STagIdxKey *pTagIdxKey = NULL;
int32_t nTagIdxKey;
const void *pTagData = NULL;
int32_t nTagData = 0;
STagVal tagVal = {.cid = pCol->colId};
tTagGet((const STag *)pVal, &tagVal);
if (IS_VAR_DATA_TYPE(pCol->type)) {
pTagData = tagVal.pData;
nTagData = (int32_t)tagVal.nData;
} else {
pTagData = &(tagVal.i64);
nTagData = tDataTypes[pCol->type].bytes;
}
if (metaCreateTagIdxKey(suid, pCol->colId, pTagData, nTagData, pCol->type, uid, &pTagIdxKey, &nTagIdxKey) < 0) {
metaDestroyTagIdxKey(pTagIdxKey);
goto _err;
}
tdbTbUpsert(pMeta->pTagIdx, pTagIdxKey, nTagIdxKey, NULL, 0, pMeta->txn);
metaDestroyTagIdxKey(pTagIdxKey);
}
return 0;
_err:
// tDecoderClear(&dc1);
// tDecoderClear(&dc2);
// if (ctbEntry.pBuf) taosMemoryFree(ctbEntry.pBuf);
// if (stbEntry.pBuf) tdbFree(stbEntry.pBuf);
// tdbTbcClose(pTbDbc);
// tdbTbcClose(pUidIdxc);
return -1;
}
typedef struct SMetaPair {
void *key;
int nkey;
} SMetaPair;
static int metaDropTagIndex(SMeta *pMeta, int64_t version, SVAlterTbReq *pAlterTbReq) {
SMetaEntry stbEntry = {0};
void *pVal = NULL;
int nVal = 0;
int ret;
int c;
tb_uid_t suid;
int64_t oversion;
const void *pData = NULL;
int nData = 0;
SDecoder dc = {0};
if (pAlterTbReq->tagName == NULL) {
terrno = TSDB_CODE_INVALID_MSG;
return -1;
}
// search name index
ret = tdbTbGet(pMeta->pNameIdx, pAlterTbReq->tbName, strlen(pAlterTbReq->tbName) + 1, &pVal, &nVal);
if (ret < 0) {
terrno = TSDB_CODE_TDB_TABLE_NOT_EXIST;
return -1;
}
suid = *(tb_uid_t *)pVal;
tdbFree(pVal);
pVal = NULL;
if (tdbTbGet(pMeta->pUidIdx, &suid, sizeof(tb_uid_t), &pVal, &nVal) == -1) {
ret = -1;
goto _err;
}
STbDbKey tbDbKey = {0};
tbDbKey.uid = suid;
tbDbKey.version = ((SUidIdxVal *)pVal)[0].version;
tdbTbGet(pMeta->pTbDb, &tbDbKey, sizeof(tbDbKey), &pVal, &nVal);
tDecoderInit(&dc, pVal, nVal);
ret = metaDecodeEntry(&dc, &stbEntry);
if (ret < 0) {
goto _err;
}
// Get targe schema info
SSchemaWrapper *pTagSchema = &stbEntry.stbEntry.schemaTag;
if (pTagSchema->nCols == 1 && pTagSchema->pSchema[0].type == TSDB_DATA_TYPE_JSON) {
terrno = TSDB_CODE_VND_COL_ALREADY_EXISTS;
goto _err;
}
SSchema *pCol = NULL;
int32_t iCol = 0;
for (;;) {
pCol = NULL;
if (iCol >= pTagSchema->nCols) break;
pCol = &pTagSchema->pSchema[iCol];
if (strcmp(pCol->name, pAlterTbReq->tagName) == 0) break;
iCol++;
}
if (iCol == 0) {
// cannot drop 1th tag index
terrno = -1;
goto _err;
}
if (pCol == NULL) {
terrno = TSDB_CODE_VND_COL_NOT_EXISTS;
goto _err;
}
if (IS_IDX_ON(pCol)) {
terrno = TSDB_CODE_VND_COL_ALREADY_EXISTS;
goto _err;
}
SArray *tagIdxList = taosArrayInit(512, sizeof(SMetaPair));
TBC *pTagIdxc = NULL;
tdbTbcOpen(pMeta->pTagIdx, &pTagIdxc, NULL);
int rc =
tdbTbcMoveTo(pTagIdxc, &(STagIdxKey){.suid = suid, .cid = INT32_MIN, .type = pCol->type}, sizeof(STagIdxKey), &c);
for (;;) {
void *pKey, *pVal;
int nKey, nVal;
rc = tdbTbcNext(pTagIdxc, &pKey, &nKey, &pVal, &nVal);
STagIdxKey *pIdxKey = (STagIdxKey *)pKey;
if (pIdxKey->suid != suid || pIdxKey->cid != pCol->colId) {
tdbFree(pKey);
tdbFree(pVal);
continue;
}
SMetaPair pair = {.key = pKey, nKey = nKey};
taosArrayPush(tagIdxList, &pair);
}
tdbTbcClose(pTagIdxc);
metaWLock(pMeta);
for (int i = 0; i < taosArrayGetSize(tagIdxList); i++) {
SMetaPair *pair = taosArrayGet(tagIdxList, i);
tdbTbDelete(pMeta->pTagIdx, pair->key, pair->nkey, pMeta->txn);
}
metaULock(pMeta);
taosArrayDestroy(tagIdxList);
// set pCol->flags; INDEX_ON
return 0;
_err:
return -1;
}
int metaAlterTable(SMeta *pMeta, int64_t version, SVAlterTbReq *pReq, STableMetaRsp *pMetaRsp) {
switch (pReq->action) {
case TSDB_ALTER_TABLE_ADD_COLUMN:
......@@ -1265,6 +1801,10 @@ int metaAlterTable(SMeta *pMeta, int64_t version, SVAlterTbReq *pReq, STableMeta
return metaUpdateTableTagVal(pMeta, version, pReq);
case TSDB_ALTER_TABLE_UPDATE_OPTIONS:
return metaUpdateTableOptions(pMeta, version, pReq);
case TSDB_ALTER_TABLE_ADD_TAG_INDEX:
return metaAddTagIndex(pMeta, version, pReq);
case TSDB_ALTER_TABLE_DROP_TAG_INDEX:
return metaDropTagIndex(pMeta, version, pReq);
default:
terrno = TSDB_CODE_VND_INVALID_TABLE_ACTION;
return -1;
......@@ -1429,10 +1969,21 @@ static int metaUpdateTagIdx(SMeta *pMeta, const SMetaEntry *pCtbEntry) {
goto end;
}
SSchemaWrapper *pTagSchema = &stbEntry.stbEntry.schemaTag;
if (pTagSchema->nCols == 1 && pTagSchema->pSchema[0].type == TSDB_DATA_TYPE_JSON) {
pTagColumn = &stbEntry.stbEntry.schemaTag.pSchema[0];
STagVal tagVal = {.cid = pTagColumn->colId};
pTagData = pCtbEntry->ctbEntry.pTags;
nTagData = ((const STag *)pCtbEntry->ctbEntry.pTags)->len;
ret = metaSaveJsonVarToIdx(pMeta, pCtbEntry, pTagColumn);
goto end;
} else {
for (int i = 0; i < pTagSchema->nCols; i++) {
pTagColumn = &pTagSchema->pSchema[i];
if (i != 0 && !IS_IDX_ON(pTagColumn)) continue;
STagVal tagVal = {.cid = pTagColumn->colId};
if (pTagColumn->type != TSDB_DATA_TYPE_JSON) {
tTagGet((const STag *)pCtbEntry->ctbEntry.pTags, &tagVal);
if (IS_VAR_DATA_TYPE(pTagColumn->type)) {
pTagData = tagVal.pData;
......@@ -1441,14 +1992,7 @@ static int metaUpdateTagIdx(SMeta *pMeta, const SMetaEntry *pCtbEntry) {
pTagData = &(tagVal.i64);
nTagData = tDataTypes[pTagColumn->type].bytes;
}
} else {
// pTagData = pCtbEntry->ctbEntry.pTags;
// nTagData = ((const STag *)pCtbEntry->ctbEntry.pTags)->len;
pTagData = pCtbEntry->ctbEntry.pTags;
nTagData = ((const STag *)pCtbEntry->ctbEntry.pTags)->len;
ret = metaSaveJsonVarToIdx(pMeta, pCtbEntry, pTagColumn);
goto end;
}
if (pTagData != NULL) {
if (metaCreateTagIdxKey(pCtbEntry->ctbEntry.suid, pTagColumn->colId, pTagData, nTagData, pTagColumn->type,
pCtbEntry->uid, &pTagIdxKey, &nTagIdxKey) < 0) {
......@@ -1457,8 +2001,11 @@ static int metaUpdateTagIdx(SMeta *pMeta, const SMetaEntry *pCtbEntry) {
}
tdbTbUpsert(pMeta->pTagIdx, pTagIdxKey, nTagIdxKey, NULL, 0, pMeta->txn);
}
end:
metaDestroyTagIdxKey(pTagIdxKey);
}
}
end:
// metaDestroyTagIdxKey(pTagIdxKey);
tDecoderClear(&dc);
tdbFree(pData);
return ret;
......
......@@ -258,7 +258,6 @@ int32_t smaBlockToSubmit(SVnode *pVnode, const SArray *pBlocks, const STSchema *
SSubmitTbData tbData = {0};
if (!(tbData.aRowP = taosArrayInit(rows, sizeof(SRow *)))) {
goto _end;
}
......@@ -313,14 +312,15 @@ int32_t smaBlockToSubmit(SVnode *pVnode, const SArray *pBlocks, const STSchema *
tEncodeSize(tEncodeSSubmitReq2, pReq, len, terrno);
if (TSDB_CODE_SUCCESS == terrno) {
SEncoder encoder;
len += sizeof(SMsgHead);
len += sizeof(SSubmitReq2Msg);
pBuf = rpcMallocCont(len);
if (NULL == pBuf) {
goto _end;
}
((SMsgHead *)pBuf)->vgId = TD_VID(pVnode);
((SMsgHead *)pBuf)->contLen = htonl(len);
tEncoderInit(&encoder, POINTER_SHIFT(pBuf, sizeof(SMsgHead)), len - sizeof(SMsgHead));
((SSubmitReq2Msg *)pBuf)->header.vgId = TD_VID(pVnode);
((SSubmitReq2Msg *)pBuf)->header.contLen = htonl(len);
((SSubmitReq2Msg *)pBuf)->version = htobe64(1);
tEncoderInit(&encoder, POINTER_SHIFT(pBuf, sizeof(SSubmitReq2Msg)), len - sizeof(SSubmitReq2Msg));
if (tEncodeSSubmitReq2(&encoder, pReq) < 0) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
/*vError("failed to encode submit req since %s", terrstr());*/
......
......@@ -683,13 +683,13 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
if (pHead->msgType == TDMT_VND_SUBMIT) {
SPackedData submit = {
.msgStr = POINTER_SHIFT(pHead->body, sizeof(SMsgHead)),
.msgLen = pHead->bodyLen - sizeof(SMsgHead),
.msgStr = POINTER_SHIFT(pHead->body, sizeof(SSubmitReq2Msg)),
.msgLen = pHead->bodyLen - sizeof(SSubmitReq2Msg),
.ver = pHead->version,
};
if (tqTaosxScanLog(pTq, pHandle, submit, &taosxRsp) < 0) {
tqError("tmq poll: tqTaosxScanLog error %" PRId64 ", in vgId:%d, subkey %s", consumerId,
TD_VID(pTq->pVnode), req.subKey);
tqError("tmq poll: tqTaosxScanLog error %" PRId64 ", in vgId:%d, subkey %s", consumerId, TD_VID(pTq->pVnode),
req.subKey);
return -1;
}
if (taosxRsp.blockNum > 0 /* threshold */) {
......
......@@ -207,8 +207,8 @@ int32_t tqPushMsgNew(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_
#endif
int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver) {
void* pReq = POINTER_SHIFT(msg, sizeof(SMsgHead));
int32_t len = msgLen - sizeof(SMsgHead);
void* pReq = POINTER_SHIFT(msg, sizeof(SSubmitReq2Msg));
int32_t len = msgLen - sizeof(SSubmitReq2Msg);
tqDebug("vgId:%d, tq push msg ver %" PRId64 ", type: %s, p head %p, p body %p, len %d", pTq->pVnode->config.vgId, ver,
TMSG_INFO(msgType), msg, pReq, len);
......
......@@ -311,8 +311,8 @@ int32_t tqNextBlock(STqReader* pReader, SFetchRet* ret) {
tqDebug("return offset %" PRId64 ", no more valid", ret->offset.version);
return -1;
}
void* body = POINTER_SHIFT(pReader->pWalReader->pHead->head.body, sizeof(SMsgHead));
int32_t bodyLen = pReader->pWalReader->pHead->head.bodyLen - sizeof(SMsgHead);
void* body = POINTER_SHIFT(pReader->pWalReader->pHead->head.body, sizeof(SSubmitReq2Msg));
int32_t bodyLen = pReader->pWalReader->pHead->head.bodyLen - sizeof(SSubmitReq2Msg);
int64_t ver = pReader->pWalReader->pHead->head.version;
#if 0
if (pReader->pWalReader->pHead->head.msgType != TDMT_VND_SUBMIT) {
......@@ -560,7 +560,7 @@ int32_t tqRetrieveDataBlock2(SSDataBlock* pBlock, STqReader* pReader, SSubmitTbD
SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk);
pReader->nextBlk++;
if(pSubmitTbDataRet) *pSubmitTbDataRet = pSubmitTbData;
if (pSubmitTbDataRet) *pSubmitTbDataRet = pSubmitTbData;
int32_t sversion = pSubmitTbData->sver;
int64_t suid = pSubmitTbData->suid;
int64_t uid = pSubmitTbData->uid;
......@@ -1012,7 +1012,7 @@ int32_t tqRetrieveTaosxBlock2(STqReader* pReader, SArray* blocks, SArray* schema
SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk);
pReader->nextBlk++;
if(pSubmitTbDataRet) *pSubmitTbDataRet = pSubmitTbData;
if (pSubmitTbDataRet) *pSubmitTbDataRet = pSubmitTbData;
int32_t sversion = pSubmitTbData->sver;
int64_t suid = pSubmitTbData->suid;
int64_t uid = pSubmitTbData->uid;
......@@ -1064,7 +1064,7 @@ int32_t tqRetrieveTaosxBlock2(STqReader* pReader, SArray* blocks, SArray* schema
for (int32_t i = 0; i < numOfRows; i++) {
bool buildNew = false;
for (int32_t j = 0; j < numOfCols; j++){
for (int32_t j = 0; j < numOfCols; j++) {
SColData* pCol = taosArrayGet(pCols, j);
SColVal colVal;
tColDataGetValue(pCol, i, &colVal);
......@@ -1089,7 +1089,7 @@ int32_t tqRetrieveTaosxBlock2(STqReader* pReader, SArray* blocks, SArray* schema
SSDataBlock block = {0};
SSchemaWrapper* pSW = taosMemoryCalloc(1, sizeof(SSchemaWrapper));
if(pSW == NULL){
if (pSW == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
goto FAIL;
}
......@@ -1161,7 +1161,7 @@ int32_t tqRetrieveTaosxBlock2(STqReader* pReader, SArray* blocks, SArray* schema
SRow* pRow = taosArrayGetP(pRows, i);
bool buildNew = false;
for (int32_t j = 0; j < pTschema->numOfCols; j++){
for (int32_t j = 0; j < pTschema->numOfCols; j++) {
SColVal colVal;
tRowGet(pRow, pTschema, j, &colVal);
if (curRow == 0) {
......@@ -1185,7 +1185,7 @@ int32_t tqRetrieveTaosxBlock2(STqReader* pReader, SArray* blocks, SArray* schema
SSDataBlock block = {0};
SSchemaWrapper* pSW = taosMemoryCalloc(1, sizeof(SSchemaWrapper));
if(pSW == NULL){
if (pSW == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
goto FAIL;
}
......@@ -1256,7 +1256,7 @@ int32_t tqRetrieveTaosxBlock2(STqReader* pReader, SArray* blocks, SArray* schema
taosMemoryFree(assigned);
return 0;
FAIL:
FAIL:
taosMemoryFree(assigned);
return -1;
}
......@@ -1358,7 +1358,7 @@ int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd) {
}
taosArrayDestroy(qa);
} else {
// TODO handle delete table from stb
tqReaderRemoveTbUidList(pExec->execHandle.pExecReader, tbUidList);
}
}
}
......
......@@ -71,7 +71,6 @@ int32_t tqBuildDeleteReq(SVnode* pVnode, const char* stbFullName, const SSDataBl
return 0;
}
void tqSinkToTablePipeline(SStreamTask* pTask, void* vnode, int64_t ver, void* data) {
const SArray* pBlocks = (const SArray*)data;
SVnode* pVnode = (SVnode*)vnode;
......@@ -340,7 +339,7 @@ static int32_t encodeCreateChildTableForRPC(SVCreateTbBatchReq* pReqs, int32_t v
((SMsgHead*)(*pBuf))->vgId = vgId;
((SMsgHead*)(*pBuf))->contLen = htonl(*contLen);
SEncoder coder = {0};
tEncoderInit(&coder, POINTER_SHIFT(*pBuf, sizeof(SMsgHead)), (*contLen) - sizeof(SMsgHead) );
tEncoderInit(&coder, POINTER_SHIFT(*pBuf, sizeof(SMsgHead)), (*contLen) - sizeof(SMsgHead));
if (tEncodeSVCreateTbBatchReq(&coder, pReqs) < 0) {
rpcFreeCont(*pBuf);
*pBuf = NULL;
......@@ -498,7 +497,7 @@ void tqSinkToTablePipeline2(SStreamTask* pTask, void* vnode, int64_t ver, void*
taosArrayPush(tagArray, &tagVal);
}
}
pCreateTbReq->ctb.tagNum = taosArrayGetSize(tagArray);
pCreateTbReq->ctb.tagNum = size;
STag* pTag = NULL;
tTagNew(tagArray, 1, false, &pTag);
......@@ -648,7 +647,7 @@ void tqSinkToTablePipeline2(SStreamTask* pTask, void* vnode, int64_t ver, void*
if (IS_SET_NULL(pCol)) {
SColVal cv = COL_VAL_NULL(pCol->colId, pCol->type);
taosArrayPush(pVals, &cv);
} else{
} else {
SColumnInfoData* pColData = taosArrayGet(pDataBlock->pDataBlock, dataIndex);
if (colDataIsNull_s(pColData, j)) {
SColVal cv = COL_VAL_NULL(pCol->colId, pCol->type);
......@@ -692,14 +691,15 @@ void tqSinkToTablePipeline2(SStreamTask* pTask, void* vnode, int64_t ver, void*
int32_t code;
tEncodeSize(tEncodeSSubmitReq2, &submitReq, len, code);
SEncoder encoder;
len += sizeof(SMsgHead);
len += sizeof(SSubmitReq2Msg);
pBuf = rpcMallocCont(len);
if (NULL == pBuf) {
goto _end;
}
((SMsgHead*)pBuf)->vgId = TD_VID(pVnode);
((SMsgHead*)pBuf)->contLen = htonl(len);
tEncoderInit(&encoder, POINTER_SHIFT(pBuf, sizeof(SMsgHead)), len - sizeof(SMsgHead));
((SSubmitReq2Msg*)pBuf)->header.vgId = TD_VID(pVnode);
((SSubmitReq2Msg*)pBuf)->header.contLen = htonl(len);
((SSubmitReq2Msg*)pBuf)->version = htobe64(1);
tEncoderInit(&encoder, POINTER_SHIFT(pBuf, sizeof(SSubmitReq2Msg)), len - sizeof(SSubmitReq2Msg));
if (tEncodeSSubmitReq2(&encoder, &submitReq) < 0) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
tqError("failed to encode submit req since %s", terrstr());
......
......@@ -22,7 +22,7 @@
* us: 3600*1000000*8765*1000 // 1970 + 1000 years
* ns: 3600*1000000000*8765*292 // 1970 + 292 years
*/
static int64_t tsMaxKeyByPrecision[] = {31556995200000L, 31556995200000000L, 9214646400000000000L};
int64_t tsMaxKeyByPrecision[] = {31556995200000L, 31556995200000000L, 9214646400000000000L};
// static int tsdbScanAndConvertSubmitMsg(STsdb *pTsdb, SSubmitReq *pMsg);
......@@ -60,23 +60,6 @@ int tsdbInsertData(STsdb *pTsdb, int64_t version, SSubmitReq2 *pMsg, SSubmitRsp2
return 0;
}
#if 0
static FORCE_INLINE int tsdbCheckRowRange(STsdb *pTsdb, STable *pTable, STSRow *row, TSKEY minKey, TSKEY maxKey,
TSKEY now) {
TSKEY rowKey = TD_ROW_KEY(row);
if (rowKey < minKey || rowKey > maxKey) {
tsdbError("vgId:%d, table %s tid %d uid %" PRIu64 " timestamp is out of range! now %" PRId64 " minKey %" PRId64
" maxKey %" PRId64 " row key %" PRId64,
REPO_ID(pTsdb), TABLE_CHAR_NAME(pTable), TABLE_TID(pTable), TABLE_UID(pTable), now, minKey, maxKey,
rowKey);
terrno = TSDB_CODE_TDB_TIMESTAMP_OUT_OF_RANGE;
return -1;
}
return 0;
}
#endif
static FORCE_INLINE int tsdbCheckRowRange(STsdb *pTsdb, tb_uid_t uid, TSKEY rowKey, TSKEY minKey, TSKEY maxKey,
TSKEY now) {
if (rowKey < minKey || rowKey > maxKey) {
......@@ -89,79 +72,6 @@ static FORCE_INLINE int tsdbCheckRowRange(STsdb *pTsdb, tb_uid_t uid, TSKEY rowK
return 0;
}
#if 0
int tsdbScanAndConvertSubmitMsg(STsdb *pTsdb, SSubmitReq *pMsg) {
ASSERT(pMsg != NULL);
// STsdbMeta * pMeta = pTsdb->tsdbMeta;
SSubmitMsgIter msgIter = {0};
SSubmitBlk *pBlock = NULL;
SSubmitBlkIter blkIter = {0};
STSRow *row = NULL;
STsdbKeepCfg *pCfg = &pTsdb->keepCfg;
TSKEY now = taosGetTimestamp(pCfg->precision);
TSKEY minKey = now - tsTickPerMin[pCfg->precision] * pCfg->keep2;
TSKEY maxKey = tsMaxKeyByPrecision[pCfg->precision];
terrno = TSDB_CODE_SUCCESS;
// pMsg->length = htonl(pMsg->length);
// pMsg->numOfBlocks = htonl(pMsg->numOfBlocks);
if (tInitSubmitMsgIter(pMsg, &msgIter) < 0) return -1;
while (true) {
if (tGetSubmitMsgNext(&msgIter, &pBlock) < 0) return -1;
if (pBlock == NULL) break;
// pBlock->uid = htobe64(pBlock->uid);
// pBlock->suid = htobe64(pBlock->suid);
// pBlock->sversion = htonl(pBlock->sversion);
// pBlock->dataLen = htonl(pBlock->dataLen);
// pBlock->schemaLen = htonl(pBlock->schemaLen);
// pBlock->numOfRows = htonl(pBlock->numOfRows);
#if 0
if (pBlock->tid <= 0 || pBlock->tid >= pMeta->maxTables) {
tsdbError("vgId:%d, failed to get table to insert data, uid %" PRIu64 " tid %d", REPO_ID(pTsdb), pBlock->uid,
pBlock->tid);
terrno = TSDB_CODE_TDB_INVALID_TABLE_ID;
return -1;
}
STable *pTable = pMeta->tables[pBlock->tid];
if (pTable == NULL || TABLE_UID(pTable) != pBlock->uid) {
tsdbError("vgId:%d, failed to get table to insert data, uid %" PRIu64 " tid %d", REPO_ID(pTsdb), pBlock->uid,
pBlock->tid);
terrno = TSDB_CODE_TDB_INVALID_TABLE_ID;
return -1;
}
if (TABLE_TYPE(pTable) == TSDB_SUPER_TABLE) {
tsdbError("vgId:%d, invalid action trying to insert a super table %s", REPO_ID(pTsdb), TABLE_CHAR_NAME(pTable));
terrno = TSDB_CODE_TDB_INVALID_ACTION;
return -1;
}
// Check schema version and update schema if needed
if (tsdbCheckTableSchema(pTsdb, pBlock, pTable) < 0) {
if (terrno == TSDB_CODE_TDB_TABLE_RECONFIGURE) {
continue;
} else {
return -1;
}
}
#endif
tInitSubmitBlkIter(&msgIter, pBlock, &blkIter);
while ((row = tGetSubmitBlkNext(&blkIter)) != NULL) {
if (tsdbCheckRowRange(pTsdb, msgIter.uid, row, minKey, maxKey, now) < 0) {
return -1;
}
}
}
if (terrno != TSDB_CODE_SUCCESS) return -1;
return 0;
}
#endif
int tsdbScanAndConvertSubmitMsg(STsdb *pTsdb, SSubmitReq2 *pMsg) {
int32_t code = 0;
STsdbKeepCfg *pCfg = &pTsdb->keepCfg;
......
......@@ -96,6 +96,7 @@ int vnodeGetTableMeta(SVnode *pVnode, SRpcMsg *pMsg, bool direct) {
metaRsp.numOfColumns = schema.nCols;
metaRsp.precision = pVnode->config.tsdbCfg.precision;
metaRsp.sversion = schema.version;
metaRsp.tversion = schemaTag.version;
metaRsp.pSchemas = (SSchema *)taosMemoryMalloc(sizeof(SSchema) * (metaRsp.numOfColumns + metaRsp.numOfTags));
memcpy(metaRsp.pSchemas, schema.pSchema, sizeof(SSchema) * schema.nCols);
......@@ -264,16 +265,15 @@ _exit:
return TSDB_CODE_SUCCESS;
}
static FORCE_INLINE void vnodeFreeSBatchRspMsg(void* p) {
static FORCE_INLINE void vnodeFreeSBatchRspMsg(void *p) {
if (NULL == p) {
return;
}
SBatchRspMsg* pRsp = (SBatchRspMsg*)p;
SBatchRspMsg *pRsp = (SBatchRspMsg *)p;
rpcFreeCont(pRsp->msg);
}
int32_t vnodeGetBatchMeta(SVnode *pVnode, SRpcMsg *pMsg) {
int32_t code = 0;
int32_t rspSize = 0;
......@@ -405,7 +405,8 @@ void vnodeResetLoad(SVnode *pVnode, SVnodeLoad *pLoad) {
VNODE_GET_LOAD_RESET_VALS(pVnode->statis.nInsert, pLoad->numOfInsertReqs, 64, "nInsert");
VNODE_GET_LOAD_RESET_VALS(pVnode->statis.nInsertSuccess, pLoad->numOfInsertSuccessReqs, 64, "nInsertSuccess");
VNODE_GET_LOAD_RESET_VALS(pVnode->statis.nBatchInsert, pLoad->numOfBatchInsertReqs, 64, "nBatchInsert");
VNODE_GET_LOAD_RESET_VALS(pVnode->statis.nBatchInsertSuccess, pLoad->numOfBatchInsertSuccessReqs, 64, "nBatchInsertSuccess");
VNODE_GET_LOAD_RESET_VALS(pVnode->statis.nBatchInsertSuccess, pLoad->numOfBatchInsertSuccessReqs, 64,
"nBatchInsertSuccess");
}
void vnodeGetInfo(SVnode *pVnode, const char **dbname, int32_t *vgId) {
......
......@@ -101,6 +101,8 @@ extern "C" {
#define COMMAND_SCHEDULE_POLICY "schedulePolicy"
#define COMMAND_ENABLE_RESCHEDULE "enableReSchedule"
#define COMMAND_CATALOG_DEBUG "catalogDebug"
#define COMMAND_ENABLE_MEM_DEBUG "enableMemDebug"
#define COMMAND_DISABLE_MEM_DEBUG "disableMemDebug"
typedef struct SExplainGroup {
int32_t nodeNum;
......
......@@ -687,6 +687,21 @@ static int32_t execAlterCmd(char* cmd, char* value, bool* processed) {
code = schedulerEnableReSchedule(atoi(value));
} else if (0 == strcasecmp(cmd, COMMAND_CATALOG_DEBUG)) {
code = ctgdHandleDbgCommand(value);
} else if (0 == strcasecmp(cmd, COMMAND_ENABLE_MEM_DEBUG)) {
code = taosMemoryDbgInit();
if (code) {
qError("failed to init memory dbg, error:%s", tstrerror(code));
return code;
}
tsAsyncLog = false;
qInfo("memory dbg enabled");
} else if (0 == strcasecmp(cmd, COMMAND_DISABLE_MEM_DEBUG)) {
code = taosMemoryDbgInitRestore();
if (code) {
qError("failed to restore from memory dbg, error:%s", tstrerror(code));
return code;
}
qInfo("memory dbg disabled");
} else {
goto _return;
}
......
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册