提交 313a546b 编写于 作者: C Cary Xu

other: merge 3.0

...@@ -2,7 +2,7 @@ ...@@ -2,7 +2,7 @@
# taos-tools # taos-tools
ExternalProject_Add(taos-tools ExternalProject_Add(taos-tools
GIT_REPOSITORY https://github.com/taosdata/taos-tools.git GIT_REPOSITORY https://github.com/taosdata/taos-tools.git
GIT_TAG 53a0103 GIT_TAG d237772
SOURCE_DIR "${TD_SOURCE_DIR}/tools/taos-tools" SOURCE_DIR "${TD_SOURCE_DIR}/tools/taos-tools"
BINARY_DIR "" BINARY_DIR ""
#BUILD_IN_SOURCE TRUE #BUILD_IN_SOURCE TRUE
......
```java ```java
{{#include docs/examples/java/src/main/java/com/taos/example/SubscribeDemo.java}} {{#include docs/examples/java/src/main/java/com/taos/example/SubscribeDemo.java}}
``` ```
:::note ```java
For now Java connector doesn't provide asynchronous subscription, but `TimerTask` can be used to achieve similar purpose. {{#include docs/examples/java/src/main/java/com/taos/example/MetersDeserializer.java}}
```
::: ```java
\ No newline at end of file {{#include docs/examples/java/src/main/java/com/taos/example/Meters.java}}
```
\ No newline at end of file
```rs ```rust
{{#include docs/examples/rust/nativeexample/examples/subscribe_demo.rs}} {{#include docs/examples/rust/nativeexample/examples/subscribe_demo.rs}}
``` ```
...@@ -41,19 +41,20 @@ Please refer to [Version Support List](/reference/connector#version-support). ...@@ -41,19 +41,20 @@ Please refer to [Version Support List](/reference/connector#version-support).
TDengine currently supports timestamp, number, character, Boolean type, and the corresponding type conversion with Java is as follows: TDengine currently supports timestamp, number, character, Boolean type, and the corresponding type conversion with Java is as follows:
| TDengine DataType | JDBCType (driver version < 2.0.24) | JDBCType (driver version > = 2.0.24) |
| ----------------- | ---------------------------------- | ------------------------------------ | | TDengine DataType | JDBCType |
| TIMESTAMP | java.lang.Long | java.sql.Timestamp | | ----------------- | ---------------------------------- |
| INT | java.lang.Integer | java.lang.Integer | | TIMESTAMP | java.sql.Timestamp |
| BIGINT | java.lang.Long | java.lang.Long | | INT | java.lang.Integer |
| FLOAT | java.lang.Float | java.lang.Float | | BIGINT | java.lang.Long |
| DOUBLE | java.lang.Double | java.lang.Double | | FLOAT | java.lang.Float |
| SMALLINT | java.lang.Short | java.lang.Short | | DOUBLE | java.lang.Double |
| TINYINT | java.lang.Byte | java.lang.Byte | | SMALLINT | java.lang.Short |
| BOOL | java.lang.Boolean | java.lang.Boolean | | TINYINT | java.lang.Byte |
| BINARY | java.lang.String | byte array | | BOOL | java.lang.Boolean |
| NCHAR | java.lang.String | java.lang.String | | BINARY | byte array |
| JSON | - | java.lang.String | | NCHAR | java.lang.String |
| JSON | java.lang.String |
**Note**: Only TAG supports JSON types **Note**: Only TAG supports JSON types
...@@ -81,7 +82,7 @@ Add following dependency in the `pom.xml` file of your Maven project: ...@@ -81,7 +82,7 @@ Add following dependency in the `pom.xml` file of your Maven project:
<dependency> <dependency>
<groupId>com.taosdata.jdbc</groupId> <groupId>com.taosdata.jdbc</groupId>
<artifactId>taos-jdbcdriver</artifactId> <artifactId>taos-jdbcdriver</artifactId>
<version>2.0.**</version> <version>3.0.0</version>
</dependency> </dependency>
``` ```
...@@ -845,7 +846,13 @@ Please refer to: [JDBC example](https://github.com/taosdata/TDengine/tree/develo ...@@ -845,7 +846,13 @@ Please refer to: [JDBC example](https://github.com/taosdata/TDengine/tree/develo
**Cause**: Currently, TDengine only supports 64-bit JDK. **Cause**: Currently, TDengine only supports 64-bit JDK.
**Solution**: Reinstall the 64-bit JDK. 4. **Solution**: Reinstall the 64-bit JDK.
4. java.lang.NoSuchMethodError: setByteArray
**Cause**: taos-jdbcdriver version 3.* only supports TDengine 3.0 or above.
**Solution**: connect TDengine 2.* using taos-jdbcdriver 2.* version.
For other questions, please refer to [FAQ](/train-faq/faq) For other questions, please refer to [FAQ](/train-faq/faq)
......
...@@ -21,7 +21,7 @@ ...@@ -21,7 +21,7 @@
<dependency> <dependency>
<groupId>com.taosdata.jdbc</groupId> <groupId>com.taosdata.jdbc</groupId>
<artifactId>taos-jdbcdriver</artifactId> <artifactId>taos-jdbcdriver</artifactId>
<version>2.0.38</version> <version>3.0.0</version>
</dependency> </dependency>
<!-- ANCHOR_END: dep--> <!-- ANCHOR_END: dep-->
<dependency> <dependency>
......
package com.taos.example;
import java.sql.Timestamp;
public class Meters {
private Timestamp ts;
private float current;
private int voltage;
private int groupid;
private String location;
public Timestamp getTs() {
return ts;
}
public void setTs(Timestamp ts) {
this.ts = ts;
}
public float getCurrent() {
return current;
}
public void setCurrent(float current) {
this.current = current;
}
public int getVoltage() {
return voltage;
}
public void setVoltage(int voltage) {
this.voltage = voltage;
}
public int getGroupid() {
return groupid;
}
public void setGroupid(int groupid) {
this.groupid = groupid;
}
public String getLocation() {
return location;
}
public void setLocation(String location) {
this.location = location;
}
@Override
public String toString() {
return "Meters{" +
"ts=" + ts +
", current=" + current +
", voltage=" + voltage +
", groupid=" + groupid +
", location='" + location + '\'' +
'}';
}
}
package com.taos.example;
import com.taosdata.jdbc.tmq.ReferenceDeserializer;
public class MetersDeserializer extends ReferenceDeserializer<Meters> {
}
\ No newline at end of file
package com.taos.example; package com.taos.example;
import com.taosdata.jdbc.TSDBConnection; import com.taosdata.jdbc.tmq.ConsumerRecords;
import com.taosdata.jdbc.TSDBDriver; import com.taosdata.jdbc.tmq.TMQConstants;
import com.taosdata.jdbc.TSDBResultSet; import com.taosdata.jdbc.tmq.TaosConsumer;
import com.taosdata.jdbc.TSDBSubscribe;
import java.sql.Connection; import java.sql.Connection;
import java.sql.DriverManager; import java.sql.DriverManager;
import java.sql.ResultSetMetaData;
import java.sql.SQLException; import java.sql.SQLException;
import java.sql.Statement;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties; import java.util.Properties;
import java.util.concurrent.TimeUnit; import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.atomic.AtomicBoolean;
public class SubscribeDemo { public class SubscribeDemo {
private static final String topic = "topic-meter-current-bg-10"; private static final String TOPIC = "tmq_topic";
private static final String sql = "select * from meters where current > 10"; private static final String DB_NAME = "meters";
private static final AtomicBoolean shutdown = new AtomicBoolean(false);
public static void main(String[] args) { public static void main(String[] args) {
Connection connection = null; Timer timer = new Timer();
TSDBSubscribe subscribe = null; timer.schedule(new TimerTask() {
public void run() {
shutdown.set(true);
}
}, 3_000);
try { try {
// prepare
Class.forName("com.taosdata.jdbc.TSDBDriver"); Class.forName("com.taosdata.jdbc.TSDBDriver");
Properties properties = new Properties(); String jdbcUrl = "jdbc:TAOS://127.0.0.1:6030/?user=root&password=taosdata";
properties.setProperty(TSDBDriver.PROPERTY_KEY_CHARSET, "UTF-8"); Connection connection = DriverManager.getConnection(jdbcUrl);
properties.setProperty(TSDBDriver.PROPERTY_KEY_TIME_ZONE, "UTC-8"); try (Statement statement = connection.createStatement()) {
String jdbcUrl = "jdbc:TAOS://127.0.0.1:6030/power?user=root&password=taosdata"; statement.executeUpdate("drop topic if exists " + TOPIC);
connection = DriverManager.getConnection(jdbcUrl, properties); statement.executeUpdate("drop database if exists " + DB_NAME);
// create subscribe statement.executeUpdate("create database " + DB_NAME);
subscribe = ((TSDBConnection) connection).subscribe(topic, sql, true); statement.executeUpdate("use " + DB_NAME);
int count = 0; statement.executeUpdate(
while (count < 10) { "CREATE TABLE `meters` (`ts` TIMESTAMP, `current` FLOAT, `voltage` INT) TAGS (`groupid` INT, `location` BINARY(16))");
// wait 1 second to avoid frequent calls to consume statement.executeUpdate("CREATE TABLE `d0` USING `meters` TAGS(0, 'Los Angles')");
TimeUnit.SECONDS.sleep(1); statement.executeUpdate("INSERT INTO `d0` values(now - 10s, 0.32, 116)");
// consume statement.executeUpdate("INSERT INTO `d0` values(now - 8s, NULL, NULL)");
TSDBResultSet resultSet = subscribe.consume(); statement.executeUpdate(
if (resultSet == null) { "INSERT INTO `d1` USING `meters` TAGS(1, 'San Francisco') values(now - 9s, 10.1, 119)");
continue; statement.executeUpdate(
"INSERT INTO `d1` values (now-8s, 10, 120) (now - 6s, 10, 119) (now - 4s, 11.2, 118)");
// create topic
statement.executeUpdate("create topic " + TOPIC + " as select * from meters");
} }
ResultSetMetaData metaData = resultSet.getMetaData();
while (resultSet.next()) { // create consumer
int columnCount = metaData.getColumnCount(); Properties properties = new Properties();
for (int i = 1; i <= columnCount; i++) { properties.setProperty(TMQConstants.BOOTSTRAP_SERVERS, "127.0.0.1:6030");
System.out.print(metaData.getColumnLabel(i) + ": " + resultSet.getString(i) + "\t"); properties.setProperty(TMQConstants.MSG_WITH_TABLE_NAME, "true");
properties.setProperty(TMQConstants.ENABLE_AUTO_COMMIT, "true");
properties.setProperty(TMQConstants.GROUP_ID, "test");
properties.setProperty(TMQConstants.VALUE_DESERIALIZER,
"com.taosdata.jdbc.MetersDeserializer");
// poll data
try (TaosConsumer<Meters> consumer = new TaosConsumer<>(properties)) {
consumer.subscribe(Collections.singletonList(TOPIC));
while (!shutdown.get()) {
ConsumerRecords<Meters> meters = consumer.poll(Duration.ofMillis(100));
for (Meters meter : meters) {
System.out.println(meter);
} }
System.out.println();
count++;
} }
} }
} catch (Exception e) { } catch (ClassNotFoundException | SQLException e) {
e.printStackTrace(); e.printStackTrace();
} finally {
try {
if (null != subscribe)
// close subscribe
subscribe.close(true);
if (connection != null)
connection.close();
} catch (SQLException throwable) {
throwable.printStackTrace();
}
} }
timer.cancel();
} }
} }
\ No newline at end of file
...@@ -4,7 +4,7 @@ sidebar_label: 文档首页 ...@@ -4,7 +4,7 @@ sidebar_label: 文档首页
slug: / slug: /
--- ---
TDengine是一款[开源](https://www.taosdata.com/tdengine/open_source_time-series_database)[高性能](https://www.taosdata.com/fast)[云原生](https://www.taosdata.com/tdengine/cloud_native_time-series_database)时序数据库(Time-Series Database, TSDB), 它专为物联网、工业互联网、金融等场景优化设计。同时它还带有内建的缓存、流式计算、数据订阅等系统功能,能大幅减少系统设计的复杂度,降低研发和运营成本,是一极简的时序数据处理平台。本文档是 TDengine 用户手册,主要是介绍 TDengine 的基本概念、安装、使用、功能、开发接口、运营维护、TDengine 内核设计等等,它主要是面向架构师、开发者与系统管理员的。 TDengine是一款[开源](https://www.taosdata.com/tdengine/open_source_time-series_database)[高性能](https://www.taosdata.com/fast)[云原生](https://www.taosdata.com/tdengine/cloud_native_time-series_database)<a href="https://www.taosdata.com/" data-internallinksmanager029f6b8e52c="2" title="时序数据库" target="_blank" rel="noopener">时序数据库</a><a href="https://www.taosdata.com/time-series-database" data-internallinksmanager029f6b8e52c="9" title="Time Series DataBase" target="_blank" rel="noopener">Time Series Database</a>, <a href="https://www.taosdata.com/tsdb" data-internallinksmanager029f6b8e52c="8" title="TSDB" target="_blank" rel="noopener">TSDB</a>, 它专为物联网、工业互联网、金融等场景优化设计。同时它还带有内建的缓存、流式计算、数据订阅等系统功能,能大幅减少系统设计的复杂度,降低研发和运营成本,是一极简的时序数据处理平台。本文档是 TDengine 用户手册,主要是介绍 TDengine 的基本概念、安装、使用、功能、开发接口、运营维护、TDengine 内核设计等等,它主要是面向架构师、开发者与系统管理员的。
TDengine 充分利用了时序数据的特点,提出了“一个数据采集点一张表”与“超级表”的概念,设计了创新的存储引擎,让数据的写入、查询和存储效率都得到极大的提升。为正确理解并使用TDengine, 无论如何,请您仔细阅读[基本概念](./concept)一章。 TDengine 充分利用了时序数据的特点,提出了“一个数据采集点一张表”与“超级表”的概念,设计了创新的存储引擎,让数据的写入、查询和存储效率都得到极大的提升。为正确理解并使用TDengine, 无论如何,请您仔细阅读[基本概念](./concept)一章。
......
...@@ -3,7 +3,7 @@ title: 产品简介 ...@@ -3,7 +3,7 @@ title: 产品简介
toc_max_heading_level: 2 toc_max_heading_level: 2
--- ---
TDengine 是一款[开源](https://www.taosdata.com/tdengine/open_source_time-series_database)[高性能](https://www.taosdata.com/tdengine/fast)[云原生](https://www.taosdata.com/tdengine/cloud_native_time-series_database)时序数据库 (Time-Series Database, TSDB)。TDengine 能被广泛运用于物联网、工业互联网、车联网、IT 运维、金融等领域。除核心的时序数据库功能外,TDengine 还提供[缓存](../develop/cache/)[数据订阅](../develop/tmq)[流式计算](../develop/stream)等功能,是一极简的时序数据处理平台,最大程度的减小系统设计的复杂度,降低研发和运营成本。 TDengine 是一款[开源](https://www.taosdata.com/tdengine/open_source_time-series_database)[高性能](https://www.taosdata.com/tdengine/fast)[云原生](https://www.taosdata.com/tdengine/cloud_native_time-series_database)<a href="https://www.taosdata.com/" data-internallinksmanager029f6b8e52c="2" title="时序数据库" target="_blank" rel="noopener">时序数据库</a><a href="https://www.taosdata.com/time-series-database" data-internallinksmanager029f6b8e52c="9" title="Time Series DataBase" target="_blank" rel="noopener">Time Series Database</a>, <a href="https://www.taosdata.com/tsdb" data-internallinksmanager029f6b8e52c="8" title="TSDB" target="_blank" rel="noopener">TSDB</a>。TDengine 能被广泛运用于物联网、工业互联网、车联网、IT 运维、金融等领域。除核心的时序数据库功能外,TDengine 还提供[缓存](../develop/cache/)[数据订阅](../develop/tmq)[流式计算](../develop/stream)等功能,是一极简的时序数据处理平台,最大程度的减小系统设计的复杂度,降低研发和运营成本。
本章节介绍TDengine的主要功能、竞争优势、适用场景、与其他数据库的对比测试等等,让大家对TDengine有个整体的了解。 本章节介绍TDengine的主要功能、竞争优势、适用场景、与其他数据库的对比测试等等,让大家对TDengine有个整体的了解。
......
...@@ -13,7 +13,7 @@ title: 通过 Docker 快速体验 TDengine ...@@ -13,7 +13,7 @@ title: 通过 Docker 快速体验 TDengine
如果已经安装了 docker, 只需执行下面的命令。 如果已经安装了 docker, 只需执行下面的命令。
```shell ```shell
docker run -d -p 6030:6030 -p 6041/6041 -p 6043-6049/6043-6049 -p 6043-6049:6043-6049/udp tdengine/tdengine docker run -d -p 6030:6030 -p 6041:6041 -p 6043-6049:6043-6049 -p 6043-6049:6043-6049/udp tdengine/tdengine
``` ```
注意:TDengine 3.0 服务端仅使用 6030 TCP 端口。6041 为 taosAdapter 所使用提供 REST 服务端口。6043-6049 为 taosAdapter 提供第三方应用接入所使用端口,可根据需要选择是否打开。 注意:TDengine 3.0 服务端仅使用 6030 TCP 端口。6041 为 taosAdapter 所使用提供 REST 服务端口。6043-6049 为 taosAdapter 提供第三方应用接入所使用端口,可根据需要选择是否打开。
......
...@@ -29,6 +29,7 @@ echo "deb [arch=amd64] http://repos.taosdata.com/tdengine-stable stable main" | ...@@ -29,6 +29,7 @@ echo "deb [arch=amd64] http://repos.taosdata.com/tdengine-stable stable main" |
如果安装 Beta 版需要安装包仓库 如果安装 Beta 版需要安装包仓库
```bash ```bash
wget -qO - http://repos.taosdata.com/tdengine.key | sudo apt-key add -
echo "deb [arch=amd64] http://repos.taosdata.com/tdengine-beta beta main" | sudo tee /etc/apt/sources.list.d/tdengine-beta.list echo "deb [arch=amd64] http://repos.taosdata.com/tdengine-beta beta main" | sudo tee /etc/apt/sources.list.d/tdengine-beta.list
``` ```
......
...@@ -4,8 +4,16 @@ description: "TDengine 流式计算将数据的写入、预处理、复杂分析 ...@@ -4,8 +4,16 @@ description: "TDengine 流式计算将数据的写入、预处理、复杂分析
title: 流式计算 title: 流式计算
--- ---
在时序数据的处理中,经常要对原始数据进行清洗、预处理,再使用时序数据库进行长久的储存。用户通常需要在时序数据库之外再搭建 Kafka、Flink、Spark 等流计算处理引擎,增加了用户的开发成本和维护成本。 在时序数据的处理中,经常要对原始数据进行清洗、预处理,再使用时序数据库进行长久的储存。在传统的时序数据解决方案中,常常需要部署 Kafka、Flink 等流处理系统。而流处理系统的复杂性,带来了高昂的开发与运维成本。
使用 TDengine 3.0 的流式计算引擎能够最大限度的减少对这些额外中间件的依赖,真正将数据的写入、预处理、长期存储、复杂分析、实时计算、实时报警触发等功能融为一体,并且,所有这些任务只需要使用 SQL 完成,极大降低了用户的学习成本、使用成本。
TDengine 3.0 的流式计算引擎提供了实时处理写入的数据流的能力,使用 SQL 定义实时流变换,当数据被写入流的源表后,数据会被以定义的方式自动处理,并根据定义的触发模式向目的表推送结果。它提供了替代复杂流处理系统的轻量级解决方案,并能够在高吞吐的数据写入的情况下,提供毫秒级的计算结果延迟。
流式计算可以包含数据过滤,标量函数计算(含UDF),以及窗口聚合(支持滑动窗口、会话窗口与状态窗口),可以以超级表、子表、普通表为源表,写入到目的超级表。在创建流时,目的超级表将被自动创建,随后新插入的数据会被流定义的方式处理并写入其中,通过 partition by 子句,可以以表名或标签划分 partition,不同的 partition 将写入到目的超级表的不同子表。
TDengine 的流式计算能够支持分布在多个 vnode 中的超级表聚合;还能够处理乱序数据的写入:它提供了 watermark 机制以度量容忍数据乱序的程度,并提供了 ignore expired 配置项以决定乱序数据的处理策略——丢弃或者重新计算。
详见 [流式计算](../../taos-sql/stream)
## 流式计算的创建 ## 流式计算的创建
...@@ -14,7 +22,7 @@ CREATE STREAM [IF NOT EXISTS] stream_name [stream_options] INTO stb_name AS subq ...@@ -14,7 +22,7 @@ CREATE STREAM [IF NOT EXISTS] stream_name [stream_options] INTO stb_name AS subq
stream_options: { stream_options: {
TRIGGER [AT_ONCE | WINDOW_CLOSE | MAX_DELAY time] TRIGGER [AT_ONCE | WINDOW_CLOSE | MAX_DELAY time]
WATERMARK time WATERMARK time
IGNORE EXPIRED IGNORE EXPIRED [0 | 1]
} }
``` ```
...@@ -59,7 +67,7 @@ insert into d1004 values("2018-10-03 14:38:05.000", 10.80000, 223, 0.29000); ...@@ -59,7 +67,7 @@ insert into d1004 values("2018-10-03 14:38:05.000", 10.80000, 223, 0.29000);
insert into d1004 values("2018-10-03 14:38:06.500", 11.50000, 221, 0.35000); insert into d1004 values("2018-10-03 14:38:06.500", 11.50000, 221, 0.35000);
``` ```
### 查询以观结果 ### 查询以观结果
```sql ```sql
taos> select start, end, max_current from current_stream_output_stb; taos> select start, end, max_current from current_stream_output_stb;
...@@ -88,7 +96,7 @@ create stream power_stream into power_stream_output_stb as select ts, concat_ws( ...@@ -88,7 +96,7 @@ create stream power_stream into power_stream_output_stb as select ts, concat_ws(
参考示例一 [写入数据](#写入数据) 参考示例一 [写入数据](#写入数据)
### 查询以观结果 ### 查询以观结果
```sql ```sql
taos> select ts, meter_location, active_power, reactive_power from power_stream_output_stb; taos> select ts, meter_location, active_power, reactive_power from power_stream_output_stb;
ts | meter_location | active_power | reactive_power | ts | meter_location | active_power | reactive_power |
......
```java ```java
{{#include docs/examples/java/src/main/java/com/taos/example/SubscribeDemo.java}} {{#include docs/examples/java/src/main/java/com/taos/example/SubscribeDemo.java}}
``` ```
:::note ```java
目前 Java 接口没有提供异步订阅模式,但用户程序可以通过创建 `TimerTask` 等方式达到同样的效果。 {{#include docs/examples/java/src/main/java/com/taos/example/MetersDeserializer.java}}
```
::: ```java
\ No newline at end of file {{#include docs/examples/java/src/main/java/com/taos/example/Meters.java}}
```
\ No newline at end of file
```rs ```rust
{{#include docs/examples/rust/nativeexample/examples/subscribe_demo.rs}} {{#include docs/examples/rust/nativeexample/examples/subscribe_demo.rs}}
``` ```
...@@ -34,7 +34,7 @@ CREATE DATABASE db_name PRECISION 'ns'; ...@@ -34,7 +34,7 @@ CREATE DATABASE db_name PRECISION 'ns';
| 7 | DOUBLE | 8 | 双精度浮点型,有效位数 15-16,范围 [-1.7E308, 1.7E308] | | 7 | DOUBLE | 8 | 双精度浮点型,有效位数 15-16,范围 [-1.7E308, 1.7E308] |
| 8 | BINARY | 自定义 | 记录单字节字符串,建议只用于处理 ASCII 可见字符,中文等多字节字符需使用 nchar。 | | 8 | BINARY | 自定义 | 记录单字节字符串,建议只用于处理 ASCII 可见字符,中文等多字节字符需使用 nchar。 |
| 9 | SMALLINT | 2 | 短整型, 范围 [-32768, 32767] | | 9 | SMALLINT | 2 | 短整型, 范围 [-32768, 32767] |
| 10 | SMALLINT UNSIGNED | 2| 无符号短整型,范围 [0, 655357] | | 10 | SMALLINT UNSIGNED | 2| 无符号短整型,范围 [0, 65535] |
| 11 | TINYINT | 1 | 单字节整型,范围 [-128, 127] | | 11 | TINYINT | 1 | 单字节整型,范围 [-128, 127] |
| 12 | TINYINT UNSIGNED | 1 | 无符号单字节整型,范围 [0, 255] | | 12 | TINYINT UNSIGNED | 1 | 无符号单字节整型,范围 [0, 255] |
| 13 | BOOL | 1 | 布尔型,{true, false} | | 13 | BOOL | 1 | 布尔型,{true, false} |
......
--- ---
sidebar_label: 消息队列 sidebar_label: 数据订阅
title: 消息队列 title: 数据订阅
--- ---
TDengine 3.0.0.0 开始对消息队列做了大幅的优化和增强以简化用户的解决方案。 TDengine 3.0.0.0 开始对消息队列做了大幅的优化和增强以简化用户的解决方案。
...@@ -8,24 +8,17 @@ TDengine 3.0.0.0 开始对消息队列做了大幅的优化和增强以简化用 ...@@ -8,24 +8,17 @@ TDengine 3.0.0.0 开始对消息队列做了大幅的优化和增强以简化用
## 创建订阅主题 ## 创建订阅主题
```sql ```sql
CREATE TOPIC [IF NOT EXISTS] topic_name AS {subquery | DATABASE db_name | STABLE stb_name }; CREATE TOPIC [IF NOT EXISTS] topic_name AS subquery;
``` ```
订阅主题包括三种:列订阅、超级表订阅和数据库订阅。
**列订阅是**用 subquery 描述,支持过滤和标量函数和 UDF 标量函数,不支持 JOIN、GROUP BY、窗口切分子句、聚合函数和 UDF 聚合函数。列订阅规则如下: TOPIC 支持过滤和标量函数和 UDF 标量函数,不支持 JOIN、GROUP BY、窗口切分子句、聚合函数和 UDF 聚合函数。列订阅规则如下:
1. TOPIC 一旦创建则返回结果的字段确定 1. TOPIC 一旦创建则返回结果的字段确定
2. 被订阅或用于计算的列不可被删除、修改 2. 被订阅或用于计算的列不可被删除、修改
3. 列可以新增,但新增的列不出现在订阅结果字段中 3. 列可以新增,但新增的列不出现在订阅结果字段中
4. 对于 select \*,则订阅展开为创建时所有的列(子表、普通表为数据列,超级表为数据列加标签列) 4. 对于 select \*,则订阅展开为创建时所有的列(子表、普通表为数据列,超级表为数据列加标签列)
**超级表订阅和数据库订阅**规则如下:
1. 被订阅主体的 schema 变更不受限
2. 返回消息中 schema 是块级别的,每块的 schema 可能不一样
3. 列变更后写入的数据若未落盘,将以写入时的 schema 返回
4. 列变更后写入的数据若未已落盘,将以落盘时的 schema 返回
## 删除订阅主题 ## 删除订阅主题
......
...@@ -2,7 +2,7 @@ ...@@ -2,7 +2,7 @@
title: REST API title: REST API
--- ---
为支持各种不同类型平台的开发,TDengine 提供符合 REST 设计标准的 API,即 REST API。为最大程度降低学习成本,不同于其他数据库 REST API 的设计方法,TDengine 直接通过 HTTP POST 请求 BODY 中包含的 SQL 语句来操作数据库,仅需要一个 URL。REST 连接器的使用参见[视频教程](https://www.taosdata.com/blog/2020/11/11/1965.html)。 为支持各种不同类型平台的开发,TDengine 提供符合 REST 设计标准的 API,即 REST API。为最大程度降低学习成本,不同于其他数据库 REST API 的设计方法,TDengine 直接通过 HTTP POST 请求 BODY 中包含的 SQL 语句来操作数据库,仅需要一个 URL。REST 连接器的使用参见 [视频教程](https://www.taosdata.com/blog/2020/11/11/1965.html)。
:::note :::note
与原生连接器的一个区别是,RESTful 接口是无状态的,因此 `USE db_name` 指令没有效果,所有对表名、超级表名的引用都需要指定数据库名前缀。支持在 RESTful URL 中指定 db_name,这时如果 SQL 语句中没有指定数据库名前缀的话,会使用 URL 中指定的这个 db_name。 与原生连接器的一个区别是,RESTful 接口是无状态的,因此 `USE db_name` 指令没有效果,所有对表名、超级表名的引用都需要指定数据库名前缀。支持在 RESTful URL 中指定 db_name,这时如果 SQL 语句中没有指定数据库名前缀的话,会使用 URL 中指定的这个 db_name。
...@@ -20,8 +20,10 @@ RESTful 接口不依赖于任何 TDengine 的库,因此客户端不需要安 ...@@ -20,8 +20,10 @@ RESTful 接口不依赖于任何 TDengine 的库,因此客户端不需要安
下面示例是列出所有的数据库,请把 h1.taosdata.com 和 6041(缺省值)替换为实际运行的 TDengine 服务 FQDN 和端口号: 下面示例是列出所有的数据库,请把 h1.taosdata.com 和 6041(缺省值)替换为实际运行的 TDengine 服务 FQDN 和端口号:
```html ```bash
curl -L -H "Authorization: Basic cm9vdDp0YW9zZGF0YQ==" -d "show databases;" h1.taosdata.com:6041/rest/sql curl -L -H "Authorization: Basic cm9vdDp0YW9zZGF0YQ==" \
-d "select name, ntables, status from information_schema.ins_databases;" \
h1.taosdata.com:6041/rest/sql
``` ```
返回值结果如下表示验证通过: 返回值结果如下表示验证通过:
...@@ -35,188 +37,27 @@ curl -L -H "Authorization: Basic cm9vdDp0YW9zZGF0YQ==" -d "show databases;" h1.t ...@@ -35,188 +37,27 @@ curl -L -H "Authorization: Basic cm9vdDp0YW9zZGF0YQ==" -d "show databases;" h1.t
"VARCHAR", "VARCHAR",
64 64
], ],
[
"create_time",
"TIMESTAMP",
8
],
[
"vgroups",
"SMALLINT",
2
],
[ [
"ntables", "ntables",
"BIGINT", "BIGINT",
8 8
], ],
[
"replica",
"TINYINT",
1
],
[
"strict",
"VARCHAR",
4
],
[
"duration",
"VARCHAR",
10
],
[
"keep",
"VARCHAR",
32
],
[
"buffer",
"INT",
4
],
[
"pagesize",
"INT",
4
],
[
"pages",
"INT",
4
],
[
"minrows",
"INT",
4
],
[
"maxrows",
"INT",
4
],
[
"comp",
"TINYINT",
1
],
[
"precision",
"VARCHAR",
2
],
[ [
"status", "status",
"VARCHAR", "VARCHAR",
10 10
],
[
"retention",
"VARCHAR",
60
],
[
"single_stable",
"BOOL",
1
],
[
"cachemodel",
"VARCHAR",
11
],
[
"cachesize",
"INT",
4
],
[
"wal_level",
"TINYINT",
1
],
[
"wal_fsync_period",
"INT",
4
],
[
"wal_retention_period",
"INT",
4
],
[
"wal_retention_size",
"BIGINT",
8
],
[
"wal_roll_period",
"INT",
4
],
[
"wal_seg_size",
"BIGINT",
8
] ]
], ],
"data": [ "data": [
[ [
"information_schema", "information_schema",
null, 16,
null, "ready"
14,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
"ready",
null,
null,
null,
null,
null,
null,
null,
null,
null,
null
], ],
[ [
"performance_schema", "performance_schema",
null, 9,
null, "ready"
3,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
"ready",
null,
null,
null,
null,
null,
null,
null,
null,
null,
null
] ]
], ],
"rows": 2 "rows": 2
...@@ -231,21 +72,21 @@ http://<fqdn>:<port>/rest/sql/[db_name] ...@@ -231,21 +72,21 @@ http://<fqdn>:<port>/rest/sql/[db_name]
参数说明: 参数说明:
- fqnd: 集群中的任一台主机 FQDN 或 IP 地址 - fqnd: 集群中的任一台主机 FQDN 或 IP 地址
- port: 配置文件中 httpPort 配置项,缺省为 6041 - port: 配置文件中 httpPort 配置项,缺省为 6041
- db_name: 可选参数,指定本次所执行的 SQL 语句的默认数据库库名。 - db_name: 可选参数,指定本次所执行的 SQL 语句的默认数据库库名。
例如:`http://h1.taos.com:6041/rest/sql/test` 是指向地址为 `h1.taos.com:6041` 的 URL,并将默认使用的数据库库名设置为 `test`。 例如:`http://h1.taos.com:6041/rest/sql/test` 是指向地址为 `h1.taos.com:6041` 的 URL,并将默认使用的数据库库名设置为 `test`。
HTTP 请求的 Header 里需带有身份认证信息,TDengine 支持 Basic 认证与自定义认证两种机制,后续版本将提供标准安全的数字签名机制来做身份验证。 HTTP 请求的 Header 里需带有身份认证信息,TDengine 支持 Basic 认证与自定义认证两种机制,后续版本将提供标准安全的数字签名机制来做身份验证。
- [自定义身份认证信息](#自定义授权码)如下所示 - [自定义身份认证信息](#自定义授权码)如下所示
```text ```text
Authorization: Taosd <TOKEN> Authorization: Taosd <TOKEN>
``` ```
- Basic 身份认证信息如下所示 - Basic 身份认证信息如下所示
```text ```text
Authorization: Basic <TOKEN> Authorization: Basic <TOKEN>
...@@ -259,13 +100,13 @@ HTTP 请求的 BODY 里就是一个完整的 SQL 语句,SQL 语句中的数据 ...@@ -259,13 +100,13 @@ HTTP 请求的 BODY 里就是一个完整的 SQL 语句,SQL 语句中的数据
curl -L -H "Authorization: Basic <TOKEN>" -d "<SQL>" <ip>:<PORT>/rest/sql/[db_name] curl -L -H "Authorization: Basic <TOKEN>" -d "<SQL>" <ip>:<PORT>/rest/sql/[db_name]
``` ```
或者 或者
```bash ```bash
curl -L -u username:password -d "<SQL>" <ip>:<PORT>/rest/sql/[db_name] curl -L -u username:password -d "<SQL>" <ip>:<PORT>/rest/sql/[db_name]
``` ```
其中,`TOKEN` 为 `{username}:{password}` 经过 Base64 编码之后的字符串,例如 `root:taosdata` 编码后为 `cm9vdDp0YW9zZGF0YQ==` 其中,`TOKEN` 为 `{username}:{password}` 经过 Base64 编码之后的字符串,例如 `root:taosdata` 编码后为 `cm9vdDp0YW9zZGF0YQ==`
## HTTP 返回格式 ## HTTP 返回格式
...@@ -282,27 +123,9 @@ curl -L -u username:password -d "<SQL>" <ip>:<PORT>/rest/sql/[db_name] ...@@ -282,27 +123,9 @@ curl -L -u username:password -d "<SQL>" <ip>:<PORT>/rest/sql/[db_name]
### HTTP body 结构 ### HTTP body 结构
<table> #### 正确执行
<tr>
<th>执行结果</th> 样例:
<th>说明</th>
<th>样例</th>
</tr>
<tr>
<td>正确执行</td>
<td>
code:(int)0 代表成功
<br/>
<br/>
column_meta:([][3]any)列信息,每个列会用三个值来说明,分别为:列名(string)、列类型(string)、类型长度(int)
<br/>
<br/>
rows:(int)数据返回行数
<br/>
<br/>
data:([][]any)具体数据内容
</td>
<td>
```json ```json
{ {
...@@ -313,23 +136,16 @@ curl -L -u username:password -d "<SQL>" <ip>:<PORT>/rest/sql/[db_name] ...@@ -313,23 +136,16 @@ curl -L -u username:password -d "<SQL>" <ip>:<PORT>/rest/sql/[db_name]
} }
``` ```
</td> 说明:
</tr>
<tr> - code:(`int`)0 代表成功。
<td>正确查询</td> - column_meta:(`[1][3]any`)只返回 `[["affected_rows", "INT", 4]]`。
<td> - rows:(`int`)只返回 `1`。
code:(int)0 代表成功 - data:(`[][]any`)返回受影响行数。
<br/>
<br/> #### 正确查询
column_meta:([][3]any) 列信息,每个列会用三个值来说明,分别为:列名(string)、列类型(string)、类型长度(int)
<br/> 样例:
<br/>
rows:(int)数据返回行数
<br/>
<br/>
data:([][]any)具体数据内容
</td>
<td>
```json ```json
{ {
...@@ -385,17 +201,35 @@ curl -L -u username:password -d "<SQL>" <ip>:<PORT>/rest/sql/[db_name] ...@@ -385,17 +201,35 @@ curl -L -u username:password -d "<SQL>" <ip>:<PORT>/rest/sql/[db_name]
} }
``` ```
</td> 说明:
</tr>
<tr> - code:(`int`)0 代表成功。
<td>错误</td> - column_meta:(`[][3]any`) 列信息,每个列会用三个值来说明,分别为:列名(string)、列类型(string)、类型长度(int)。
<td> - rows:(`int`)数据返回行数。
code:(int)错误码 - data:(`[][]any`)具体数据内容(时间格式仅支持 RFC3339,结果集为 0 时区)。
<br/>
<br/> 列类型使用如下字符串:
desc:(string)错误描述
</td> - "NULL"
<td> - "BOOL"
- "TINYINT"
- "SMALLINT"
- "INT"
- "BIGINT"
- "FLOAT"
- "DOUBLE"
- "VARCHAR"
- "TIMESTAMP"
- "NCHAR"
- "TINYINT UNSIGNED"
- "SMALLINT UNSIGNED"
- "INT UNSIGNED"
- "BIGINT UNSIGNED"
- "JSON"
#### 错误
样例:
```json ```json
{ {
...@@ -404,30 +238,10 @@ curl -L -u username:password -d "<SQL>" <ip>:<PORT>/rest/sql/[db_name] ...@@ -404,30 +238,10 @@ curl -L -u username:password -d "<SQL>" <ip>:<PORT>/rest/sql/[db_name]
} }
``` ```
</td> 说明:
</tr>
</table> - code:(`int`)错误码。
- desc:(`string`)错误描述。
### 说明
- 时间格式仅支持 RFC3339,结果集为 0 时区
- 列类型使用如下字符串:
> "NULL"
> "BOOL"
> "TINYINT"
> "SMALLINT"
> "INT"
> "BIGINT"
> "FLOAT"
> "DOUBLE"
> "VARCHAR"
> "TIMESTAMP"
> "NCHAR"
> "TINYINT UNSIGNED"
> "SMALLINT UNSIGNED"
> "INT UNSIGNED"
> "BIGINT UNSIGNED"
> "JSON"
## 自定义授权码 ## 自定义授权码
...@@ -439,11 +253,9 @@ curl http://<fqnd>:<port>/rest/login/<username>/<password> ...@@ -439,11 +253,9 @@ curl http://<fqnd>:<port>/rest/login/<username>/<password>
其中,`fqdn` 是 TDengine 数据库的 FQDN 或 IP 地址,`port` 是 TDengine 服务的端口号,`username` 为数据库用户名,`password` 为数据库密码,返回值为 JSON 格式,各字段含义如下: 其中,`fqdn` 是 TDengine 数据库的 FQDN 或 IP 地址,`port` 是 TDengine 服务的端口号,`username` 为数据库用户名,`password` 为数据库密码,返回值为 JSON 格式,各字段含义如下:
- status:请求结果的标志位 - status:请求结果的标志位。
- code:返回值代码。
- code:返回值代码 - desc:授权码。
- desc:授权码
获取授权码示例: 获取授权码示例:
......
...@@ -404,47 +404,3 @@ TDengine 的异步 API 均采用非阻塞调用模式。应用程序可以用多 ...@@ -404,47 +404,3 @@ TDengine 的异步 API 均采用非阻塞调用模式。应用程序可以用多
**支持版本** **支持版本**
该功能接口从 2.3.0.0 版本开始支持。 该功能接口从 2.3.0.0 版本开始支持。
### 订阅和消费 API
订阅 API 目前支持订阅一张或多张表,并通过定期轮询的方式不断获取写入表中的最新数据。
- `TAOS_SUB *taos_subscribe(TAOS* taos, int restart, const char* topic, const char *sql, TAOS_SUBSCRIBE_CALLBACK fp, void *param, int interval)`
该函数负责启动订阅服务,成功时返回订阅对象,失败时返回 `NULL`,其参数为:
- taos:已经建立好的数据库连接
- restart:如果订阅已经存在,是重新开始,还是继续之前的订阅
- topic:订阅的主题(即名称),此参数是订阅的唯一标识
- sql:订阅的查询语句,此语句只能是 `select` 语句,只应查询原始数据,只能按时间正序查询数据
- fp:收到查询结果时的回调函数(稍后介绍函数原型),只在异步调用时使用,同步调用时此参数应该传 `NULL`
- param:调用回调函数时的附加参数,系统 API 将其原样传递到回调函数,不进行任何处理
- interval:轮询周期,单位为毫秒。异步调用时,将根据此参数周期性的调用回调函数,为避免对系统性能造成影响,不建议将此参数设置的过小;同步调用时,如两次调用 `taos_consume()` 的间隔小于此周期,API 将会阻塞,直到时间间隔超过此周期。
- `typedef void (*TAOS_SUBSCRIBE_CALLBACK)(TAOS_SUB* tsub, TAOS_RES *res, void* param, int code)`
异步模式下,回调函数的原型,其参数为:
- tsub:订阅对象
- res:查询结果集,注意结果集中可能没有记录
- param:调用 `taos_subscribe()` 时客户程序提供的附加参数
- code:错误码
:::note
在这个回调函数里不可以做耗时过长的处理,尤其是对于返回的结果集中数据较多的情况,否则有可能导致客户端阻塞等异常状态。如果必须进行复杂计算,则建议在另外的线程中进行处理。
:::
- `TAOS_RES *taos_consume(TAOS_SUB *tsub)`
同步模式下,该函数用来获取订阅的结果。 用户应用程序将其置于一个循环之中。 如两次调用 `taos_consume()` 的间隔小于订阅的轮询周期,API 将会阻塞,直到时间间隔超过此周期。如果数据库有新记录到达,该 API 将返回该最新的记录,否则返回一个没有记录的空结果集。 如果返回值为 `NULL`,说明系统出错。 异步模式下,用户程序不应调用此 API。
:::note
在调用 `taos_consume()` 之后,用户应用应确保尽快调用 `taos_fetch_row()` 或 `taos_fetch_block()` 来处理订阅结果,否则服务端会持续缓存查询结果数据等待客户端读取,极端情况下会导致服务端内存消耗殆尽,影响服务稳定性。
:::
- `void taos_unsubscribe(TAOS_SUB *tsub, int keepProgress)`
取消订阅。 如参数 `keepProgress` 不为 0,API 会保留订阅的进度信息,后续调用 `taos_subscribe()` 时可以基于此进度继续;否则将删除进度信息,后续只能重新开始读取数据。
...@@ -83,7 +83,7 @@ Maven 项目中,在 pom.xml 中添加以下依赖: ...@@ -83,7 +83,7 @@ Maven 项目中,在 pom.xml 中添加以下依赖:
<dependency> <dependency>
<groupId>com.taosdata.jdbc</groupId> <groupId>com.taosdata.jdbc</groupId>
<artifactId>taos-jdbcdriver</artifactId> <artifactId>taos-jdbcdriver</artifactId>
<version>2.0.**</version> <version>3.0.0</version>
</dependency> </dependency>
``` ```
...@@ -712,7 +712,7 @@ while(true) { ...@@ -712,7 +712,7 @@ while(true) {
} }
``` ```
`poll` 方法返回一个结果集,其中包含从上次 `poll` 到目前为止的所有新数据。请务必按需选择合理的调用 `poll` 的频率(如例子中的 `Duration.ofMillis(100)`),否则会给服务端造成不必要的压力 `poll` 每次调用获取一个消息
#### 关闭订阅 #### 关闭订阅
...@@ -900,7 +900,13 @@ public static void main(String[] args) throws Exception { ...@@ -900,7 +900,13 @@ public static void main(String[] args) throws Exception {
**解决方法**:重新安装 64 位 JDK。 **解决方法**:重新安装 64 位 JDK。
4. 其它问题请参考 [FAQ](../../../train-faq/faq) 4. java.lang.NoSuchMethodError: setByteArray
**原因**:taos-jdbcdriver 3.* 版本仅支持 TDengine 3.0 及以上版本。
**解决方法**: 使用 taos-jdbcdriver 2.* 版本连接 TDengine 2.* 版本。
其它问题请参考 [FAQ](../../../train-faq/faq)
## API 参考 ## API 参考
......
...@@ -275,12 +275,8 @@ typedef struct SStreamTask { ...@@ -275,12 +275,8 @@ typedef struct SStreamTask {
int32_t nodeId; int32_t nodeId;
SEpSet epSet; SEpSet epSet;
// used for task source and sink,
// while task agg should have processedVer for each child
int64_t recoverSnapVer; int64_t recoverSnapVer;
int64_t startVer; int64_t startVer;
int64_t checkpointVer;
int64_t processedVer;
// children info // children info
SArray* childEpInfo; // SArray<SStreamChildEpInfo*> SArray* childEpInfo; // SArray<SStreamChildEpInfo*>
......
...@@ -381,6 +381,9 @@ int32_t tmqCommitCb(void* param, SDataBuf* pBuf, int32_t code) { ...@@ -381,6 +381,9 @@ int32_t tmqCommitCb(void* param, SDataBuf* pBuf, int32_t code) {
} }
#endif #endif
taosMemoryFree(pParam->pOffset);
if (pBuf->pData) taosMemoryFree(pBuf->pData);
/*tscDebug("receive offset commit cb of %s on vgId:%d, offset is %" PRId64, pParam->pOffset->subKey, pParam->->vgId, /*tscDebug("receive offset commit cb of %s on vgId:%d, offset is %" PRId64, pParam->pOffset->subKey, pParam->->vgId,
* pOffset->version);*/ * pOffset->version);*/
...@@ -402,6 +405,8 @@ int32_t tmqCommitCb(void* param, SDataBuf* pBuf, int32_t code) { ...@@ -402,6 +405,8 @@ int32_t tmqCommitCb(void* param, SDataBuf* pBuf, int32_t code) {
tsem_post(&pParamSet->rspSem); tsem_post(&pParamSet->rspSem);
} }
taosMemoryFree(pParamSet);
#if 0 #if 0
taosArrayDestroyP(pParamSet->successfulOffsets, taosMemoryFree); taosArrayDestroyP(pParamSet->successfulOffsets, taosMemoryFree);
taosArrayDestroyP(pParamSet->failedOffsets, taosMemoryFree); taosArrayDestroyP(pParamSet->failedOffsets, taosMemoryFree);
...@@ -611,12 +616,12 @@ int32_t tmqCommitInner(tmq_t* tmq, const TAOS_RES* msg, int8_t automatic, int8_t ...@@ -611,12 +616,12 @@ int32_t tmqCommitInner(tmq_t* tmq, const TAOS_RES* msg, int8_t automatic, int8_t
} }
} }
#if 0
if (!async) { if (!async) {
#if 0
taosArrayDestroyP(pParamSet->successfulOffsets, taosMemoryFree); taosArrayDestroyP(pParamSet->successfulOffsets, taosMemoryFree);
taosArrayDestroyP(pParamSet->failedOffsets, taosMemoryFree); taosArrayDestroyP(pParamSet->failedOffsets, taosMemoryFree);
}
#endif #endif
}
return 0; return 0;
} }
...@@ -1216,6 +1221,7 @@ END: ...@@ -1216,6 +1221,7 @@ END:
} else { } else {
taosMemoryFree(pParam); taosMemoryFree(pParam);
} }
taosMemoryFree(pMsg->pData);
return code; return code;
} }
......
...@@ -80,7 +80,7 @@ int32_t vnodeQueryOpen(SVnode* pVnode); ...@@ -80,7 +80,7 @@ int32_t vnodeQueryOpen(SVnode* pVnode);
void vnodeQueryClose(SVnode* pVnode); void vnodeQueryClose(SVnode* pVnode);
int32_t vnodeGetTableMeta(SVnode* pVnode, SRpcMsg* pMsg, bool direct); int32_t vnodeGetTableMeta(SVnode* pVnode, SRpcMsg* pMsg, bool direct);
int vnodeGetTableCfg(SVnode* pVnode, SRpcMsg* pMsg, bool direct); int vnodeGetTableCfg(SVnode* pVnode, SRpcMsg* pMsg, bool direct);
int32_t vnodeGetBatchMeta(SVnode *pVnode, SRpcMsg *pMsg); int32_t vnodeGetBatchMeta(SVnode* pVnode, SRpcMsg* pMsg);
// vnodeCommit.c // vnodeCommit.c
int32_t vnodeBegin(SVnode* pVnode); int32_t vnodeBegin(SVnode* pVnode);
...@@ -98,6 +98,7 @@ void vnodeSyncStart(SVnode* pVnode); ...@@ -98,6 +98,7 @@ void vnodeSyncStart(SVnode* pVnode);
void vnodeSyncClose(SVnode* pVnode); void vnodeSyncClose(SVnode* pVnode);
void vnodeRedirectRpcMsg(SVnode* pVnode, SRpcMsg* pMsg); void vnodeRedirectRpcMsg(SVnode* pVnode, SRpcMsg* pMsg);
bool vnodeIsLeader(SVnode* pVnode); bool vnodeIsLeader(SVnode* pVnode);
bool vnodeIsRoleLeader(SVnode* pVnode);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -144,6 +144,7 @@ int32_t tsdbDeleteTableData(STsdb* pTsdb, int64_t version, tb_uid_t suid, tb ...@@ -144,6 +144,7 @@ int32_t tsdbDeleteTableData(STsdb* pTsdb, int64_t version, tb_uid_t suid, tb
STsdbReader tsdbQueryCacheLastT(STsdb* tsdb, SQueryTableDataCond* pCond, STableListInfo* tableList, uint64_t qId, STsdbReader tsdbQueryCacheLastT(STsdb* tsdb, SQueryTableDataCond* pCond, STableListInfo* tableList, uint64_t qId,
void* pMemRef); void* pMemRef);
int32_t tsdbSetKeepCfg(STsdb* pTsdb, STsdbCfg* pCfg); int32_t tsdbSetKeepCfg(STsdb* pTsdb, STsdbCfg* pCfg);
int32_t tsdbGetStbIdList(SMeta* pMeta, int64_t suid, SArray* list);
// tq // tq
int tqInit(); int tqInit();
...@@ -170,9 +171,8 @@ int32_t tqProcessTaskRecoverRsp(STQ* pTq, SRpcMsg* pMsg); ...@@ -170,9 +171,8 @@ int32_t tqProcessTaskRecoverRsp(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessTaskRetrieveReq(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessTaskRetrieveReq(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessTaskRetrieveRsp(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessTaskRetrieveRsp(STQ* pTq, SRpcMsg* pMsg);
SSubmitReq* tqBlockToSubmit(SVnode* pVnode, const SArray* pBlocks, const STSchema* pSchema, bool createTb, int64_t suid,
SSubmitReq* tdBlockToSubmit(SVnode* pVnode, const SArray* pBlocks, const STSchema* pSchema, bool createTb, int64_t suid, const char* stbFullName, SBatchDeleteReq* pDeleteReq);
const char* stbFullName, int32_t vgId, SBatchDeleteReq* pDeleteReq);
// sma // sma
int32_t smaInit(); int32_t smaInit();
......
...@@ -298,14 +298,14 @@ int metaAlterSTable(SMeta *pMeta, int64_t version, SVCreateStbReq *pReq) { ...@@ -298,14 +298,14 @@ int metaAlterSTable(SMeta *pMeta, int64_t version, SVCreateStbReq *pReq) {
tdbTbcClose(pUidIdxc); tdbTbcClose(pUidIdxc);
terrno = TSDB_CODE_TDB_STB_NOT_EXIST; terrno = TSDB_CODE_TDB_STB_NOT_EXIST;
// ASSERT(0);
return -1; return -1;
} }
ret = tdbTbcGet(pUidIdxc, NULL, NULL, &pData, &nData); ret = tdbTbcGet(pUidIdxc, NULL, NULL, &pData, &nData);
if (ret < 0) { if (ret < 0) {
tdbTbcClose(pUidIdxc);
terrno = TSDB_CODE_TDB_STB_NOT_EXIST; terrno = TSDB_CODE_TDB_STB_NOT_EXIST;
// ASSERT(0);
return -1; return -1;
} }
......
...@@ -201,9 +201,8 @@ int32_t tdProcessTSmaInsertImpl(SSma *pSma, int64_t indexUid, const char *msg) { ...@@ -201,9 +201,8 @@ int32_t tdProcessTSmaInsertImpl(SSma *pSma, int64_t indexUid, const char *msg) {
} }
SBatchDeleteReq deleteReq; SBatchDeleteReq deleteReq;
SSubmitReq *pSubmitReq = SSubmitReq *pSubmitReq = tqBlockToSubmit(pSma->pVnode, (const SArray *)msg, pTsmaStat->pTSchema, true,
tdBlockToSubmit(pSma->pVnode, (const SArray *)msg, pTsmaStat->pTSchema, true, pTsmaStat->pTSma->dstTbUid, pTsmaStat->pTSma->dstTbUid, pTsmaStat->pTSma->dstTbName, &deleteReq);
pTsmaStat->pTSma->dstTbName, pTsmaStat->pTSma->dstVgId, &deleteReq);
if (!pSubmitReq) { if (!pSubmitReq) {
smaError("vgId:%d, failed to gen submit blk while tsma insert for smaIndex %" PRIi64 " since %s", SMA_VID(pSma), smaError("vgId:%d, failed to gen submit blk while tsma insert for smaIndex %" PRIi64 " since %s", SMA_VID(pSma),
......
...@@ -14,6 +14,7 @@ ...@@ -14,6 +14,7 @@
*/ */
#include "tq.h" #include "tq.h"
#include "vnd.h"
#if 0 #if 0
void tqTmrRspFunc(void* param, void* tmrId) { void tqTmrRspFunc(void* param, void* tmrId) {
...@@ -212,9 +213,7 @@ int32_t tqPushMsgNew(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_ ...@@ -212,9 +213,7 @@ int32_t tqPushMsgNew(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_
#endif #endif
int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver) { int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver) {
walApplyVer(pTq->pVnode->pWal, ver); if (vnodeIsRoleLeader(pTq->pVnode) && msgType == TDMT_VND_SUBMIT) {
if (msgType == TDMT_VND_SUBMIT) {
if (taosHashGetSize(pTq->pStreamMeta->pTasks) == 0) return 0; if (taosHashGetSize(pTq->pStreamMeta->pTasks) == 0) return 0;
void* data = taosMemoryMalloc(msgLen); void* data = taosMemoryMalloc(msgLen);
......
...@@ -25,8 +25,7 @@ int32_t tdBuildDeleteReq(SVnode* pVnode, const char* stbFullName, const SSDataBl ...@@ -25,8 +25,7 @@ int32_t tdBuildDeleteReq(SVnode* pVnode, const char* stbFullName, const SSDataBl
SColumnInfoData* pGidCol = taosArrayGet(pDataBlock->pDataBlock, GROUPID_COLUMN_INDEX); SColumnInfoData* pGidCol = taosArrayGet(pDataBlock->pDataBlock, GROUPID_COLUMN_INDEX);
for (int32_t row = 0; row < totRow; row++) { for (int32_t row = 0; row < totRow; row++) {
int64_t ts = *(int64_t*)colDataGetData(pTsCol, row); int64_t ts = *(int64_t*)colDataGetData(pTsCol, row);
/*int64_t groupId = *(int64_t*)colDataGetData(pGidCol, row);*/ int64_t groupId = *(int64_t*)colDataGetData(pGidCol, row);
int64_t groupId = 0;
char* name = buildCtbNameByGroupId(stbFullName, groupId); char* name = buildCtbNameByGroupId(stbFullName, groupId);
tqDebug("stream delete msg: groupId :%ld, name: %s", groupId, name); tqDebug("stream delete msg: groupId :%ld, name: %s", groupId, name);
SMetaReader mr = {0}; SMetaReader mr = {0};
...@@ -49,8 +48,8 @@ int32_t tdBuildDeleteReq(SVnode* pVnode, const char* stbFullName, const SSDataBl ...@@ -49,8 +48,8 @@ int32_t tdBuildDeleteReq(SVnode* pVnode, const char* stbFullName, const SSDataBl
return 0; return 0;
} }
SSubmitReq* tdBlockToSubmit(SVnode* pVnode, const SArray* pBlocks, const STSchema* pTSchema, bool createTb, SSubmitReq* tqBlockToSubmit(SVnode* pVnode, const SArray* pBlocks, const STSchema* pTSchema, bool createTb,
int64_t suid, const char* stbFullName, int32_t vgId, SBatchDeleteReq* pDeleteReq) { int64_t suid, const char* stbFullName, SBatchDeleteReq* pDeleteReq) {
SSubmitReq* ret = NULL; SSubmitReq* ret = NULL;
SArray* schemaReqs = NULL; SArray* schemaReqs = NULL;
SArray* schemaReqSz = NULL; SArray* schemaReqSz = NULL;
...@@ -153,7 +152,7 @@ SSubmitReq* tdBlockToSubmit(SVnode* pVnode, const SArray* pBlocks, const STSchem ...@@ -153,7 +152,7 @@ SSubmitReq* tdBlockToSubmit(SVnode* pVnode, const SArray* pBlocks, const STSchem
// assign data // assign data
// TODO // TODO
ret = rpcMallocCont(cap); ret = rpcMallocCont(cap);
ret->header.vgId = vgId; ret->header.vgId = pVnode->config.vgId;
ret->length = sizeof(SSubmitReq); ret->length = sizeof(SSubmitReq);
ret->numOfBlocks = htonl(sz); ret->numOfBlocks = htonl(sz);
...@@ -234,8 +233,8 @@ void tqTableSink(SStreamTask* pTask, void* vnode, int64_t ver, void* data) { ...@@ -234,8 +233,8 @@ void tqTableSink(SStreamTask* pTask, void* vnode, int64_t ver, void* data) {
ASSERT(pTask->tbSink.pTSchema); ASSERT(pTask->tbSink.pTSchema);
deleteReq.deleteReqs = taosArrayInit(0, sizeof(SSingleDeleteReq)); deleteReq.deleteReqs = taosArrayInit(0, sizeof(SSingleDeleteReq));
SSubmitReq* pReq = tdBlockToSubmit(pVnode, pRes, pTask->tbSink.pTSchema, true, pTask->tbSink.stbUid, SSubmitReq* pReq = tqBlockToSubmit(pVnode, pRes, pTask->tbSink.pTSchema, true, pTask->tbSink.stbUid,
pTask->tbSink.stbFullName, pVnode->config.vgId, &deleteReq); pTask->tbSink.stbFullName, &deleteReq);
tqDebug("vgId:%d, task %d convert blocks over, put into write-queue", TD_VID(pVnode), pTask->taskId); tqDebug("vgId:%d, task %d convert blocks over, put into write-queue", TD_VID(pVnode), pTask->taskId);
......
...@@ -247,6 +247,8 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRp ...@@ -247,6 +247,8 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRp
vTrace("vgId:%d, process %s request success, index:%" PRId64, TD_VID(pVnode), TMSG_INFO(pMsg->msgType), version); vTrace("vgId:%d, process %s request success, index:%" PRId64, TD_VID(pVnode), TMSG_INFO(pMsg->msgType), version);
walApplyVer(pVnode->pWal, version);
if (tqPushMsg(pVnode->pTq, pMsg->pCont, pMsg->contLen, pMsg->msgType, version) < 0) { if (tqPushMsg(pVnode->pTq, pMsg->pCont, pMsg->contLen, pMsg->msgType, version) < 0) {
vError("vgId:%d, failed to push msg to TQ since %s", TD_VID(pVnode), tstrerror(terrno)); vError("vgId:%d, failed to push msg to TQ since %s", TD_VID(pVnode), tstrerror(terrno));
return -1; return -1;
......
...@@ -764,6 +764,8 @@ void vnodeSyncStart(SVnode *pVnode) { ...@@ -764,6 +764,8 @@ void vnodeSyncStart(SVnode *pVnode) {
void vnodeSyncClose(SVnode *pVnode) { syncStop(pVnode->sync); } void vnodeSyncClose(SVnode *pVnode) { syncStop(pVnode->sync); }
bool vnodeIsRoleLeader(SVnode *pVnode) { return syncGetMyRole(pVnode->sync) == TAOS_SYNC_STATE_LEADER; }
bool vnodeIsLeader(SVnode *pVnode) { bool vnodeIsLeader(SVnode *pVnode) {
if (!syncIsReady(pVnode->sync)) { if (!syncIsReady(pVnode->sync)) {
vDebug("vgId:%d, vnode not ready, state:%s, restore:%d", pVnode->config.vgId, syncGetMyRoleStr(pVnode->sync), vDebug("vgId:%d, vnode not ready, state:%s, restore:%d", pVnode->config.vgId, syncGetMyRoleStr(pVnode->sync),
......
#include "qworker.h"
#include "dataSinkMgt.h" #include "dataSinkMgt.h"
#include "executor.h" #include "executor.h"
#include "planner.h" #include "planner.h"
...@@ -7,7 +9,6 @@ ...@@ -7,7 +9,6 @@
#include "tcommon.h" #include "tcommon.h"
#include "tmsg.h" #include "tmsg.h"
#include "tname.h" #include "tname.h"
#include "qworker.h"
SQWorkerMgmt gQwMgmt = { SQWorkerMgmt gQwMgmt = {
.lock = 0, .lock = 0,
...@@ -15,7 +16,6 @@ SQWorkerMgmt gQwMgmt = { ...@@ -15,7 +16,6 @@ SQWorkerMgmt gQwMgmt = {
.qwNum = 0, .qwNum = 0,
}; };
int32_t qwProcessHbLinkBroken(SQWorker *mgmt, SQWMsg *qwMsg, SSchedulerHbReq *req) { int32_t qwProcessHbLinkBroken(SQWorker *mgmt, SQWMsg *qwMsg, SSchedulerHbReq *req) {
int32_t code = 0; int32_t code = 0;
SSchedulerHbRsp rsp = {0}; SSchedulerHbRsp rsp = {0};
...@@ -44,8 +44,8 @@ int32_t qwProcessHbLinkBroken(SQWorker *mgmt, SQWMsg *qwMsg, SSchedulerHbReq *re ...@@ -44,8 +44,8 @@ int32_t qwProcessHbLinkBroken(SQWorker *mgmt, SQWMsg *qwMsg, SSchedulerHbReq *re
QW_RET(TSDB_CODE_SUCCESS); QW_RET(TSDB_CODE_SUCCESS);
} }
static void freeItem(void* param) { static void freeItem(void *param) {
SExplainExecInfo* pInfo = param; SExplainExecInfo *pInfo = param;
taosMemoryFree(pInfo->verboseInfo); taosMemoryFree(pInfo->verboseInfo);
} }
...@@ -54,7 +54,7 @@ int32_t qwHandleTaskComplete(QW_FPARAMS_DEF, SQWTaskCtx *ctx) { ...@@ -54,7 +54,7 @@ int32_t qwHandleTaskComplete(QW_FPARAMS_DEF, SQWTaskCtx *ctx) {
if (TASK_TYPE_TEMP == ctx->taskType && taskHandle) { if (TASK_TYPE_TEMP == ctx->taskType && taskHandle) {
if (ctx->explain) { if (ctx->explain) {
SArray* execInfoList = taosArrayInit(4, sizeof(SExplainExecInfo)); SArray *execInfoList = taosArrayInit(4, sizeof(SExplainExecInfo));
QW_ERR_RET(qGetExplainExecInfo(taskHandle, execInfoList)); QW_ERR_RET(qGetExplainExecInfo(taskHandle, execInfoList));
SRpcHandleInfo connInfo = ctx->ctrlConnInfo; SRpcHandleInfo connInfo = ctx->ctrlConnInfo;
...@@ -81,7 +81,7 @@ int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *queryStop) { ...@@ -81,7 +81,7 @@ int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *queryStop) {
qTaskInfo_t taskHandle = ctx->taskHandle; qTaskInfo_t taskHandle = ctx->taskHandle;
DataSinkHandle sinkHandle = ctx->sinkHandle; DataSinkHandle sinkHandle = ctx->sinkHandle;
SArray* pResList = taosArrayInit(4, POINTER_BYTES); SArray *pResList = taosArrayInit(4, POINTER_BYTES);
while (true) { while (true) {
QW_TASK_DLOG("start to execTask, loopIdx:%d", i++); QW_TASK_DLOG("start to execTask, loopIdx:%d", i++);
...@@ -95,7 +95,7 @@ int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *queryStop) { ...@@ -95,7 +95,7 @@ int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *queryStop) {
} else { } else {
QW_TASK_DLOG("qExecTask failed, code:%x - %s", code, tstrerror(code)); QW_TASK_DLOG("qExecTask failed, code:%x - %s", code, tstrerror(code));
} }
QW_ERR_RET(code); QW_ERR_JRET(code);
} }
} }
...@@ -105,7 +105,7 @@ int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *queryStop) { ...@@ -105,7 +105,7 @@ int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *queryStop) {
QW_TASK_DLOG("qExecTask end with empty res, useconds:%" PRIu64, useconds); QW_TASK_DLOG("qExecTask end with empty res, useconds:%" PRIu64, useconds);
dsEndPut(sinkHandle, useconds); dsEndPut(sinkHandle, useconds);
QW_ERR_RET(qwHandleTaskComplete(QW_FPARAMS(), ctx)); QW_ERR_JRET(qwHandleTaskComplete(QW_FPARAMS(), ctx));
if (queryStop) { if (queryStop) {
*queryStop = true; *queryStop = true;
...@@ -114,7 +114,7 @@ int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *queryStop) { ...@@ -114,7 +114,7 @@ int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *queryStop) {
break; break;
} }
for(int32_t j = 0; j < taosArrayGetSize(pResList); ++j) { for (int32_t j = 0; j < taosArrayGetSize(pResList); ++j) {
SSDataBlock *pRes = taosArrayGetP(pResList, j); SSDataBlock *pRes = taosArrayGetP(pResList, j);
ASSERT(pRes->info.rows > 0); ASSERT(pRes->info.rows > 0);
...@@ -122,7 +122,7 @@ int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *queryStop) { ...@@ -122,7 +122,7 @@ int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *queryStop) {
code = dsPutDataBlock(sinkHandle, &inputData, &qcontinue); code = dsPutDataBlock(sinkHandle, &inputData, &qcontinue);
if (code) { if (code) {
QW_TASK_ELOG("dsPutDataBlock failed, code:%x - %s", code, tstrerror(code)); QW_TASK_ELOG("dsPutDataBlock failed, code:%x - %s", code, tstrerror(code));
QW_ERR_RET(code); QW_ERR_JRET(code);
} }
QW_TASK_DLOG("data put into sink, rows:%d, continueExecTask:%d", pRes->info.rows, qcontinue); QW_TASK_DLOG("data put into sink, rows:%d, continueExecTask:%d", pRes->info.rows, qcontinue);
...@@ -151,6 +151,11 @@ int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *queryStop) { ...@@ -151,6 +151,11 @@ int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *queryStop) {
taosArrayDestroy(pResList); taosArrayDestroy(pResList);
QW_RET(code); QW_RET(code);
_return:
taosArrayDestroy(pResList);
return code;
} }
int32_t qwGenerateSchHbRsp(SQWorker *mgmt, SQWSchStatus *sch, SQWHbInfo *hbInfo) { int32_t qwGenerateSchHbRsp(SQWorker *mgmt, SQWSchStatus *sch, SQWHbInfo *hbInfo) {
...@@ -222,7 +227,8 @@ int32_t qwGetQueryResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen, ...@@ -222,7 +227,8 @@ int32_t qwGetQueryResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen,
QW_ERR_RET(code); QW_ERR_RET(code);
} }
QW_TASK_DLOG("no more data in sink and query end, fetched blocks %d rows %d", pOutput->numOfBlocks, pOutput->numOfRows); QW_TASK_DLOG("no more data in sink and query end, fetched blocks %d rows %d", pOutput->numOfBlocks,
pOutput->numOfRows);
qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_SUCC); qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_SUCC);
if (NULL == rsp) { if (NULL == rsp) {
...@@ -266,7 +272,8 @@ int32_t qwGetQueryResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen, ...@@ -266,7 +272,8 @@ int32_t qwGetQueryResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen,
pOutput->numOfBlocks++; pOutput->numOfBlocks++;
if (DS_BUF_EMPTY == pOutput->bufStatus && pOutput->queryEnd) { if (DS_BUF_EMPTY == pOutput->bufStatus && pOutput->queryEnd) {
QW_TASK_DLOG("task all data fetched and done, fetched blocks %d rows %d", pOutput->numOfBlocks, pOutput->numOfRows); QW_TASK_DLOG("task all data fetched and done, fetched blocks %d rows %d", pOutput->numOfBlocks,
pOutput->numOfRows);
qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_SUCC); qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_SUCC);
break; break;
} }
...@@ -312,7 +319,7 @@ int32_t qwGetDeleteResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, SDeleteRes *pRes ...@@ -312,7 +319,7 @@ int32_t qwGetDeleteResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, SDeleteRes *pRes
QW_ERR_RET(code); QW_ERR_RET(code);
} }
SDeleterRes* pDelRes = (SDeleterRes*)output.pData; SDeleterRes *pDelRes = (SDeleterRes *)output.pData;
pRes->suid = pDelRes->suid; pRes->suid = pDelRes->suid;
pRes->uidList = pDelRes->uidList; pRes->uidList = pDelRes->uidList;
...@@ -326,7 +333,6 @@ int32_t qwGetDeleteResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, SDeleteRes *pRes ...@@ -326,7 +333,6 @@ int32_t qwGetDeleteResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, SDeleteRes *pRes
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t qwHandlePrePhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *input, SQWPhaseOutput *output) { int32_t qwHandlePrePhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *input, SQWPhaseOutput *output) {
int32_t code = 0; int32_t code = 0;
SQWTaskCtx *ctx = NULL; SQWTaskCtx *ctx = NULL;
...@@ -355,8 +361,8 @@ int32_t qwHandlePrePhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inpu ...@@ -355,8 +361,8 @@ int32_t qwHandlePrePhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inpu
if (QW_EVENT_RECEIVED(ctx, QW_EVENT_DROP)) { if (QW_EVENT_RECEIVED(ctx, QW_EVENT_DROP)) {
QW_ERR_JRET(qwDropTask(QW_FPARAMS())); QW_ERR_JRET(qwDropTask(QW_FPARAMS()));
//qwBuildAndSendDropRsp(&ctx->ctrlConnInfo, code); // qwBuildAndSendDropRsp(&ctx->ctrlConnInfo, code);
//QW_TASK_DLOG("drop rsp send, handle:%p, code:%x - %s", ctx->ctrlConnInfo.handle, code, tstrerror(code)); // QW_TASK_DLOG("drop rsp send, handle:%p, code:%x - %s", ctx->ctrlConnInfo.handle, code, tstrerror(code));
QW_ERR_JRET(TSDB_CODE_QRY_TASK_DROPPED); QW_ERR_JRET(TSDB_CODE_QRY_TASK_DROPPED);
break; break;
...@@ -391,8 +397,8 @@ int32_t qwHandlePrePhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inpu ...@@ -391,8 +397,8 @@ int32_t qwHandlePrePhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inpu
if (QW_EVENT_RECEIVED(ctx, QW_EVENT_DROP)) { if (QW_EVENT_RECEIVED(ctx, QW_EVENT_DROP)) {
QW_ERR_JRET(qwDropTask(QW_FPARAMS())); QW_ERR_JRET(qwDropTask(QW_FPARAMS()));
//qwBuildAndSendDropRsp(&ctx->ctrlConnInfo, code); // qwBuildAndSendDropRsp(&ctx->ctrlConnInfo, code);
//QW_TASK_DLOG("drop rsp send, handle:%p, code:%x - %s", ctx->ctrlConnInfo.handle, code, tstrerror(code)); // QW_TASK_DLOG("drop rsp send, handle:%p, code:%x - %s", ctx->ctrlConnInfo.handle, code, tstrerror(code));
QW_ERR_JRET(TSDB_CODE_QRY_TASK_DROPPED); QW_ERR_JRET(TSDB_CODE_QRY_TASK_DROPPED);
} }
...@@ -449,8 +455,8 @@ int32_t qwHandlePostPhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inp ...@@ -449,8 +455,8 @@ int32_t qwHandlePostPhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inp
QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR); QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
} }
//qwBuildAndSendDropRsp(&ctx->ctrlConnInfo, code); // qwBuildAndSendDropRsp(&ctx->ctrlConnInfo, code);
//QW_TASK_DLOG("drop rsp send, handle:%p, code:%x - %s", ctx->ctrlConnInfo.handle, code, tstrerror(code)); // QW_TASK_DLOG("drop rsp send, handle:%p, code:%x - %s", ctx->ctrlConnInfo.handle, code, tstrerror(code));
QW_ERR_JRET(qwDropTask(QW_FPARAMS())); QW_ERR_JRET(qwDropTask(QW_FPARAMS()));
QW_ERR_JRET(TSDB_CODE_QRY_TASK_DROPPED); QW_ERR_JRET(TSDB_CODE_QRY_TASK_DROPPED);
...@@ -507,7 +513,6 @@ int32_t qwAbortPrerocessQuery(QW_FPARAMS_DEF) { ...@@ -507,7 +513,6 @@ int32_t qwAbortPrerocessQuery(QW_FPARAMS_DEF) {
QW_RET(TSDB_CODE_SUCCESS); QW_RET(TSDB_CODE_SUCCESS);
} }
int32_t qwPreprocessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) { int32_t qwPreprocessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
int32_t code = 0; int32_t code = 0;
bool queryRsped = false; bool queryRsped = false;
...@@ -537,8 +542,7 @@ _return: ...@@ -537,8 +542,7 @@ _return:
QW_RET(TSDB_CODE_SUCCESS); QW_RET(TSDB_CODE_SUCCESS);
} }
int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, char *sql) {
int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, char* sql) {
int32_t code = 0; int32_t code = 0;
bool queryRsped = false; bool queryRsped = false;
SSubplan *plan = NULL; SSubplan *plan = NULL;
...@@ -556,7 +560,7 @@ int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, char* sql) { ...@@ -556,7 +560,7 @@ int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, char* sql) {
ctx->needFetch = qwMsg->msgInfo.needFetch; ctx->needFetch = qwMsg->msgInfo.needFetch;
ctx->msgType = qwMsg->msgType; ctx->msgType = qwMsg->msgType;
//QW_TASK_DLOGL("subplan json string, len:%d, %s", qwMsg->msgLen, qwMsg->msg); // QW_TASK_DLOGL("subplan json string, len:%d, %s", qwMsg->msgLen, qwMsg->msg);
code = qStringToSubplan(qwMsg->msg, &plan); code = qStringToSubplan(qwMsg->msg, &plan);
if (TSDB_CODE_SUCCESS != code) { if (TSDB_CODE_SUCCESS != code) {
...@@ -759,8 +763,8 @@ _return: ...@@ -759,8 +763,8 @@ _return:
} }
if (!rsped) { if (!rsped) {
qwBuildAndSendFetchRsp(qwMsg->msgType + 1, &qwMsg->connInfo, rsp, dataLen, code); qwBuildAndSendFetchRsp(qwMsg->msgType + 1, &qwMsg->connInfo, rsp, dataLen, code);
QW_TASK_DLOG("%s send, handle:%p, code:%x - %s, dataLen:%d", TMSG_INFO(qwMsg->msgType + 1), qwMsg->connInfo.handle, code, tstrerror(code), QW_TASK_DLOG("%s send, handle:%p, code:%x - %s, dataLen:%d", TMSG_INFO(qwMsg->msgType + 1),
dataLen); qwMsg->connInfo.handle, code, tstrerror(code), dataLen);
} }
} }
...@@ -919,7 +923,8 @@ void qwProcessHbTimerEvent(void *param, void *tmrId) { ...@@ -919,7 +923,8 @@ void qwProcessHbTimerEvent(void *param, void *tmrId) {
uint64_t *sId = taosHashGetKey(pIter, NULL); uint64_t *sId = taosHashGetKey(pIter, NULL);
QW_TLOG("cancel send hb to sch %" PRIx64 " cause of no connection handle", *sId); QW_TLOG("cancel send hb to sch %" PRIx64 " cause of no connection handle", *sId);
if (sch->hbBrokenTs > 0 && ((currentMs - sch->hbBrokenTs) > QW_SCH_TIMEOUT_MSEC) && taosHashGetSize(sch->tasksHash) <= 0) { if (sch->hbBrokenTs > 0 && ((currentMs - sch->hbBrokenTs) > QW_SCH_TIMEOUT_MSEC) &&
taosHashGetSize(sch->tasksHash) <= 0) {
taosArrayPush(pExpiredSch, sId); taosArrayPush(pExpiredSch, sId);
} }
...@@ -998,7 +1003,6 @@ _return: ...@@ -998,7 +1003,6 @@ _return:
QW_RET(TSDB_CODE_SUCCESS); QW_RET(TSDB_CODE_SUCCESS);
} }
int32_t qWorkerInit(int8_t nodeType, int32_t nodeId, SQWorkerCfg *cfg, void **qWorkerMgmt, const SMsgCb *pMsgCb) { int32_t qWorkerInit(int8_t nodeType, int32_t nodeId, SQWorkerCfg *cfg, void **qWorkerMgmt, const SMsgCb *pMsgCb) {
if (NULL == qWorkerMgmt || pMsgCb->mgmt == NULL) { if (NULL == qWorkerMgmt || pMsgCb->mgmt == NULL) {
qError("invalid param to init qworker"); qError("invalid param to init qworker");
...@@ -1139,6 +1143,3 @@ int32_t qWorkerGetStat(SReadHandle *handle, void *qWorkerMgmt, SQWorkerStat *pSt ...@@ -1139,6 +1143,3 @@ int32_t qWorkerGetStat(SReadHandle *handle, void *qWorkerMgmt, SQWorkerStat *pSt
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -642,7 +642,6 @@ static int32_t doTrimFunction(SScalarParam *pInput, int32_t inputNum, SScalarPar ...@@ -642,7 +642,6 @@ static int32_t doTrimFunction(SScalarParam *pInput, int32_t inputNum, SScalarPar
int32_t charLen = (type == TSDB_DATA_TYPE_VARCHAR) ? len : len / TSDB_NCHAR_SIZE; int32_t charLen = (type == TSDB_DATA_TYPE_VARCHAR) ? len : len / TSDB_NCHAR_SIZE;
trimFn(input, output, type, charLen); trimFn(input, output, type, charLen);
varDataSetLen(output, len);
colDataAppend(pOutputData, i, output, false); colDataAppend(pOutputData, i, output, false);
output += varDataTLen(output); output += varDataTLen(output);
} }
......
...@@ -293,7 +293,7 @@ int transSendResponse(const STransMsg* msg); ...@@ -293,7 +293,7 @@ int transSendResponse(const STransMsg* msg);
int transRegisterMsg(const STransMsg* msg); int transRegisterMsg(const STransMsg* msg);
int transSetDefaultAddr(void* shandle, const char* ip, const char* fqdn); int transSetDefaultAddr(void* shandle, const char* ip, const char* fqdn);
int transGetSockDebugInfo(struct sockaddr* sockname, char* dst); int transSockInfo2Str(struct sockaddr* sockname, char* dst);
int64_t transAllocHandle(); int64_t transAllocHandle();
......
...@@ -103,14 +103,6 @@ static SCliConn* getConnFromPool(void* pool, char* ip, uint32_t port); ...@@ -103,14 +103,6 @@ static SCliConn* getConnFromPool(void* pool, char* ip, uint32_t port);
static void addConnToPool(void* pool, SCliConn* conn); static void addConnToPool(void* pool, SCliConn* conn);
static void doCloseIdleConn(void* param); static void doCloseIdleConn(void* param);
static int sockDebugInfo(struct sockaddr* sockname, char* dst) {
struct sockaddr_in addr = *(struct sockaddr_in*)sockname;
char buf[16] = {0};
int r = uv_ip4_name(&addr, (char*)buf, sizeof(buf));
sprintf(dst, "%s:%d", buf, ntohs(addr.sin_port));
return r;
}
// register timer for read // register timer for read
static void cliReadTimeoutCb(uv_timer_t* handle); static void cliReadTimeoutCb(uv_timer_t* handle);
// register timer in each thread to clear expire conn // register timer in each thread to clear expire conn
...@@ -127,6 +119,8 @@ static void cliAsyncCb(uv_async_t* handle); ...@@ -127,6 +119,8 @@ static void cliAsyncCb(uv_async_t* handle);
static void cliIdleCb(uv_idle_t* handle); static void cliIdleCb(uv_idle_t* handle);
static void cliPrepareCb(uv_prepare_t* handle); static void cliPrepareCb(uv_prepare_t* handle);
static bool cliRecvReleaseReq(SCliConn* conn, STransMsgHead* pHead);
static int32_t allocConnRef(SCliConn* conn, bool update); static int32_t allocConnRef(SCliConn* conn, bool update);
static int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg); static int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg);
...@@ -361,6 +355,9 @@ void cliHandleResp(SCliConn* conn) { ...@@ -361,6 +355,9 @@ void cliHandleResp(SCliConn* conn) {
SCliMsg* pMsg = NULL; SCliMsg* pMsg = NULL;
STransConnCtx* pCtx = NULL; STransConnCtx* pCtx = NULL;
if (cliRecvReleaseReq(conn, pHead)) {
return;
}
CONN_SHOULD_RELEASE(conn, pHead); CONN_SHOULD_RELEASE(conn, pHead);
if (CONN_NO_PERSIST_BY_APP(conn)) { if (CONN_NO_PERSIST_BY_APP(conn)) {
...@@ -383,7 +380,7 @@ void cliHandleResp(SCliConn* conn) { ...@@ -383,7 +380,7 @@ void cliHandleResp(SCliConn* conn) {
transMsg.info.ahandle); transMsg.info.ahandle);
} }
} else { } else {
pCtx = pMsg ? pMsg->ctx : NULL; pCtx = pMsg->ctx;
transMsg.info.ahandle = pCtx ? pCtx->ahandle : NULL; transMsg.info.ahandle = pCtx ? pCtx->ahandle : NULL;
tDebug("%s conn %p get ahandle %p, persist: 1", CONN_GET_INST_LABEL(conn), conn, transMsg.info.ahandle); tDebug("%s conn %p get ahandle %p, persist: 1", CONN_GET_INST_LABEL(conn), conn, transMsg.info.ahandle);
} }
...@@ -395,7 +392,6 @@ void cliHandleResp(SCliConn* conn) { ...@@ -395,7 +392,6 @@ void cliHandleResp(SCliConn* conn) {
} }
STraceId* trace = &transMsg.info.traceId; STraceId* trace = &transMsg.info.traceId;
tGDebug("%s conn %p %s received from %s, local info:%s, len:%d, code str:%s", CONN_GET_INST_LABEL(conn), conn, tGDebug("%s conn %p %s received from %s, local info:%s, len:%d, code str:%s", CONN_GET_INST_LABEL(conn), conn,
TMSG_INFO(pHead->msgType), conn->dst, conn->src, transMsg.contLen, tstrerror(transMsg.code)); TMSG_INFO(pHead->msgType), conn->dst, conn->src, transMsg.contLen, tstrerror(transMsg.code));
...@@ -830,11 +826,11 @@ void cliConnCb(uv_connect_t* req, int status) { ...@@ -830,11 +826,11 @@ void cliConnCb(uv_connect_t* req, int status) {
int addrlen = sizeof(peername); int addrlen = sizeof(peername);
uv_tcp_getpeername((uv_tcp_t*)pConn->stream, &peername, &addrlen); uv_tcp_getpeername((uv_tcp_t*)pConn->stream, &peername, &addrlen);
transGetSockDebugInfo(&peername, pConn->dst); transSockInfo2Str(&peername, pConn->dst);
addrlen = sizeof(sockname); addrlen = sizeof(sockname);
uv_tcp_getsockname((uv_tcp_t*)pConn->stream, &sockname, &addrlen); uv_tcp_getsockname((uv_tcp_t*)pConn->stream, &sockname, &addrlen);
transGetSockDebugInfo(&sockname, pConn->src); transSockInfo2Str(&sockname, pConn->src);
tTrace("%s conn %p connect to server successfully", CONN_GET_INST_LABEL(pConn), pConn); tTrace("%s conn %p connect to server successfully", CONN_GET_INST_LABEL(pConn), pConn);
assert(pConn->stream == req->handle); assert(pConn->stream == req->handle);
...@@ -1053,6 +1049,30 @@ static void cliPrepareCb(uv_prepare_t* handle) { ...@@ -1053,6 +1049,30 @@ static void cliPrepareCb(uv_prepare_t* handle) {
if (thrd->stopMsg != NULL) cliHandleQuit(thrd->stopMsg, thrd); if (thrd->stopMsg != NULL) cliHandleQuit(thrd->stopMsg, thrd);
} }
bool cliRecvReleaseReq(SCliConn* conn, STransMsgHead* pHead) {
if (pHead->release == 1 && (pHead->msgLen) == sizeof(*pHead)) {
uint64_t ahandle = pHead->ahandle;
SCliMsg* pMsg = NULL;
CONN_GET_MSGCTX_BY_AHANDLE(conn, ahandle);
transClearBuffer(&conn->readBuf);
transFreeMsg(transContFromHead((char*)pHead));
if (transQueueSize(&conn->cliMsgs) > 0 && ahandle == 0) {
SCliMsg* cliMsg = transQueueGet(&conn->cliMsgs, 0);
if (cliMsg->type == Release) return true;
}
tDebug("%s conn %p receive release request, refId:%" PRId64 "", CONN_GET_INST_LABEL(conn), conn, conn->refId);
if (T_REF_VAL_GET(conn) > 1) {
transUnrefCliHandle(conn);
}
destroyCmsg(pMsg);
cliReleaseUnfinishedMsg(conn);
transQueueClear(&conn->cliMsgs);
addConnToPool(((SCliThrd*)conn->hostThrd)->pool, conn);
return true;
}
return false;
}
static void* cliWorkThread(void* arg) { static void* cliWorkThread(void* arg) {
SCliThrd* pThrd = (SCliThrd*)arg; SCliThrd* pThrd = (SCliThrd*)arg;
pThrd->pid = taosGetSelfPthreadId(); pThrd->pid = taosGetSelfPthreadId();
......
...@@ -77,7 +77,7 @@ void transFreeMsg(void* msg) { ...@@ -77,7 +77,7 @@ void transFreeMsg(void* msg) {
} }
taosMemoryFree((char*)msg - sizeof(STransMsgHead)); taosMemoryFree((char*)msg - sizeof(STransMsgHead));
} }
int transGetSockDebugInfo(struct sockaddr* sockname, char* dst) { int transSockInfo2Str(struct sockaddr* sockname, char* dst) {
struct sockaddr_in addr = *(struct sockaddr_in*)sockname; struct sockaddr_in addr = *(struct sockaddr_in*)sockname;
char buf[20] = {0}; char buf[20] = {0};
......
...@@ -114,6 +114,8 @@ static void uvAcceptAsyncCb(uv_async_t* handle); ...@@ -114,6 +114,8 @@ static void uvAcceptAsyncCb(uv_async_t* handle);
static void uvShutDownCb(uv_shutdown_t* req, int status); static void uvShutDownCb(uv_shutdown_t* req, int status);
static void uvPrepareCb(uv_prepare_t* handle); static void uvPrepareCb(uv_prepare_t* handle);
static bool uvRecvReleaseReq(SSvrConn* conn, STransMsgHead* pHead);
/* /*
* time-consuming task throwed into BG work thread * time-consuming task throwed into BG work thread
*/ */
...@@ -123,7 +125,7 @@ static void uvWorkAfterTask(uv_work_t* req, int status); ...@@ -123,7 +125,7 @@ static void uvWorkAfterTask(uv_work_t* req, int status);
static void uvWalkCb(uv_handle_t* handle, void* arg); static void uvWalkCb(uv_handle_t* handle, void* arg);
static void uvFreeCb(uv_handle_t* handle); static void uvFreeCb(uv_handle_t* handle);
static void uvStartSendRespInternal(SSvrMsg* smsg); static void uvStartSendRespImpl(SSvrMsg* smsg);
static void uvPrepareSendData(SSvrMsg* msg, uv_buf_t* wb); static void uvPrepareSendData(SSvrMsg* msg, uv_buf_t* wb);
static void uvStartSendResp(SSvrMsg* msg); static void uvStartSendResp(SSvrMsg* msg);
...@@ -154,37 +156,6 @@ static void* transAcceptThread(void* arg); ...@@ -154,37 +156,6 @@ static void* transAcceptThread(void* arg);
static bool addHandleToWorkloop(SWorkThrd* pThrd, char* pipeName); static bool addHandleToWorkloop(SWorkThrd* pThrd, char* pipeName);
static bool addHandleToAcceptloop(void* arg); static bool addHandleToAcceptloop(void* arg);
#define CONN_SHOULD_RELEASE(conn, head) \
do { \
if ((head)->release == 1 && (head->msgLen) == sizeof(*head)) { \
reallocConnRef(conn); \
tTrace("conn %p received release request", conn); \
\
STraceId traceId = head->traceId; \
conn->status = ConnRelease; \
transClearBuffer(&conn->readBuf); \
transFreeMsg(transContFromHead((char*)head)); \
\
STransMsg tmsg = { \
.code = 0, .info.handle = (void*)conn, .info.traceId = traceId, .info.ahandle = (void*)0x9527}; \
SSvrMsg* srvMsg = taosMemoryCalloc(1, sizeof(SSvrMsg)); \
srvMsg->msg = tmsg; \
srvMsg->type = Release; \
srvMsg->pConn = conn; \
if (!transQueuePush(&conn->srvMsgs, srvMsg)) { \
return; \
} \
if (conn->regArg.init) { \
tTrace("conn %p release, notify server app", conn); \
STrans* pTransInst = conn->pTransInst; \
(*pTransInst->cfp)(pTransInst->parent, &(conn->regArg.msg), NULL); \
memset(&conn->regArg, 0, sizeof(conn->regArg)); \
} \
uvStartSendRespInternal(srvMsg); \
return; \
} \
} while (0)
#define SRV_RELEASE_UV(loop) \ #define SRV_RELEASE_UV(loop) \
do { \ do { \
uv_walk(loop, uvWalkCb, NULL); \ uv_walk(loop, uvWalkCb, NULL); \
...@@ -230,7 +201,9 @@ static void uvHandleReq(SSvrConn* pConn) { ...@@ -230,7 +201,9 @@ static void uvHandleReq(SSvrConn* pConn) {
// transRefSrvHandle(pConn); // transRefSrvHandle(pConn);
// uv_queue_work(((SWorkThrd*)pConn->hostThrd)->loop, wreq, uvWorkDoTask, uvWorkAfterTask); // uv_queue_work(((SWorkThrd*)pConn->hostThrd)->loop, wreq, uvWorkDoTask, uvWorkAfterTask);
CONN_SHOULD_RELEASE(pConn, pHead); if (uvRecvReleaseReq(pConn, pHead)) {
return;
}
STransMsg transMsg; STransMsg transMsg;
memset(&transMsg, 0, sizeof(transMsg)); memset(&transMsg, 0, sizeof(transMsg));
...@@ -356,10 +329,10 @@ void uvOnSendCb(uv_write_t* req, int status) { ...@@ -356,10 +329,10 @@ void uvOnSendCb(uv_write_t* req, int status) {
msg = (SSvrMsg*)transQueueGet(&conn->srvMsgs, 0); msg = (SSvrMsg*)transQueueGet(&conn->srvMsgs, 0);
if (msg != NULL) { if (msg != NULL) {
uvStartSendRespInternal(msg); uvStartSendRespImpl(msg);
} }
} else { } else {
uvStartSendRespInternal(msg); uvStartSendRespImpl(msg);
} }
} }
} }
...@@ -423,7 +396,7 @@ static void uvPrepareSendData(SSvrMsg* smsg, uv_buf_t* wb) { ...@@ -423,7 +396,7 @@ static void uvPrepareSendData(SSvrMsg* smsg, uv_buf_t* wb) {
wb->len = len; wb->len = len;
} }
static void uvStartSendRespInternal(SSvrMsg* smsg) { static void uvStartSendRespImpl(SSvrMsg* smsg) {
SSvrConn* pConn = smsg->pConn; SSvrConn* pConn = smsg->pConn;
if (pConn->broken) { if (pConn->broken) {
return; return;
...@@ -453,7 +426,7 @@ static void uvStartSendResp(SSvrMsg* smsg) { ...@@ -453,7 +426,7 @@ static void uvStartSendResp(SSvrMsg* smsg) {
if (!transQueuePush(&pConn->srvMsgs, smsg)) { if (!transQueuePush(&pConn->srvMsgs, smsg)) {
return; return;
} }
uvStartSendRespInternal(smsg); uvStartSendRespImpl(smsg);
return; return;
} }
...@@ -544,6 +517,35 @@ static void uvShutDownCb(uv_shutdown_t* req, int status) { ...@@ -544,6 +517,35 @@ static void uvShutDownCb(uv_shutdown_t* req, int status) {
uv_close((uv_handle_t*)req->handle, uvDestroyConn); uv_close((uv_handle_t*)req->handle, uvDestroyConn);
taosMemoryFree(req); taosMemoryFree(req);
} }
static bool uvRecvReleaseReq(SSvrConn* pConn, STransMsgHead* pHead) {
if ((pHead)->release == 1 && (pHead->msgLen) == sizeof(*pHead)) {
reallocConnRef(pConn);
tTrace("conn %p received release request", pConn);
STraceId traceId = pHead->traceId;
pConn->status = ConnRelease;
transClearBuffer(&pConn->readBuf);
transFreeMsg(transContFromHead((char*)pHead));
STransMsg tmsg = {.code = 0, .info.handle = (void*)pConn, .info.traceId = traceId, .info.ahandle = (void*)0x9527};
SSvrMsg* srvMsg = taosMemoryCalloc(1, sizeof(SSvrMsg));
srvMsg->msg = tmsg;
srvMsg->type = Release;
srvMsg->pConn = pConn;
if (!transQueuePush(&pConn->srvMsgs, srvMsg)) {
return true;
}
if (pConn->regArg.init) {
tTrace("conn %p release, notify server app", pConn);
STrans* pTransInst = pConn->pTransInst;
(*pTransInst->cfp)(pTransInst->parent, &(pConn->regArg.msg), NULL);
memset(&pConn->regArg, 0, sizeof(pConn->regArg));
}
uvStartSendRespImpl(srvMsg);
return true;
}
return false;
}
static void uvPrepareCb(uv_prepare_t* handle) { static void uvPrepareCb(uv_prepare_t* handle) {
// prepare callback // prepare callback
SWorkThrd* pThrd = handle->data; SWorkThrd* pThrd = handle->data;
...@@ -696,7 +698,7 @@ void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) { ...@@ -696,7 +698,7 @@ void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) {
transUnrefSrvHandle(pConn); transUnrefSrvHandle(pConn);
return; return;
} }
transGetSockDebugInfo(&peername, pConn->dst); transSockInfo2Str(&peername, pConn->dst);
addrlen = sizeof(sockname); addrlen = sizeof(sockname);
if (0 != uv_tcp_getsockname(pConn->pTcp, (struct sockaddr*)&sockname, &addrlen)) { if (0 != uv_tcp_getsockname(pConn->pTcp, (struct sockaddr*)&sockname, &addrlen)) {
...@@ -704,7 +706,7 @@ void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) { ...@@ -704,7 +706,7 @@ void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) {
transUnrefSrvHandle(pConn); transUnrefSrvHandle(pConn);
return; return;
} }
transGetSockDebugInfo(&sockname, pConn->src); transSockInfo2Str(&sockname, pConn->src);
struct sockaddr_in addr = *(struct sockaddr_in*)&sockname; struct sockaddr_in addr = *(struct sockaddr_in*)&sockname;
pConn->clientIp = addr.sin_addr.s_addr; pConn->clientIp = addr.sin_addr.s_addr;
...@@ -992,7 +994,7 @@ void uvHandleRelease(SSvrMsg* msg, SWorkThrd* thrd) { ...@@ -992,7 +994,7 @@ void uvHandleRelease(SSvrMsg* msg, SWorkThrd* thrd) {
if (!transQueuePush(&conn->srvMsgs, msg)) { if (!transQueuePush(&conn->srvMsgs, msg)) {
return; return;
} }
uvStartSendRespInternal(msg); uvStartSendRespImpl(msg);
return; return;
} else if (conn->status == ConnRelease || conn->status == ConnNormal) { } else if (conn->status == ConnRelease || conn->status == ConnNormal) {
tDebug("%s conn %p already released, ignore release-msg", transLabel(thrd->pTransInst), conn); tDebug("%s conn %p already released, ignore release-msg", transLabel(thrd->pTransInst), conn);
......
...@@ -238,19 +238,23 @@ class TAdapter: ...@@ -238,19 +238,23 @@ class TAdapter:
if self.running != 0: if self.running != 0:
psCmd = f"ps -ef|grep -w {toBeKilled}| grep -v grep | awk '{{print $2}}'" psCmd = f"ps -ef|grep -w {toBeKilled}| grep -v grep | awk '{{print $2}}'"
# psCmd = f"pgrep {toBeKilled}"
processID = subprocess.check_output( processID = subprocess.check_output(
psCmd, shell=True).decode("utf-8") psCmd, shell=True)
while(processID): while(processID):
killCmd = f"kill {signal} {processID} > /dev/null 2>&1" killCmd = f"pkill {signal} {processID} > /dev/null 2>&1"
os.system(killCmd) os.system(killCmd)
time.sleep(1) time.sleep(1)
processID = subprocess.check_output( processID = subprocess.check_output(
psCmd, shell=True).decode("utf-8") psCmd, shell=True).decode("utf-8")
if not platform.system().lower() == 'windows': if not platform.system().lower() == 'windows':
for port in range(6030, 6041): port = 6041
fuserCmd = f"fuser -k -n tcp {port} > /dev/null" fuserCmd = f"fuser -k -n tcp {port} > /dev/null"
os.system(fuserCmd) os.system(fuserCmd)
# for port in range(6030, 6041):
# fuserCmd = f"fuser -k -n tcp {port} > /dev/null"
# os.system(fuserCmd)
self.running = 0 self.running = 0
tdLog.debug(f"taosadapter is stopped by kill {signal}") tdLog.debug(f"taosadapter is stopped by kill {signal}")
......
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册