提交 c0bd39cd 编写于 作者: G Ganlin Zhao

Merge branch '3.0' into fix/TD-17499

...@@ -2,11 +2,8 @@ ...@@ -2,11 +2,8 @@
sidebar_label: Docker sidebar_label: Docker
title: 通过 Docker 快速体验 TDengine title: 通过 Docker 快速体验 TDengine
--- ---
:::info
如果您希望为 TDengine 贡献代码或对内部技术实现感兴趣,请参考[TDengine GitHub 主页](https://github.com/taosdata/TDengine) 下载源码构建和安装.
:::
本节首先介绍如何通过 Docker 快速体验 TDengine,然后介绍如何在 Docker 环境下体验 TDengine 的写入和查询功能。如果你不熟悉Docker, 请使用[安装包的方式快速体验](../../get-started/package/) 本节首先介绍如何通过 Docker 快速体验 TDengine,然后介绍如何在 Docker 环境下体验 TDengine 的写入和查询功能。如果你不熟悉 Docker,请使用[安装包的方式快速体验](../../get-started/package/)。如果您希望为 TDengine 贡献代码或对内部技术实现感兴趣,请参考 [TDengine GitHub 主页](https://github.com/taosdata/TDengine) 下载源码构建和安装.
## 启动 TDengine ## 启动 TDengine
......
...@@ -5,8 +5,9 @@ title: 使用安装包立即开始 ...@@ -5,8 +5,9 @@ title: 使用安装包立即开始
import Tabs from "@theme/Tabs"; import Tabs from "@theme/Tabs";
import TabItem from "@theme/TabItem"; import TabItem from "@theme/TabItem";
import PkgListV3 from "/components/PkgListV3";
在 Linux 系统上,TDengine 开源版本提供 deb 和 rpm 格式安装包,用户可以根据自己的运行环境选择合适的安装包。其中 deb 支持 Debian/Ubuntu 及衍生系统,rpm 支持 CentOS/RHEL/SUSE 及衍生系统。同时我们也为企业用户提供 tar.gz 格式安装包,也支持通过 `apt-get` 工具从线上进行安装。TDengine 也提供 Windows x64 平台的安装包。您也可以[用Docker立即体验](../../get-started/package/)。如果您希望对 TDengine 贡献代码或对内部实现感兴趣,请参考我们的 [TDengine GitHub 主页](https://github.com/taosdata/TDengine) 下载源码构建和安装. 在 Linux 系统上,TDengine 开源版本提供 deb 和 rpm 格式安装包,用户可以根据自己的运行环境选择合适的安装包。其中 deb 支持 Debian/Ubuntu 及衍生系统,rpm 支持 CentOS/RHEL/SUSE 及衍生系统。同时我们也为企业用户提供 tar.gz 格式安装包,也支持通过 `apt-get` 工具从线上进行安装。TDengine 也提供 Windows x64 平台的安装包。您也可以[用Docker立即体验](../../get-started/docker/)。如果您希望对 TDengine 贡献代码或对内部实现感兴趣,请参考我们的 [TDengine GitHub 主页](https://github.com/taosdata/TDengine) 下载源码构建和安装.
## 安装 ## 安装
...@@ -64,6 +65,8 @@ sudo rpm -ivh TDengine-server-3.0.0.0-Linux-x64.rpm ...@@ -64,6 +65,8 @@ sudo rpm -ivh TDengine-server-3.0.0.0-Linux-x64.rpm
<TabItem label="tar.gz 安装" value="tarinst"> <TabItem label="tar.gz 安装" value="tarinst">
<PkgListV3 type={0}/>
1.[发布历史页面](../../releases) 下载获得 tar.gz 安装包,例如 TDengine-server-3.0.0.0-Linux-x64.tar.gz; 1.[发布历史页面](../../releases) 下载获得 tar.gz 安装包,例如 TDengine-server-3.0.0.0-Linux-x64.tar.gz;
2. 进入到 TDengine-server-3.0.0.0-Linux-x64.tar.gz 安装包所在目录,先解压文件后,进入子目录,执行其中的 install.sh 安装脚本: 2. 进入到 TDengine-server-3.0.0.0-Linux-x64.tar.gz 安装包所在目录,先解压文件后,进入子目录,执行其中的 install.sh 安装脚本:
...@@ -85,6 +88,8 @@ install.sh 安装脚本在执行过程中,会通过命令行交互界面询问 ...@@ -85,6 +88,8 @@ install.sh 安装脚本在执行过程中,会通过命令行交互界面询问
</TabItem> </TabItem>
<TabItem label="Windows 安装" value="windows"> <TabItem label="Windows 安装" value="windows">
<PkgListV3 type={1}/>
1.[发布历史页面](../../releases) 下载获得 exe 安装程序,例如 TDengine-server-3.0.0.0-Windows-x64.exe; 1.[发布历史页面](../../releases) 下载获得 exe 安装程序,例如 TDengine-server-3.0.0.0-Windows-x64.exe;
2. 运行 TDengine-server-3.0.0.0-Windows-x64.exe 来安装 TDengine。 2. 运行 TDengine-server-3.0.0.0-Windows-x64.exe 来安装 TDengine。
......
...@@ -87,6 +87,25 @@ void commitSync() throws SQLException; ...@@ -87,6 +87,25 @@ void commitSync() throws SQLException;
void close() throws SQLException; void close() throws SQLException;
``` ```
</TabItem>
<TabItem label="Go" value="Go">
```go
func NewConsumer(conf *Config) (*Consumer, error)
func (c *Consumer) Close() error
func (c *Consumer) Commit(ctx context.Context, message unsafe.Pointer) error
func (c *Consumer) FreeMessage(message unsafe.Pointer)
func (c *Consumer) Poll(timeout time.Duration) (*Result, error)
func (c *Consumer) Subscribe(topics []string) error
func (c *Consumer) Unsubscribe() error
```
</TabItem> </TabItem>
</Tabs> </Tabs>
...@@ -229,6 +248,56 @@ public class MetersDeserializer extends ReferenceDeserializer<Meters> { ...@@ -229,6 +248,56 @@ public class MetersDeserializer extends ReferenceDeserializer<Meters> {
} }
``` ```
</TabItem>
<TabItem label="Go" value="Go">
```go
config := tmq.NewConfig()
defer config.Destroy()
err = config.SetGroupID("test")
if err != nil {
panic(err)
}
err = config.SetAutoOffsetReset("earliest")
if err != nil {
panic(err)
}
err = config.SetConnectIP("127.0.0.1")
if err != nil {
panic(err)
}
err = config.SetConnectUser("root")
if err != nil {
panic(err)
}
err = config.SetConnectPass("taosdata")
if err != nil {
panic(err)
}
err = config.SetConnectPort("6030")
if err != nil {
panic(err)
}
err = config.SetMsgWithTableName(true)
if err != nil {
panic(err)
}
err = config.EnableHeartBeat()
if err != nil {
panic(err)
}
err = config.EnableAutoCommit(func(result *wrapper.TMQCommitCallbackResult) {
if result.ErrCode != 0 {
errStr := wrapper.TMQErr2Str(result.ErrCode)
err := errors.NewError(int(result.ErrCode), errStr)
panic(err)
}
})
if err != nil {
panic(err)
}
```
</TabItem> </TabItem>
</Tabs> </Tabs>
...@@ -260,6 +329,20 @@ topics.add("tmq_topic"); ...@@ -260,6 +329,20 @@ topics.add("tmq_topic");
consumer.subscribe(topics); consumer.subscribe(topics);
``` ```
</TabItem>
<TabItem value="Go" label="Go">
```go
consumer, err := tmq.NewConsumer(config)
if err != nil {
panic(err)
}
err = consumer.Subscribe([]string{"example_tmq_topic"})
if err != nil {
panic(err)
}
```
</TabItem> </TabItem>
</Tabs> </Tabs>
...@@ -293,6 +376,21 @@ while(running){ ...@@ -293,6 +376,21 @@ while(running){
} }
``` ```
</TabItem>
<TabItem value="Go" label="Go">
```go
for {
result, err := consumer.Poll(time.Second)
if err != nil {
panic(err)
}
fmt.Println(result)
consumer.Commit(context.Background(), result.Message)
consumer.FreeMessage(result.Message)
}
```
</TabItem> </TabItem>
</Tabs> </Tabs>
...@@ -322,6 +420,13 @@ consumer.unsubscribe(); ...@@ -322,6 +420,13 @@ consumer.unsubscribe();
consumer.close(); consumer.close();
``` ```
</TabItem>
<TabItem value="Go" label="Go">
```go
consumer.Close()
```
</TabItem> </TabItem>
</Tabs> </Tabs>
......
...@@ -6,8 +6,8 @@ description: "TDengine 3.0 版本的语法变更说明" ...@@ -6,8 +6,8 @@ description: "TDengine 3.0 版本的语法变更说明"
## SQL 基本元素变更 ## SQL 基本元素变更
| # | **元素** | **差异性** | **说明** | | # | **元素** | **<div style={{width: 60}}>差异性</div>** | **说明** |
| - | :------- | :--------: | :------- | | - | :------- | :-------- | :------- |
| 1 | VARCHAR | 新增 | BINARY类型的别名。 | 1 | VARCHAR | 新增 | BINARY类型的别名。
| 2 | TIMESTAMP字面量 | 新增 | 新增支持 TIMESTAMP 'timestamp format' 语法。 | 2 | TIMESTAMP字面量 | 新增 | 新增支持 TIMESTAMP 'timestamp format' 语法。
| 3 | _ROWTS伪列 | 新增 | 表示时间戳主键。是_C0伪列的别名。 | 3 | _ROWTS伪列 | 新增 | 表示时间戳主键。是_C0伪列的别名。
...@@ -22,8 +22,8 @@ description: "TDengine 3.0 版本的语法变更说明" ...@@ -22,8 +22,8 @@ description: "TDengine 3.0 版本的语法变更说明"
在 TDengine 中,普通表的数据模型中可使用以下数据类型。 在 TDengine 中,普通表的数据模型中可使用以下数据类型。
| # | **语句** | **差异性** | **说明** | | # | **语句** | **<div style={{width: 60}}>差异性</div>** | **说明** |
| - | :------- | :--------: | :------- | | - | :------- | :-------- | :------- |
| 1 | ALTER ACCOUNT | 废除 | 2.x中为企业版功能,3.0不再支持。语法暂时保留了,执行报“This statement is no longer supported”错误。 | 1 | ALTER ACCOUNT | 废除 | 2.x中为企业版功能,3.0不再支持。语法暂时保留了,执行报“This statement is no longer supported”错误。
| 2 | ALTER ALL DNODES | 新增 | 修改所有DNODE的参数。 | 2 | ALTER ALL DNODES | 新增 | 修改所有DNODE的参数。
| 3 | ALTER DATABASE | 调整 | 废除<ul><li>QUORUM:写入需要的副本确认数。3.0版本使用STRICT来指定强一致还是弱一致。3.0.0版本STRICT暂不支持修改。</li><li>BLOCKS:VNODE使用的内存块数。3.0版本使用BUFFER来表示VNODE写入内存池的大小。</li><li>UPDATE:更新操作的支持模式。3.0版本所有数据库都支持部分列更新。</li><li>CACHELAST:缓存最新一行数据的模式。3.0版本用CACHEMODEL代替。</li><li>COMP:3.0版本暂不支持修改。<br/>新增</li><li>CACHEMODEL:表示是否在内存中缓存子表的最近数据。</li><li>CACHESIZE:表示缓存子表最近数据的内存大小。</li><li>WAL_FSYNC_PERIOD:代替原FSYNC参数。</li><li>WAL_LEVEL:代替原WAL参数。<br/>调整</li><li>REPLICA:3.0.0版本暂不支持修改。</li><li>KEEP:3.0版本新增支持带单位的设置方式。</li></ul> | 3 | ALTER DATABASE | 调整 | 废除<ul><li>QUORUM:写入需要的副本确认数。3.0版本使用STRICT来指定强一致还是弱一致。3.0.0版本STRICT暂不支持修改。</li><li>BLOCKS:VNODE使用的内存块数。3.0版本使用BUFFER来表示VNODE写入内存池的大小。</li><li>UPDATE:更新操作的支持模式。3.0版本所有数据库都支持部分列更新。</li><li>CACHELAST:缓存最新一行数据的模式。3.0版本用CACHEMODEL代替。</li><li>COMP:3.0版本暂不支持修改。<br/>新增</li><li>CACHEMODEL:表示是否在内存中缓存子表的最近数据。</li><li>CACHESIZE:表示缓存子表最近数据的内存大小。</li><li>WAL_FSYNC_PERIOD:代替原FSYNC参数。</li><li>WAL_LEVEL:代替原WAL参数。<br/>调整</li><li>REPLICA:3.0.0版本暂不支持修改。</li><li>KEEP:3.0版本新增支持带单位的设置方式。</li></ul>
...@@ -80,8 +80,8 @@ description: "TDengine 3.0 版本的语法变更说明" ...@@ -80,8 +80,8 @@ description: "TDengine 3.0 版本的语法变更说明"
## SQL 函数变更 ## SQL 函数变更
| # | **函数** | **差异性** | **说明** | | # | **函数** | ** <div style={{width: 60}}>差异性</div> ** | **说明** |
| - | :------- | :--------: | :------- | | - | :------- | :-------- | :------- |
| 1 | TWA | 增强 | 可以直接用于超级表了。没有PARTITION BY时,超级表的数据会被合并成一条时间线。 | 1 | TWA | 增强 | 可以直接用于超级表了。没有PARTITION BY时,超级表的数据会被合并成一条时间线。
| 2 | IRATE | 增强 | 可以直接用于超级表了。没有PARTITION BY时,超级表的数据会被合并成一条时间线。 | 2 | IRATE | 增强 | 可以直接用于超级表了。没有PARTITION BY时,超级表的数据会被合并成一条时间线。
| 3 | LEASTSQUARES | 增强 | 可以用于超级表了。 | 3 | LEASTSQUARES | 增强 | 可以用于超级表了。
......
...@@ -93,12 +93,12 @@ Maven 项目中,在 pom.xml 中添加以下依赖: ...@@ -93,12 +93,12 @@ Maven 项目中,在 pom.xml 中添加以下依赖:
可以通过下载 TDengine 的源码,自己编译最新版本的 Java connector 可以通过下载 TDengine 的源码,自己编译最新版本的 Java connector
```shell ```shell
git clone https://github.com/taosdata/taos-connector-jdbc.git --branch 2.0 git clone https://github.com/taosdata/taos-connector-jdbc.git
cd taos-connector-jdbc cd taos-connector-jdbc
mvn clean install -Dmaven.test.skip=true mvn clean install -Dmaven.test.skip=true
``` ```
编译后,在 target 目录下会产生 taos-jdbcdriver-2.0.XX-dist.jar 的 jar 包,并自动将编译的 jar 文件放在本地的 Maven 仓库中。 编译后,在 target 目录下会产生 taos-jdbcdriver-3.0.*-dist.jar 的 jar 包,并自动将编译的 jar 文件放在本地的 Maven 仓库中。
</TabItem> </TabItem>
</Tabs> </Tabs>
...@@ -198,7 +198,7 @@ url 中的配置参数如下: ...@@ -198,7 +198,7 @@ url 中的配置参数如下:
- user:登录 TDengine 用户名,默认值 'root'。 - user:登录 TDengine 用户名,默认值 'root'。
- password:用户登录密码,默认值 'taosdata'。 - password:用户登录密码,默认值 'taosdata'。
- batchfetch: true:在执行查询时批量拉取结果集;false:逐行拉取结果集。默认值为:false。逐行拉取结果集使用 HTTP 方式进行数据传输。从 taos-jdbcdriver-2.0.38 开始,JDBC REST 连接增加批量拉取数据功能。taos-jdbcdriver 与 TDengine 之间通过 WebSocket 连接进行数据传输。相较于 HTTP,WebSocket 可以使 JDBC REST 连接支持大数据量查询,并提升查询性能。 - batchfetch: true:在执行查询时批量拉取结果集;false:逐行拉取结果集。默认值为:false。逐行拉取结果集使用 HTTP 方式进行数据传输。JDBC REST 连接支持批量拉取数据功能。taos-jdbcdriver 与 TDengine 之间通过 WebSocket 连接进行数据传输。相较于 HTTP,WebSocket 可以使 JDBC REST 连接支持大数据量查询,并提升查询性能。
- charset: 当开启批量拉取数据时,指定解析字符串数据的字符集。 - charset: 当开启批量拉取数据时,指定解析字符串数据的字符集。
- batchErrorIgnore:true:在执行 Statement 的 executeBatch 时,如果中间有一条 SQL 执行失败,继续执行下面的 SQL 了。false:不再执行失败 SQL 后的任何语句。默认值为:false。 - batchErrorIgnore:true:在执行 Statement 的 executeBatch 时,如果中间有一条 SQL 执行失败,继续执行下面的 SQL 了。false:不再执行失败 SQL 后的任何语句。默认值为:false。
- httpConnectTimeout: 连接超时时间,单位 ms, 默认值为 5000。 - httpConnectTimeout: 连接超时时间,单位 ms, 默认值为 5000。
...@@ -216,7 +216,7 @@ url 中的配置参数如下: ...@@ -216,7 +216,7 @@ url 中的配置参数如下:
INSERT INTO test.t1 USING test.weather (ts, temperature) TAGS('California.SanFrancisco') VALUES(now, 24.6); INSERT INTO test.t1 USING test.weather (ts, temperature) TAGS('California.SanFrancisco') VALUES(now, 24.6);
``` ```
- 从 taos-jdbcdriver-2.0.36 开始,如果在 url 中指定了 dbname,那么,JDBC REST 连接会默认使用/rest/sql/dbname 作为 restful 请求的 url,在 SQL 中不需要指定 dbname。例如:url 为 jdbc:TAOS-RS://127.0.0.1:6041/test,那么,可以执行 sql:insert into t1 using weather(ts, temperature) tags('California.SanFrancisco') values(now, 24.6); - 如果在 url 中指定了 dbname,那么,JDBC REST 连接会默认使用/rest/sql/dbname 作为 restful 请求的 url,在 SQL 中不需要指定 dbname。例如:url 为 jdbc:TAOS-RS://127.0.0.1:6041/test,那么,可以执行 sql:insert into t1 using weather(ts, temperature) tags('California.SanFrancisco') values(now, 24.6);
::: :::
...@@ -230,7 +230,7 @@ INSERT INTO test.t1 USING test.weather (ts, temperature) TAGS('California.SanFra ...@@ -230,7 +230,7 @@ INSERT INTO test.t1 USING test.weather (ts, temperature) TAGS('California.SanFra
**注意**: **注意**:
- 应用中设置的 client parameter 为进程级别的,即如果要更新 client 的参数,需要重启应用。这是因为 client parameter 是全局参数,仅在应用程序的第一次设置生效。 - 应用中设置的 client parameter 为进程级别的,即如果要更新 client 的参数,需要重启应用。这是因为 client parameter 是全局参数,仅在应用程序的第一次设置生效。
- 以下示例代码基于 taos-jdbcdriver-2.0.36 - 以下示例代码基于 taos-jdbcdriver-3.0.0
```java ```java
public Connection getConn() throws Exception{ public Connection getConn() throws Exception{
...@@ -367,7 +367,7 @@ TDengine 的 JDBC 原生连接实现大幅改进了参数绑定方式对数据 ...@@ -367,7 +367,7 @@ TDengine 的 JDBC 原生连接实现大幅改进了参数绑定方式对数据
**注意**: **注意**:
- JDBC REST 连接目前不支持参数绑定 - JDBC REST 连接目前不支持参数绑定
- 以下示例代码基于 taos-jdbcdriver-2.0.36 - 以下示例代码基于 taos-jdbcdriver-3.0.0
- binary 类型数据需要调用 setString 方法,nchar 类型数据需要调用 setNString 方法 - binary 类型数据需要调用 setString 方法,nchar 类型数据需要调用 setNString 方法
- setString 和 setNString 都要求用户在 size 参数里声明表定义中对应列的列宽 - setString 和 setNString 都要求用户在 size 参数里声明表定义中对应列的列宽
...@@ -635,7 +635,7 @@ TDengine 支持无模式写入功能。无模式写入兼容 InfluxDB 的 行协 ...@@ -635,7 +635,7 @@ TDengine 支持无模式写入功能。无模式写入兼容 InfluxDB 的 行协
**注意**: **注意**:
- JDBC REST 连接目前不支持无模式写入 - JDBC REST 连接目前不支持无模式写入
- 以下示例代码基于 taos-jdbcdriver-2.0.36 - 以下示例代码基于 taos-jdbcdriver-3.0.0
```java ```java
public class SchemalessInsertTest { public class SchemalessInsertTest {
...@@ -666,7 +666,7 @@ public class SchemalessInsertTest { ...@@ -666,7 +666,7 @@ public class SchemalessInsertTest {
} }
``` ```
### 订阅 ### 数据订阅
TDengine Java 连接器支持订阅功能,应用 API 如下: TDengine Java 连接器支持订阅功能,应用 API 如下:
...@@ -717,9 +717,14 @@ while(true) { ...@@ -717,9 +717,14 @@ while(true) {
#### 关闭订阅 #### 关闭订阅
```java ```java
// 取消订阅
consumer.unsubscribe();
// 关闭消费
consumer.close() consumer.close()
``` ```
详情请参考:[数据订阅](../../../develop/tmq)
### 使用示例如下: ### 使用示例如下:
```java ```java
...@@ -734,7 +739,7 @@ public abstract class ConsumerLoop { ...@@ -734,7 +739,7 @@ public abstract class ConsumerLoop {
config.setProperty("msg.with.table.name", "true"); config.setProperty("msg.with.table.name", "true");
config.setProperty("enable.auto.commit", "true"); config.setProperty("enable.auto.commit", "true");
config.setProperty("group.id", "group1"); config.setProperty("group.id", "group1");
config.setProperty("value.deserializer", "com.taosdata.jdbc.tmq.ConsumerTest.ResultDeserializer"); config.setProperty("value.deserializer", "com.taosdata.jdbc.tmq.ConsumerTest.ConsumerLoop$ResultDeserializer");
this.consumer = new TaosConsumer<>(config); this.consumer = new TaosConsumer<>(config);
this.topics = Collections.singletonList("topic_speed"); this.topics = Collections.singletonList("topic_speed");
...@@ -754,8 +759,9 @@ public abstract class ConsumerLoop { ...@@ -754,8 +759,9 @@ public abstract class ConsumerLoop {
process(record); process(record);
} }
} }
consumer.unsubscribe();
} finally { } finally {
consumer.close(); consumer.close();
shutdownLatch.countDown(); shutdownLatch.countDown();
} }
} }
...@@ -875,6 +881,7 @@ public static void main(String[] args) throws Exception { ...@@ -875,6 +881,7 @@ public static void main(String[] args) throws Exception {
| taos-jdbcdriver 版本 | 主要变化 | | taos-jdbcdriver 版本 | 主要变化 |
| :------------------: | :----------------------------: | | :------------------: | :----------------------------: |
| 3.0.0 | 支持 TDengine 3.0 |
| 2.0.39 - 2.0.40 | 增加 REST 连接/请求 超时设置 | | 2.0.39 - 2.0.40 | 增加 REST 连接/请求 超时设置 |
| 2.0.38 | JDBC REST 连接增加批量拉取功能 | | 2.0.38 | JDBC REST 连接增加批量拉取功能 |
| 2.0.37 | 增加对 json tag 支持 | | 2.0.37 | 增加对 json tag 支持 |
......
...@@ -25,33 +25,34 @@ extern "C" { ...@@ -25,33 +25,34 @@ extern "C" {
#endif #endif
typedef struct SUpdateInfo { typedef struct SUpdateInfo {
SArray *pTsBuckets; SArray *pTsBuckets;
uint64_t numBuckets; uint64_t numBuckets;
SArray *pTsSBFs; SArray *pTsSBFs;
uint64_t numSBFs; uint64_t numSBFs;
int64_t interval; int64_t interval;
int64_t watermark; int64_t watermark;
TSKEY minTS; TSKEY minTS;
SScalableBf* pCloseWinSBF; SScalableBf *pCloseWinSBF;
SHashObj* pMap; SHashObj *pMap;
STimeWindow scanWindow; STimeWindow scanWindow;
uint64_t scanGroupId; uint64_t scanGroupId;
uint64_t maxVersion; uint64_t maxVersion;
} SUpdateInfo; } SUpdateInfo;
SUpdateInfo *updateInfoInitP(SInterval* pInterval, int64_t watermark); SUpdateInfo *updateInfoInitP(SInterval *pInterval, int64_t watermark);
SUpdateInfo *updateInfoInit(int64_t interval, int32_t precision, int64_t watermark); SUpdateInfo *updateInfoInit(int64_t interval, int32_t precision, int64_t watermark);
bool updateInfoIsUpdated(SUpdateInfo *pInfo, uint64_t tableId, TSKEY ts); bool updateInfoIsUpdated(SUpdateInfo *pInfo, uint64_t tableId, TSKEY ts);
void updateInfoSetScanRange(SUpdateInfo *pInfo, STimeWindow* pWin, uint64_t groupId, uint64_t version); bool updateInfoIsTableInserted(SUpdateInfo *pInfo, int64_t tbUid);
bool updateInfoIgnore(SUpdateInfo *pInfo, STimeWindow* pWin, uint64_t groupId, uint64_t version); void updateInfoSetScanRange(SUpdateInfo *pInfo, STimeWindow *pWin, uint64_t groupId, uint64_t version);
void updateInfoDestroy(SUpdateInfo *pInfo); bool updateInfoIgnore(SUpdateInfo *pInfo, STimeWindow *pWin, uint64_t groupId, uint64_t version);
void updateInfoAddCloseWindowSBF(SUpdateInfo *pInfo); void updateInfoDestroy(SUpdateInfo *pInfo);
void updateInfoDestoryColseWinSBF(SUpdateInfo *pInfo); void updateInfoAddCloseWindowSBF(SUpdateInfo *pInfo);
int32_t updateInfoSerialize(void *buf, int32_t bufLen, const SUpdateInfo *pInfo); void updateInfoDestoryColseWinSBF(SUpdateInfo *pInfo);
int32_t updateInfoDeserialize(void *buf, int32_t bufLen, SUpdateInfo *pInfo); int32_t updateInfoSerialize(void *buf, int32_t bufLen, const SUpdateInfo *pInfo);
int32_t updateInfoDeserialize(void *buf, int32_t bufLen, SUpdateInfo *pInfo);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
#endif /* ifndef _TSTREAMUPDATE_H_ */ #endif /* ifndef _TSTREAMUPDATE_H_ */
\ No newline at end of file
...@@ -30,6 +30,7 @@ extern bool gRaftDetailLog; ...@@ -30,6 +30,7 @@ extern bool gRaftDetailLog;
#define SYNC_SPEED_UP_HB_TIMER 400 #define SYNC_SPEED_UP_HB_TIMER 400
#define SYNC_SPEED_UP_AFTER_MS (1000 * 20) #define SYNC_SPEED_UP_AFTER_MS (1000 * 20)
#define SYNC_SLOW_DOWN_RANGE 100 #define SYNC_SLOW_DOWN_RANGE 100
#define SYNC_MAX_READ_RANGE 10
#define SYNC_MAX_BATCH_SIZE 1 #define SYNC_MAX_BATCH_SIZE 1
#define SYNC_INDEX_BEGIN 0 #define SYNC_INDEX_BEGIN 0
...@@ -210,9 +211,12 @@ void syncStop(int64_t rid); ...@@ -210,9 +211,12 @@ void syncStop(int64_t rid);
int32_t syncSetStandby(int64_t rid); int32_t syncSetStandby(int64_t rid);
ESyncState syncGetMyRole(int64_t rid); ESyncState syncGetMyRole(int64_t rid);
bool syncIsReady(int64_t rid); bool syncIsReady(int64_t rid);
bool syncIsReadyForRead(int64_t rid);
const char* syncGetMyRoleStr(int64_t rid); const char* syncGetMyRoleStr(int64_t rid);
bool syncRestoreFinish(int64_t rid); bool syncRestoreFinish(int64_t rid);
SyncTerm syncGetMyTerm(int64_t rid); SyncTerm syncGetMyTerm(int64_t rid);
SyncIndex syncGetLastIndex(int64_t rid);
SyncIndex syncGetCommitIndex(int64_t rid);
SyncGroupId syncGetVgId(int64_t rid); SyncGroupId syncGetVgId(int64_t rid);
void syncGetEpSet(int64_t rid, SEpSet* pEpSet); void syncGetEpSet(int64_t rid, SEpSet* pEpSet);
void syncGetRetryEpSet(int64_t rid, SEpSet* pEpSet); void syncGetRetryEpSet(int64_t rid, SEpSet* pEpSet);
......
...@@ -56,8 +56,8 @@ struct tmq_conf_t { ...@@ -56,8 +56,8 @@ struct tmq_conf_t {
int8_t autoCommit; int8_t autoCommit;
int8_t resetOffset; int8_t resetOffset;
int8_t withTbName; int8_t withTbName;
int8_t ssEnable; int8_t snapEnable;
int32_t ssBatchSize; int32_t snapBatchSize;
bool hbBgEnable; bool hbBgEnable;
...@@ -287,16 +287,21 @@ tmq_conf_res_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value ...@@ -287,16 +287,21 @@ tmq_conf_res_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value
if (strcmp(key, "experimental.snapshot.enable") == 0) { if (strcmp(key, "experimental.snapshot.enable") == 0) {
if (strcmp(value, "true") == 0) { if (strcmp(value, "true") == 0) {
conf->ssEnable = true; conf->snapEnable = true;
return TMQ_CONF_OK; return TMQ_CONF_OK;
} else if (strcmp(value, "false") == 0) { } else if (strcmp(value, "false") == 0) {
conf->ssEnable = false; conf->snapEnable = false;
return TMQ_CONF_OK; return TMQ_CONF_OK;
} else { } else {
return TMQ_CONF_INVALID; return TMQ_CONF_INVALID;
} }
} }
if (strcmp(key, "experimental.snapshot.batch.size") == 0) {
conf->snapBatchSize = atoi(value);
return TMQ_CONF_OK;
}
if (strcmp(key, "enable.heartbeat.background") == 0) { if (strcmp(key, "enable.heartbeat.background") == 0) {
if (strcmp(value, "true") == 0) { if (strcmp(value, "true") == 0) {
conf->hbBgEnable = true; conf->hbBgEnable = true;
...@@ -310,11 +315,6 @@ tmq_conf_res_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value ...@@ -310,11 +315,6 @@ tmq_conf_res_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value
return TMQ_CONF_OK; return TMQ_CONF_OK;
} }
if (strcmp(key, "experimental.snapshot.batch.size") == 0) {
conf->ssBatchSize = atoi(value);
return TMQ_CONF_OK;
}
if (strcmp(key, "td.connect.ip") == 0) { if (strcmp(key, "td.connect.ip") == 0) {
conf->ip = strdup(value); conf->ip = strdup(value);
return TMQ_CONF_OK; return TMQ_CONF_OK;
...@@ -889,7 +889,7 @@ tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) { ...@@ -889,7 +889,7 @@ tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) {
strcpy(pTmq->clientId, conf->clientId); strcpy(pTmq->clientId, conf->clientId);
strcpy(pTmq->groupId, conf->groupId); strcpy(pTmq->groupId, conf->groupId);
pTmq->withTbName = conf->withTbName; pTmq->withTbName = conf->withTbName;
pTmq->useSnapshot = conf->ssEnable; pTmq->useSnapshot = conf->snapEnable;
pTmq->autoCommit = conf->autoCommit; pTmq->autoCommit = conf->autoCommit;
pTmq->autoCommitInterval = conf->autoCommitInterval; pTmq->autoCommitInterval = conf->autoCommitInterval;
pTmq->commitCb = conf->commitCb; pTmq->commitCb = conf->commitCb;
......
...@@ -98,6 +98,7 @@ void vnodeSyncStart(SVnode* pVnode); ...@@ -98,6 +98,7 @@ void vnodeSyncStart(SVnode* pVnode);
void vnodeSyncClose(SVnode* pVnode); void vnodeSyncClose(SVnode* pVnode);
void vnodeRedirectRpcMsg(SVnode* pVnode, SRpcMsg* pMsg); void vnodeRedirectRpcMsg(SVnode* pVnode, SRpcMsg* pMsg);
bool vnodeIsLeader(SVnode* pVnode); bool vnodeIsLeader(SVnode* pVnode);
bool vnodeIsReadyForRead(SVnode* pVnode);
bool vnodeIsRoleLeader(SVnode* pVnode); bool vnodeIsRoleLeader(SVnode* pVnode);
#ifdef __cplusplus #ifdef __cplusplus
......
...@@ -283,7 +283,7 @@ int32_t vnodePreprocessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) { ...@@ -283,7 +283,7 @@ int32_t vnodePreprocessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) {
int32_t vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) { int32_t vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) {
vTrace("message in vnode query queue is processing"); vTrace("message in vnode query queue is processing");
if ((pMsg->msgType == TDMT_SCH_QUERY) && !vnodeIsLeader(pVnode)) { if ((pMsg->msgType == TDMT_SCH_QUERY) && !vnodeIsReadyForRead(pVnode)) {
vnodeRedirectRpcMsg(pVnode, pMsg); vnodeRedirectRpcMsg(pVnode, pMsg);
return 0; return 0;
} }
...@@ -307,7 +307,7 @@ int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) { ...@@ -307,7 +307,7 @@ int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) {
vTrace("vgId:%d, msg:%p in fetch queue is processing", pVnode->config.vgId, pMsg); vTrace("vgId:%d, msg:%p in fetch queue is processing", pVnode->config.vgId, pMsg);
if ((pMsg->msgType == TDMT_SCH_FETCH || pMsg->msgType == TDMT_VND_TABLE_META || pMsg->msgType == TDMT_VND_TABLE_CFG || if ((pMsg->msgType == TDMT_SCH_FETCH || pMsg->msgType == TDMT_VND_TABLE_META || pMsg->msgType == TDMT_VND_TABLE_CFG ||
pMsg->msgType == TDMT_VND_BATCH_META) && pMsg->msgType == TDMT_VND_BATCH_META) &&
!vnodeIsLeader(pVnode)) { !vnodeIsReadyForRead(pVnode)) {
vnodeRedirectRpcMsg(pVnode, pMsg); vnodeRedirectRpcMsg(pVnode, pMsg);
return 0; return 0;
} }
......
...@@ -781,3 +781,17 @@ bool vnodeIsLeader(SVnode *pVnode) { ...@@ -781,3 +781,17 @@ bool vnodeIsLeader(SVnode *pVnode) {
return true; return true;
} }
bool vnodeIsReadyForRead(SVnode *pVnode) {
if (syncIsReady(pVnode->sync)) {
return true;
}
if (syncIsReadyForRead(pVnode->sync)) {
return true;
}
vDebug("vgId:%d, vnode not ready for read, state:%s, last:%ld, cmt:%ld", pVnode->config.vgId,
syncGetMyRoleStr(pVnode->sync), syncGetLastIndex(pVnode->sync), syncGetCommitIndex(pVnode->sync));
return false;
}
...@@ -13,11 +13,11 @@ ...@@ -13,11 +13,11 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#include "os.h"
#include "executorimpl.h" #include "executorimpl.h"
#include "filter.h" #include "filter.h"
#include "function.h" #include "function.h"
#include "functionMgt.h" #include "functionMgt.h"
#include "os.h"
#include "querynodes.h" #include "querynodes.h"
#include "systable.h" #include "systable.h"
#include "tname.h" #include "tname.h"
...@@ -178,8 +178,8 @@ static SResultRow* getTableGroupOutputBuf(SOperatorInfo* pOperator, uint64_t gro ...@@ -178,8 +178,8 @@ static SResultRow* getTableGroupOutputBuf(SOperatorInfo* pOperator, uint64_t gro
STableScanInfo* pTableScanInfo = pOperator->info; STableScanInfo* pTableScanInfo = pOperator->info;
SResultRowPosition* p1 = SResultRowPosition* p1 = (SResultRowPosition*)taosHashGet(pTableScanInfo->pdInfo.pAggSup->pResultRowHashTable, buf,
(SResultRowPosition*)taosHashGet(pTableScanInfo->pdInfo.pAggSup->pResultRowHashTable, buf, GET_RES_WINDOW_KEY_LEN(sizeof(groupId))); GET_RES_WINDOW_KEY_LEN(sizeof(groupId)));
if (p1 == NULL) { if (p1 == NULL) {
return NULL; return NULL;
...@@ -238,7 +238,7 @@ static FORCE_INLINE bool doFilterByBlockSMA(const SNode* pFilterNode, SColumnDat ...@@ -238,7 +238,7 @@ static FORCE_INLINE bool doFilterByBlockSMA(const SNode* pFilterNode, SColumnDat
// todo move to the initialization function // todo move to the initialization function
int32_t code = filterInitFromNode((SNode*)pFilterNode, &filter, 0); int32_t code = filterInitFromNode((SNode*)pFilterNode, &filter, 0);
bool keep = filterRangeExecute(filter, pColsAgg, numOfCols, numOfRows); bool keep = filterRangeExecute(filter, pColsAgg, numOfCols, numOfRows);
filterFreeInfo(filter); filterFreeInfo(filter);
return keep; return keep;
...@@ -312,9 +312,9 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableSca ...@@ -312,9 +312,9 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableSca
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} else if (*status == FUNC_DATA_REQUIRED_STATIS_LOAD) { } else if (*status == FUNC_DATA_REQUIRED_STATIS_LOAD) {
pCost->loadBlockStatis += 1; pCost->loadBlockStatis += 1;
loadSMA = true; // mark the operation of load sma; loadSMA = true; // mark the operation of load sma;
bool success = doLoadBlockSMA(pTableScanInfo, pBlock, pTaskInfo); bool success = doLoadBlockSMA(pTableScanInfo, pBlock, pTaskInfo);
if (success) { // failed to load the block sma data, data block statistics does not exist, load data block instead if (success) { // failed to load the block sma data, data block statistics does not exist, load data block instead
qDebug("%s data block SMA loaded, brange:%" PRId64 "-%" PRId64 ", rows:%d", GET_TASKID(pTaskInfo), qDebug("%s data block SMA loaded, brange:%" PRId64 "-%" PRId64 ", rows:%d", GET_TASKID(pTaskInfo),
pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows); pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
...@@ -453,7 +453,7 @@ int32_t addTagPseudoColumnData(SReadHandle* pHandle, SExprInfo* pPseudoExpr, int ...@@ -453,7 +453,7 @@ int32_t addTagPseudoColumnData(SReadHandle* pHandle, SExprInfo* pPseudoExpr, int
colDataAppendNNULL(pColInfoData, 0, pBlock->info.rows); colDataAppendNNULL(pColInfoData, 0, pBlock->info.rows);
} else if (pColInfoData->info.type != TSDB_DATA_TYPE_JSON) { } else if (pColInfoData->info.type != TSDB_DATA_TYPE_JSON) {
colDataAppendNItems(pColInfoData, 0, data, pBlock->info.rows); colDataAppendNItems(pColInfoData, 0, data, pBlock->info.rows);
} else { // todo opt for json tag } else { // todo opt for json tag
for (int32_t i = 0; i < pBlock->info.rows; ++i) { for (int32_t i = 0; i < pBlock->info.rows; ++i) {
colDataAppend(pColInfoData, i, data, false); colDataAppend(pColInfoData, i, data, false);
} }
...@@ -570,7 +570,10 @@ static SSDataBlock* doTableScanGroup(SOperatorInfo* pOperator) { ...@@ -570,7 +570,10 @@ static SSDataBlock* doTableScanGroup(SOperatorInfo* pOperator) {
if (pTableScanInfo->scanTimes < pTableScanInfo->scanInfo.numOfAsc) { if (pTableScanInfo->scanTimes < pTableScanInfo->scanInfo.numOfAsc) {
setTaskStatus(pTaskInfo, TASK_NOT_COMPLETED); setTaskStatus(pTaskInfo, TASK_NOT_COMPLETED);
pTableScanInfo->scanFlag = REPEAT_SCAN; pTableScanInfo->scanFlag = REPEAT_SCAN;
qDebug("%s start to repeat ascending order scan data SELECT last_row(*),hostname from cpu group by hostname;blocks due to query func required", GET_TASKID(pTaskInfo)); qDebug(
"%s start to repeat ascending order scan data SELECT last_row(*),hostname from cpu group by hostname;blocks "
"due to query func required",
GET_TASKID(pTaskInfo));
// do prepare for the next round table scan operation // do prepare for the next round table scan operation
tsdbReaderReset(pTableScanInfo->dataReader, &pTableScanInfo->cond); tsdbReaderReset(pTableScanInfo->dataReader, &pTableScanInfo->cond);
...@@ -1174,16 +1177,18 @@ static void checkUpdateData(SStreamScanInfo* pInfo, bool invertible, SSDataBlock ...@@ -1174,16 +1177,18 @@ static void checkUpdateData(SStreamScanInfo* pInfo, bool invertible, SSDataBlock
for (int32_t rowId = 0; rowId < pBlock->info.rows; rowId++) { for (int32_t rowId = 0; rowId < pBlock->info.rows; rowId++) {
SResultRowInfo dumyInfo; SResultRowInfo dumyInfo;
dumyInfo.cur.pageId = -1; dumyInfo.cur.pageId = -1;
bool isClosed = false; bool isClosed = false;
STimeWindow win = {.skey = INT64_MIN, .ekey = INT64_MAX}; STimeWindow win = {.skey = INT64_MIN, .ekey = INT64_MAX};
if (isOverdue(tsCol[rowId], &pInfo->twAggSup)) { if (isOverdue(tsCol[rowId], &pInfo->twAggSup)) {
win = getActiveTimeWindow(NULL, &dumyInfo, tsCol[rowId], &pInfo->interval, TSDB_ORDER_ASC); win = getActiveTimeWindow(NULL, &dumyInfo, tsCol[rowId], &pInfo->interval, TSDB_ORDER_ASC);
isClosed = isCloseWindow(&win, &pInfo->twAggSup); isClosed = isCloseWindow(&win, &pInfo->twAggSup);
} }
bool inserted = updateInfoIsTableInserted(pInfo->pUpdateInfo, pBlock->info.uid);
// must check update info first. // must check update info first.
bool update = updateInfoIsUpdated(pInfo->pUpdateInfo, pBlock->info.uid, tsCol[rowId]); bool update = updateInfoIsUpdated(pInfo->pUpdateInfo, pBlock->info.uid, tsCol[rowId]);
if ((update || (isSignleIntervalWindow(pInfo) && isClosed && bool closedWin = isClosed && inserted && isSignleIntervalWindow(pInfo) &&
isDeletedWindow(&win, pBlock->info.groupId, pInfo->sessionSup.pIntervalAggSup))) && out) { isDeletedWindow(&win, pBlock->info.groupId, pInfo->sessionSup.pIntervalAggSup);
if ((update || closedWin) && out) {
appendOneRow(pInfo->pUpdateDataRes, tsCol + rowId, tsCol + rowId, &pBlock->info.uid); appendOneRow(pInfo->pUpdateDataRes, tsCol + rowId, tsCol + rowId, &pBlock->info.uid);
} }
} }
...@@ -1390,8 +1395,8 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) { ...@@ -1390,8 +1395,8 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
SSDataBlock* pSDB = doRangeScan(pInfo, pInfo->pUpdateRes, pInfo->primaryTsIndex, &pInfo->updateResIndex); SSDataBlock* pSDB = doRangeScan(pInfo, pInfo->pUpdateRes, pInfo->primaryTsIndex, &pInfo->updateResIndex);
if (pSDB) { if (pSDB) {
STableScanInfo* pTableScanInfo = pInfo->pTableScanOp->info; STableScanInfo* pTableScanInfo = pInfo->pTableScanOp->info;
uint64_t version = getReaderMaxVersion(pTableScanInfo->dataReader); uint64_t version = getReaderMaxVersion(pTableScanInfo->dataReader);
updateInfoSetScanRange(pInfo->pUpdateInfo, &pTableScanInfo->cond.twindows, pInfo->groupId,version); updateInfoSetScanRange(pInfo->pUpdateInfo, &pTableScanInfo->cond.twindows, pInfo->groupId, version);
pSDB->info.type = pInfo->scanMode == STREAM_SCAN_FROM_DATAREADER_RANGE ? STREAM_NORMAL : STREAM_PULL_DATA; pSDB->info.type = pInfo->scanMode == STREAM_SCAN_FROM_DATAREADER_RANGE ? STREAM_NORMAL : STREAM_PULL_DATA;
checkUpdateData(pInfo, true, pSDB, false); checkUpdateData(pInfo, true, pSDB, false);
return pSDB; return pSDB;
...@@ -1445,7 +1450,8 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) { ...@@ -1445,7 +1450,8 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
setBlockIntoRes(pInfo, &block); setBlockIntoRes(pInfo, &block);
if (updateInfoIgnore(pInfo->pUpdateInfo, &pInfo->pRes->info.window, pInfo->pRes->info.groupId, pInfo->pRes->info.version)) { if (updateInfoIgnore(pInfo->pUpdateInfo, &pInfo->pRes->info.window, pInfo->pRes->info.groupId,
pInfo->pRes->info.version)) {
printDataBlock(pInfo->pRes, "stream scan ignore"); printDataBlock(pInfo->pRes, "stream scan ignore");
blockDataCleanup(pInfo->pRes); blockDataCleanup(pInfo->pRes);
continue; continue;
...@@ -2248,7 +2254,7 @@ static SSDataBlock* doSysTableScan(SOperatorInfo* pOperator) { ...@@ -2248,7 +2254,7 @@ static SSDataBlock* doSysTableScan(SOperatorInfo* pOperator) {
// build message and send to mnode to fetch the content of system tables. // build message and send to mnode to fetch the content of system tables.
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SSysTableScanInfo* pInfo = pOperator->info; SSysTableScanInfo* pInfo = pOperator->info;
char dbName[TSDB_DB_NAME_LEN] = {0}; char dbName[TSDB_DB_NAME_LEN] = {0};
const char* name = tNameGetTableName(&pInfo->name); const char* name = tNameGetTableName(&pInfo->name);
if (pInfo->showRewrite) { if (pInfo->showRewrite) {
...@@ -2260,8 +2266,8 @@ static SSDataBlock* doSysTableScan(SOperatorInfo* pOperator) { ...@@ -2260,8 +2266,8 @@ static SSDataBlock* doSysTableScan(SOperatorInfo* pOperator) {
return sysTableScanUserTables(pOperator); return sysTableScanUserTables(pOperator);
} else if (strncasecmp(name, TSDB_INS_TABLE_TAGS, TSDB_TABLE_FNAME_LEN) == 0) { } else if (strncasecmp(name, TSDB_INS_TABLE_TAGS, TSDB_TABLE_FNAME_LEN) == 0) {
return sysTableScanUserTags(pOperator); return sysTableScanUserTags(pOperator);
} else if (strncasecmp(name, TSDB_INS_TABLE_STABLES, TSDB_TABLE_FNAME_LEN) == 0 && } else if (strncasecmp(name, TSDB_INS_TABLE_STABLES, TSDB_TABLE_FNAME_LEN) == 0 && pInfo->showRewrite &&
pInfo->showRewrite && IS_SYS_DBNAME(dbName)) { IS_SYS_DBNAME(dbName)) {
return sysTableScanUserSTables(pOperator); return sysTableScanUserSTables(pOperator);
} else { // load the meta from mnode of the given epset } else { // load the meta from mnode of the given epset
if (pOperator->status == OP_EXEC_DONE) { if (pOperator->status == OP_EXEC_DONE) {
...@@ -2541,7 +2547,7 @@ static void destroyTagScanOperatorInfo(void* param, int32_t numOfOutput) { ...@@ -2541,7 +2547,7 @@ static void destroyTagScanOperatorInfo(void* param, int32_t numOfOutput) {
pInfo->pRes = blockDataDestroy(pInfo->pRes); pInfo->pRes = blockDataDestroy(pInfo->pRes);
taosArrayDestroy(pInfo->pColMatchInfo); taosArrayDestroy(pInfo->pColMatchInfo);
taosMemoryFreeClear(param); taosMemoryFreeClear(param);
} }
...@@ -2597,7 +2603,6 @@ _error: ...@@ -2597,7 +2603,6 @@ _error:
int32_t createScanTableListInfo(SScanPhysiNode* pScanNode, SNodeList* pGroupTags, bool groupSort, SReadHandle* pHandle, int32_t createScanTableListInfo(SScanPhysiNode* pScanNode, SNodeList* pGroupTags, bool groupSort, SReadHandle* pHandle,
STableListInfo* pTableListInfo, SNode* pTagCond, SNode* pTagIndexCond, STableListInfo* pTableListInfo, SNode* pTagCond, SNode* pTagIndexCond,
const char* idStr) { const char* idStr) {
int64_t st = taosGetTimestampUs(); int64_t st = taosGetTimestampUs();
int32_t code = getTableList(pHandle->meta, pHandle->vnode, pScanNode, pTagCond, pTagIndexCond, pTableListInfo); int32_t code = getTableList(pHandle->meta, pHandle->vnode, pScanNode, pTagCond, pTagIndexCond, pTableListInfo);
...@@ -2606,7 +2611,7 @@ int32_t createScanTableListInfo(SScanPhysiNode* pScanNode, SNodeList* pGroupTags ...@@ -2606,7 +2611,7 @@ int32_t createScanTableListInfo(SScanPhysiNode* pScanNode, SNodeList* pGroupTags
} }
int64_t st1 = taosGetTimestampUs(); int64_t st1 = taosGetTimestampUs();
qDebug("generate queried table list completed, elapsed time:%.2f ms %s", (st1-st)/1000.0, idStr); qDebug("generate queried table list completed, elapsed time:%.2f ms %s", (st1 - st) / 1000.0, idStr);
if (taosArrayGetSize(pTableListInfo->pTableList) == 0) { if (taosArrayGetSize(pTableListInfo->pTableList) == 0) {
qDebug("no table qualified for query, %s" PRIx64, idStr); qDebug("no table qualified for query, %s" PRIx64, idStr);
...@@ -2620,7 +2625,7 @@ int32_t createScanTableListInfo(SScanPhysiNode* pScanNode, SNodeList* pGroupTags ...@@ -2620,7 +2625,7 @@ int32_t createScanTableListInfo(SScanPhysiNode* pScanNode, SNodeList* pGroupTags
} }
int64_t st2 = taosGetTimestampUs(); int64_t st2 = taosGetTimestampUs();
qDebug("generate group id map completed, elapsed time:%.2f ms %s", (st2-st1)/1000.0, idStr); qDebug("generate group id map completed, elapsed time:%.2f ms %s", (st2 - st1) / 1000.0, idStr);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
......
...@@ -1399,7 +1399,7 @@ static int32_t translateTimelineFunc(STranslateContext* pCxt, SFunctionNode* pFu ...@@ -1399,7 +1399,7 @@ static int32_t translateTimelineFunc(STranslateContext* pCxt, SFunctionNode* pFu
"%s function must be used in select statements", pFunc->functionName); "%s function must be used in select statements", pFunc->functionName);
} }
SSelectStmt* pSelect = (SSelectStmt*)pCxt->pCurrStmt; SSelectStmt* pSelect = (SSelectStmt*)pCxt->pCurrStmt;
if (QUERY_NODE_TEMP_TABLE == nodeType(pSelect->pFromTable) && if (NULL != pSelect->pFromTable && QUERY_NODE_TEMP_TABLE == nodeType(pSelect->pFromTable) &&
!isTimeLineQuery(((STempTableNode*)pSelect->pFromTable)->pSubquery)) { !isTimeLineQuery(((STempTableNode*)pSelect->pFromTable)->pSubquery)) {
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_NOT_ALLOWED_FUNC, return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_NOT_ALLOWED_FUNC,
"%s function requires valid time series input", pFunc->functionName); "%s function requires valid time series input", pFunc->functionName);
...@@ -2037,16 +2037,13 @@ static int32_t setVnodeSysTableVgroupList(STranslateContext* pCxt, SName* pName, ...@@ -2037,16 +2037,13 @@ static int32_t setVnodeSysTableVgroupList(STranslateContext* pCxt, SName* pName,
code = getDBVgInfoImpl(pCxt, pName, &vgroupList); code = getDBVgInfoImpl(pCxt, pName, &vgroupList);
} }
if (TSDB_CODE_SUCCESS == code && if (TSDB_CODE_SUCCESS == code && 0 == strcmp(pRealTable->table.dbName, TSDB_INFORMATION_SCHEMA_DB) &&
0 == strcmp(pRealTable->table.dbName, TSDB_INFORMATION_SCHEMA_DB) && 0 == strcmp(pRealTable->table.tableName, TSDB_INS_TABLE_TAGS) && isSelectStmt(pCxt->pCurrStmt) &&
0 == strcmp(pRealTable->table.tableName, TSDB_INS_TABLE_TAGS) &&
isSelectStmt(pCxt->pCurrStmt) &&
0 == taosArrayGetSize(vgroupList)) { 0 == taosArrayGetSize(vgroupList)) {
((SSelectStmt*)pCxt->pCurrStmt)->isEmptyResult = true; ((SSelectStmt*)pCxt->pCurrStmt)->isEmptyResult = true;
} }
if (TSDB_CODE_SUCCESS == code && if (TSDB_CODE_SUCCESS == code && 0 == strcmp(pRealTable->table.dbName, TSDB_INFORMATION_SCHEMA_DB) &&
0 == strcmp(pRealTable->table.dbName, TSDB_INFORMATION_SCHEMA_DB) &&
0 == strcmp(pRealTable->table.tableName, TSDB_INS_TABLE_TABLES)) { 0 == strcmp(pRealTable->table.tableName, TSDB_INS_TABLE_TABLES)) {
code = addMnodeToVgroupList(&pCxt->pParseCxt->mgmtEpSet, &vgroupList); code = addMnodeToVgroupList(&pCxt->pParseCxt->mgmtEpSet, &vgroupList);
} }
......
...@@ -13,33 +13,31 @@ ...@@ -13,33 +13,31 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#include "tstreamUpdate.h" #include "query.h"
#include "tencode.h" #include "tencode.h"
#include "tstreamUpdate.h"
#include "ttime.h" #include "ttime.h"
#include "query.h"
#define DEFAULT_FALSE_POSITIVE 0.01 #define DEFAULT_FALSE_POSITIVE 0.01
#define DEFAULT_BUCKET_SIZE 1310720 #define DEFAULT_BUCKET_SIZE 1310720
#define DEFAULT_MAP_CAPACITY 1310720 #define DEFAULT_MAP_CAPACITY 1310720
#define DEFAULT_MAP_SIZE (DEFAULT_MAP_CAPACITY * 10) #define DEFAULT_MAP_SIZE (DEFAULT_MAP_CAPACITY * 10)
#define ROWS_PER_MILLISECOND 1 #define ROWS_PER_MILLISECOND 1
#define MAX_NUM_SCALABLE_BF 100000 #define MAX_NUM_SCALABLE_BF 100000
#define MIN_NUM_SCALABLE_BF 10 #define MIN_NUM_SCALABLE_BF 10
#define DEFAULT_PREADD_BUCKET 1 #define DEFAULT_PREADD_BUCKET 1
#define MAX_INTERVAL MILLISECOND_PER_MINUTE #define MAX_INTERVAL MILLISECOND_PER_MINUTE
#define MIN_INTERVAL (MILLISECOND_PER_SECOND * 10) #define MIN_INTERVAL (MILLISECOND_PER_SECOND * 10)
#define DEFAULT_EXPECTED_ENTRIES 10000 #define DEFAULT_EXPECTED_ENTRIES 10000
static int64_t adjustExpEntries(int64_t entries) { static int64_t adjustExpEntries(int64_t entries) { return TMIN(DEFAULT_EXPECTED_ENTRIES, entries); }
return TMIN(DEFAULT_EXPECTED_ENTRIES, entries);
}
static void windowSBfAdd(SUpdateInfo *pInfo, uint64_t count) { static void windowSBfAdd(SUpdateInfo *pInfo, uint64_t count) {
if (pInfo->numSBFs < count) { if (pInfo->numSBFs < count) {
count = pInfo->numSBFs; count = pInfo->numSBFs;
} }
for (uint64_t i = 0; i < count; ++i) { for (uint64_t i = 0; i < count; ++i) {
int64_t rows = adjustExpEntries(pInfo->interval * ROWS_PER_MILLISECOND); int64_t rows = adjustExpEntries(pInfo->interval * ROWS_PER_MILLISECOND);
SScalableBf *tsSBF = tScalableBfInit(rows, DEFAULT_FALSE_POSITIVE); SScalableBf *tsSBF = tScalableBfInit(rows, DEFAULT_FALSE_POSITIVE);
taosArrayPush(pInfo->pTsSBFs, &tsSBF); taosArrayPush(pInfo->pTsSBFs, &tsSBF);
} }
...@@ -78,7 +76,7 @@ static int64_t adjustInterval(int64_t interval, int32_t precision) { ...@@ -78,7 +76,7 @@ static int64_t adjustInterval(int64_t interval, int32_t precision) {
static int64_t adjustWatermark(int64_t adjInterval, int64_t originInt, int64_t watermark) { static int64_t adjustWatermark(int64_t adjInterval, int64_t originInt, int64_t watermark) {
if (watermark <= adjInterval) { if (watermark <= adjInterval) {
watermark = TMAX(originInt/adjInterval, 1) * adjInterval; watermark = TMAX(originInt / adjInterval, 1) * adjInterval;
} else if (watermark > MAX_NUM_SCALABLE_BF * adjInterval) { } else if (watermark > MAX_NUM_SCALABLE_BF * adjInterval) {
watermark = MAX_NUM_SCALABLE_BF * adjInterval; watermark = MAX_NUM_SCALABLE_BF * adjInterval;
}/* else if (watermark < MIN_NUM_SCALABLE_BF * adjInterval) { }/* else if (watermark < MIN_NUM_SCALABLE_BF * adjInterval) {
...@@ -158,11 +156,17 @@ static SScalableBf *getSBf(SUpdateInfo *pInfo, TSKEY ts) { ...@@ -158,11 +156,17 @@ static SScalableBf *getSBf(SUpdateInfo *pInfo, TSKEY ts) {
return res; return res;
} }
bool updateInfoIsTableInserted(SUpdateInfo *pInfo, int64_t tbUid) {
void *pVal = taosHashGet(pInfo->pMap, &tbUid, sizeof(int64_t));
if (pVal || taosHashGetSize(pInfo->pMap) >= DEFAULT_MAP_SIZE) return true;
return false;
}
bool updateInfoIsUpdated(SUpdateInfo *pInfo, uint64_t tableId, TSKEY ts) { bool updateInfoIsUpdated(SUpdateInfo *pInfo, uint64_t tableId, TSKEY ts) {
int32_t res = TSDB_CODE_FAILED; int32_t res = TSDB_CODE_FAILED;
TSKEY* pMapMaxTs = taosHashGet(pInfo->pMap, &tableId, sizeof(uint64_t)); TSKEY *pMapMaxTs = taosHashGet(pInfo->pMap, &tableId, sizeof(uint64_t));
uint64_t index = ((uint64_t)tableId) % pInfo->numBuckets; uint64_t index = ((uint64_t)tableId) % pInfo->numBuckets;
TSKEY maxTs = *(TSKEY *)taosArrayGet(pInfo->pTsBuckets, index); TSKEY maxTs = *(TSKEY *)taosArrayGet(pInfo->pTsBuckets, index);
if (ts < maxTs - pInfo->watermark) { if (ts < maxTs - pInfo->watermark) {
// this window has been closed. // this window has been closed.
if (pInfo->pCloseWinSBF) { if (pInfo->pCloseWinSBF) {
...@@ -178,42 +182,47 @@ bool updateInfoIsUpdated(SUpdateInfo *pInfo, uint64_t tableId, TSKEY ts) { ...@@ -178,42 +182,47 @@ bool updateInfoIsUpdated(SUpdateInfo *pInfo, uint64_t tableId, TSKEY ts) {
} }
int32_t size = taosHashGetSize(pInfo->pMap); int32_t size = taosHashGetSize(pInfo->pMap);
if ( (!pMapMaxTs && size < DEFAULT_MAP_SIZE) || (pMapMaxTs && *pMapMaxTs < ts)) { if ((!pMapMaxTs && size < DEFAULT_MAP_SIZE) || (pMapMaxTs && *pMapMaxTs < ts)) {
taosHashPut(pInfo->pMap, &tableId, sizeof(uint64_t), &ts, sizeof(TSKEY)); taosHashPut(pInfo->pMap, &tableId, sizeof(uint64_t), &ts, sizeof(TSKEY));
return false; return false;
} }
if ( !pMapMaxTs && maxTs < ts ) { if (!pMapMaxTs && maxTs < ts) {
taosArraySet(pInfo->pTsBuckets, index, &ts); taosArraySet(pInfo->pTsBuckets, index, &ts);
return false; return false;
} }
if (ts < pInfo->minTS) { if (ts < pInfo->minTS) {
qDebug("===stream===Update. tableId:%" PRIu64 ", maxTs:%" PRIu64 ", mapMaxTs:%" PRIu64 ", ts:%" PRIu64 , tableId, maxTs, *pMapMaxTs, ts); qDebug("===stream===Update. tableId:%" PRIu64 ", maxTs:%" PRIu64 ", mapMaxTs:%" PRIu64 ", ts:%" PRIu64, tableId,
maxTs, *pMapMaxTs, ts);
return true; return true;
} else if (res == TSDB_CODE_SUCCESS) { } else if (res == TSDB_CODE_SUCCESS) {
return false; return false;
} }
qDebug("===stream===Update. tableId:%" PRIu64 ", maxTs:%" PRIu64 ", mapMaxTs:%" PRIu64 ", ts:%" PRIu64 , tableId, maxTs, *pMapMaxTs, ts); qDebug("===stream===Update. tableId:%" PRIu64 ", maxTs:%" PRIu64 ", mapMaxTs:%" PRIu64 ", ts:%" PRIu64, tableId,
maxTs, *pMapMaxTs, ts);
// check from tsdb api // check from tsdb api
return true; return true;
} }
void updateInfoSetScanRange(SUpdateInfo *pInfo, STimeWindow* pWin, uint64_t groupId, uint64_t version) { void updateInfoSetScanRange(SUpdateInfo *pInfo, STimeWindow *pWin, uint64_t groupId, uint64_t version) {
qDebug("===stream===groupId:%" PRIu64 ", startTs:%" PRIu64 ", endTs:%" PRIu64 ", version:%" PRIu64 , groupId, pWin->skey, pWin->ekey, version); qDebug("===stream===groupId:%" PRIu64 ", startTs:%" PRIu64 ", endTs:%" PRIu64 ", version:%" PRIu64, groupId,
pWin->skey, pWin->ekey, version);
pInfo->scanWindow = *pWin; pInfo->scanWindow = *pWin;
pInfo->scanGroupId = groupId; pInfo->scanGroupId = groupId;
pInfo->maxVersion = version; pInfo->maxVersion = version;
} }
bool updateInfoIgnore(SUpdateInfo *pInfo, STimeWindow* pWin, uint64_t groupId, uint64_t version) { bool updateInfoIgnore(SUpdateInfo *pInfo, STimeWindow *pWin, uint64_t groupId, uint64_t version) {
if (!pInfo) { if (!pInfo) {
return false; return false;
} }
qDebug("===stream===check groupId:%" PRIu64 ", startTs:%" PRIu64 ", endTs:%" PRIu64 ", version:%" PRIu64 , groupId, pWin->skey, pWin->ekey, version); qDebug("===stream===check groupId:%" PRIu64 ", startTs:%" PRIu64 ", endTs:%" PRIu64 ", version:%" PRIu64, groupId,
if (pInfo->scanGroupId == groupId && pInfo->scanWindow.skey <= pWin->skey && pWin->skey, pWin->ekey, version);
pWin->ekey <= pInfo->scanWindow.ekey && version <= pInfo->maxVersion ) { if (pInfo->scanGroupId == groupId && pInfo->scanWindow.skey <= pWin->skey && pWin->ekey <= pInfo->scanWindow.ekey &&
qDebug("===stream===ignore groupId:%" PRIu64 ", startTs:%" PRIu64 ", endTs:%" PRIu64 ", version:%" PRIu64 , groupId, pWin->skey, pWin->ekey, version); version <= pInfo->maxVersion) {
qDebug("===stream===ignore groupId:%" PRIu64 ", startTs:%" PRIu64 ", endTs:%" PRIu64 ", version:%" PRIu64, groupId,
pWin->skey, pWin->ekey, version);
return true; return true;
} }
return false; return false;
...@@ -261,7 +270,7 @@ int32_t updateInfoSerialize(void *buf, int32_t bufLen, const SUpdateInfo *pInfo) ...@@ -261,7 +270,7 @@ int32_t updateInfoSerialize(void *buf, int32_t bufLen, const SUpdateInfo *pInfo)
int32_t size = taosArrayGetSize(pInfo->pTsBuckets); int32_t size = taosArrayGetSize(pInfo->pTsBuckets);
if (tEncodeI32(&encoder, size) < 0) return -1; if (tEncodeI32(&encoder, size) < 0) return -1;
for (int32_t i = 0; i < size; i++) { for (int32_t i = 0; i < size; i++) {
TSKEY* pTs = (TSKEY*)taosArrayGet(pInfo->pTsBuckets, i); TSKEY *pTs = (TSKEY *)taosArrayGet(pInfo->pTsBuckets, i);
if (tEncodeI64(&encoder, *pTs) < 0) return -1; if (tEncodeI64(&encoder, *pTs) < 0) return -1;
} }
...@@ -270,7 +279,7 @@ int32_t updateInfoSerialize(void *buf, int32_t bufLen, const SUpdateInfo *pInfo) ...@@ -270,7 +279,7 @@ int32_t updateInfoSerialize(void *buf, int32_t bufLen, const SUpdateInfo *pInfo)
int32_t sBfSize = taosArrayGetSize(pInfo->pTsSBFs); int32_t sBfSize = taosArrayGetSize(pInfo->pTsSBFs);
if (tEncodeI32(&encoder, sBfSize) < 0) return -1; if (tEncodeI32(&encoder, sBfSize) < 0) return -1;
for (int32_t i = 0; i < sBfSize; i++) { for (int32_t i = 0; i < sBfSize; i++) {
SScalableBf* pSBf = taosArrayGetP(pInfo->pTsSBFs, i); SScalableBf *pSBf = taosArrayGetP(pInfo->pTsSBFs, i);
if (tScalableBfEncode(pSBf, &encoder) < 0) return -1; if (tScalableBfEncode(pSBf, &encoder) < 0) return -1;
} }
...@@ -278,17 +287,17 @@ int32_t updateInfoSerialize(void *buf, int32_t bufLen, const SUpdateInfo *pInfo) ...@@ -278,17 +287,17 @@ int32_t updateInfoSerialize(void *buf, int32_t bufLen, const SUpdateInfo *pInfo)
if (tEncodeI64(&encoder, pInfo->interval) < 0) return -1; if (tEncodeI64(&encoder, pInfo->interval) < 0) return -1;
if (tEncodeI64(&encoder, pInfo->watermark) < 0) return -1; if (tEncodeI64(&encoder, pInfo->watermark) < 0) return -1;
if (tEncodeI64(&encoder, pInfo->minTS) < 0) return -1; if (tEncodeI64(&encoder, pInfo->minTS) < 0) return -1;
if (tScalableBfEncode(pInfo->pCloseWinSBF, &encoder) < 0) return -1; if (tScalableBfEncode(pInfo->pCloseWinSBF, &encoder) < 0) return -1;
int32_t mapSize = taosHashGetSize(pInfo->pMap); int32_t mapSize = taosHashGetSize(pInfo->pMap);
if (tEncodeI32(&encoder, mapSize) < 0) return -1; if (tEncodeI32(&encoder, mapSize) < 0) return -1;
void* pIte = NULL; void *pIte = NULL;
size_t keyLen = 0; size_t keyLen = 0;
while ((pIte = taosHashIterate(pInfo->pMap, pIte)) != NULL) { while ((pIte = taosHashIterate(pInfo->pMap, pIte)) != NULL) {
void* key = taosHashGetKey(pIte, &keyLen); void *key = taosHashGetKey(pIte, &keyLen);
if (tEncodeU64(&encoder, *(uint64_t*)key) < 0) return -1; if (tEncodeU64(&encoder, *(uint64_t *)key) < 0) return -1;
if (tEncodeI64(&encoder, *(TSKEY*)pIte) < 0) return -1; if (tEncodeI64(&encoder, *(TSKEY *)pIte) < 0) return -1;
} }
if (tEncodeI64(&encoder, pInfo->scanWindow.skey) < 0) return -1; if (tEncodeI64(&encoder, pInfo->scanWindow.skey) < 0) return -1;
...@@ -311,7 +320,7 @@ int32_t updateInfoDeserialize(void *buf, int32_t bufLen, SUpdateInfo *pInfo) { ...@@ -311,7 +320,7 @@ int32_t updateInfoDeserialize(void *buf, int32_t bufLen, SUpdateInfo *pInfo) {
int32_t size = 0; int32_t size = 0;
if (tDecodeI32(&decoder, &size) < 0) return -1; if (tDecodeI32(&decoder, &size) < 0) return -1;
pInfo->pTsBuckets = taosArrayInit(size, sizeof(TSKEY)); pInfo->pTsBuckets = taosArrayInit(size, sizeof(TSKEY));
TSKEY ts = INT64_MIN; TSKEY ts = INT64_MIN;
for (int32_t i = 0; i < size; i++) { for (int32_t i = 0; i < size; i++) {
if (tDecodeI64(&decoder, &ts) < 0) return -1; if (tDecodeI64(&decoder, &ts) < 0) return -1;
...@@ -324,7 +333,7 @@ int32_t updateInfoDeserialize(void *buf, int32_t bufLen, SUpdateInfo *pInfo) { ...@@ -324,7 +333,7 @@ int32_t updateInfoDeserialize(void *buf, int32_t bufLen, SUpdateInfo *pInfo) {
if (tDecodeI32(&decoder, &sBfSize) < 0) return -1; if (tDecodeI32(&decoder, &sBfSize) < 0) return -1;
pInfo->pTsSBFs = taosArrayInit(sBfSize, sizeof(void *)); pInfo->pTsSBFs = taosArrayInit(sBfSize, sizeof(void *));
for (int32_t i = 0; i < sBfSize; i++) { for (int32_t i = 0; i < sBfSize; i++) {
SScalableBf* pSBf = tScalableBfDecode(&decoder); SScalableBf *pSBf = tScalableBfDecode(&decoder);
if (!pSBf) return -1; if (!pSBf) return -1;
taosArrayPush(pInfo->pTsSBFs, &pSBf); taosArrayPush(pInfo->pTsSBFs, &pSBf);
} }
...@@ -337,11 +346,11 @@ int32_t updateInfoDeserialize(void *buf, int32_t bufLen, SUpdateInfo *pInfo) { ...@@ -337,11 +346,11 @@ int32_t updateInfoDeserialize(void *buf, int32_t bufLen, SUpdateInfo *pInfo) {
int32_t mapSize = 0; int32_t mapSize = 0;
if (tDecodeI32(&decoder, &mapSize) < 0) return -1; if (tDecodeI32(&decoder, &mapSize) < 0) return -1;
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT);
pInfo->pMap = taosHashInit(mapSize, hashFn, true, HASH_NO_LOCK); pInfo->pMap = taosHashInit(mapSize, hashFn, true, HASH_NO_LOCK);
uint64_t uid = 0; uint64_t uid = 0;
ts = INT64_MIN; ts = INT64_MIN;
for(int32_t i = 0; i < mapSize; i++) { for (int32_t i = 0; i < mapSize; i++) {
if (tDecodeU64(&decoder, &uid) < 0) return -1; if (tDecodeU64(&decoder, &uid) < 0) return -1;
if (tDecodeI64(&decoder, &ts) < 0) return -1; if (tDecodeI64(&decoder, &ts) < 0) return -1;
taosHashPut(pInfo->pMap, &uid, sizeof(uint64_t), &ts, sizeof(TSKEY)); taosHashPut(pInfo->pMap, &uid, sizeof(uint64_t), &ts, sizeof(TSKEY));
......
...@@ -392,6 +392,29 @@ bool syncIsReady(int64_t rid) { ...@@ -392,6 +392,29 @@ bool syncIsReady(int64_t rid) {
return b; return b;
} }
bool syncIsReadyForRead(int64_t rid) {
SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid);
if (pSyncNode == NULL) {
return false;
}
ASSERT(rid == pSyncNode->rid);
// TODO: last not noop?
SyncIndex lastIndex = syncNodeGetLastIndex(pSyncNode);
bool b = (pSyncNode->state == TAOS_SYNC_STATE_LEADER) && (pSyncNode->commitIndex >= lastIndex - SYNC_MAX_READ_RANGE);
taosReleaseRef(tsNodeRefId, pSyncNode->rid);
// if false, set error code
if (false == b) {
if (pSyncNode->state != TAOS_SYNC_STATE_LEADER) {
terrno = TSDB_CODE_SYN_NOT_LEADER;
} else {
terrno = TSDB_CODE_APP_NOT_READY;
}
}
return b;
}
bool syncIsRestoreFinish(int64_t rid) { bool syncIsRestoreFinish(int64_t rid) {
SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid); SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid);
if (pSyncNode == NULL) { if (pSyncNode == NULL) {
...@@ -519,6 +542,30 @@ SyncTerm syncGetMyTerm(int64_t rid) { ...@@ -519,6 +542,30 @@ SyncTerm syncGetMyTerm(int64_t rid) {
return term; return term;
} }
SyncIndex syncGetLastIndex(int64_t rid) {
SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid);
if (pSyncNode == NULL) {
return SYNC_INDEX_INVALID;
}
ASSERT(rid == pSyncNode->rid);
SyncIndex lastIndex = syncNodeGetLastIndex(pSyncNode);
taosReleaseRef(tsNodeRefId, pSyncNode->rid);
return lastIndex;
}
SyncIndex syncGetCommitIndex(int64_t rid) {
SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid);
if (pSyncNode == NULL) {
return SYNC_INDEX_INVALID;
}
ASSERT(rid == pSyncNode->rid);
SyncIndex cmtIndex = pSyncNode->commitIndex;
taosReleaseRef(tsNodeRefId, pSyncNode->rid);
return cmtIndex;
}
SyncGroupId syncGetVgId(int64_t rid) { SyncGroupId syncGetVgId(int64_t rid) {
SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid); SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid);
if (pSyncNode == NULL) { if (pSyncNode == NULL) {
...@@ -828,6 +875,15 @@ int32_t syncNodePropose(SSyncNode* pSyncNode, SRpcMsg* pMsg, bool isWeak) { ...@@ -828,6 +875,15 @@ int32_t syncNodePropose(SSyncNode* pSyncNode, SRpcMsg* pMsg, bool isWeak) {
pSyncNode->changing = true; pSyncNode->changing = true;
} }
// not restored, vnode enable
if (!pSyncNode->restoreFinish && pSyncNode->vgId != 1) {
ret = -1;
terrno = TSDB_CODE_SYN_PROPOSE_NOT_READY;
sError("vgId:%d, failed to sync propose since not ready, type:%s, last:%ld, cmt:%ld", pSyncNode->vgId,
TMSG_INFO(pMsg->msgType), syncNodeGetLastIndex(pSyncNode), pSyncNode->commitIndex);
goto _END;
}
SRespStub stub; SRespStub stub;
stub.createTime = taosGetTimestampMs(); stub.createTime = taosGetTimestampMs();
stub.rpcMsg = *pMsg; stub.rpcMsg = *pMsg;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册