未验证 提交 716f47b6 编写于 作者: H Haojun Liao 提交者: GitHub

Merge pull request #22712 from taosdata/fix/main-merge

Fix/main merge
---
sidebar_label: Seeq
title: Seeq
description: How to use Seeq and TDengine to perform time series data analysis
---
# How to use Seeq and TDengine to perform time series data analysis
## Introduction
Seeq is an advanced analytics software for the manufacturing industry and the Industrial Internet of Things (IIoT). Seeq supports the use of machine learning innovations within process manufacturing organizations. These capabilities enable organizations to deploy their own or third-party machine learning algorithms into advanced analytics applications used by frontline process engineers and subject matter experts, thus extending the efforts of a single data scientist to many frontline workers.
With the TDengine Java connector, Seeq effortlessly supports querying time series data provided by TDengine and offers functionalities such as data visualization, analysis, and forecasting.
### Install Seeq
Please download Seeq Server and Seeq Data Lab software installation package from the [Seeq official website](https://www.seeq.com/customer-download).
### Install and start Seeq Server
```
tar xvzf seeq-server-xxx.tar.gz
cd seeq-server-installer
sudo ./install
sudo seeq service enable
sudo seeq start
```
### Install and start Seeq Data Lab Server
Seeq Data Lab needs to be installed on a separate server from Seeq Server and connected to Seeq Server through configuration. For detailed installation and configuration instructions, please refer to [the official documentation](https://support.seeq.com/space/KB/1034059842).
```
tar xvf seeq-data-lab-<version>-64bit-linux.tar.gz
sudo seeq-data-lab-installer/install -f /opt/seeq/seeq-data-lab -g /var/opt/seeq -u seeq
sudo seeq config set Network/DataLab/Hostname localhost
sudo seeq config set Network/DataLab/Port 34231 # the port of the Data Lab server (usually 34231)
sudo seeq config set Network/Hostname <value> # the host IP or URL of the main Seeq Server
# If the main Seeq server is configured to listen over HTTPS
sudo seeq config set Network/Webserver/SecurePort 443 # the secure port of the main Seeq Server (usually 443)
# If the main Seeq server is NOT configured to listen over HTTPS
sudo seeq config set Network/Webserver/Port <value>
#On the main Seeq server, open a Seeq Command Prompt and set the hostname of the Data Lab server:
sudo seeq config set Network/DataLab/Hostname <value> # the host IP (not URL) of the Data Lab server
sudo seeq config set Network/DataLab/Port 34231 # the port of the Data Lab server (usually 34231
```
### Install TDengine on-premise instance
See [Quick Install from Package](../../get-started).
### Or use TDengine Cloud
Register for a [TDengine Cloud](https://cloud.tdengine.com) account and log in to your account.
## Make Seeq be able to access TDengine
1. Get data location configuration
```
sudo seeq config get Folders/Data
```
2. Download TDengine Java connector from maven.org. Please use the latest version (Current is 3.2.5, https://repo1.maven.org/maven2/com/taosdata/jdbc/taos-jdbcdriver/3.2.5/taos-jdbcdriver-3.2.5-dist.jar).
3. Restart Seeq server
```
sudo seeq restart
```
4. Input License
Use a browser to access ip:34216 and input the license according to the guide.
## How to use Seeq to analyze time-series data that TDengine serves
This chapter demonstrates how to use Seeq software in conjunction with TDengine for time series data analysis.
### Scenario Overview
The example scenario involves a power system where users collect electricity consumption data from metering devices at a power station on a daily basis. This data is stored in a TDengine cluster. The user now wants to predict how the electricity consumption will develop and purchase additional equipment to support it. The electricity consumption varies with monthly orders, and seasonal variations also affect the power consumption. Since the city is located in the Northern Hemisphere, more electricity is consumed during the summer. We will use simulated data to reflect these assumptions.
### Schema
```
CREATE STABLE meters (ts TIMESTAMP, num INT, temperature FLOAT, goods INT) TAGS (device NCHAR(20));
CREATE TABLE goods (ts1 TIMESTAMP, ts2 TIMESTAMP, goods FLOAT);
```
![Seeq demo schema](./seeq/seeq-demo-schema.webp)
### Mock data
```
python mockdata.py
taos -s "insert into power.goods select _wstart, _wstart + 10d, avg(goods) from power.meters interval(10d);"
```
The source code is hosted at [GitHub Repository](https://github.com/sangshuduo/td-forecasting).
### Using Seeq for data analysis
#### Data Source configuration
Please login with Seeq administrator and create a few data sources as following.
- Power
```
{
"QueryDefinitions": [
{
"Name": "PowerNum",
"Type": "SIGNAL",
"Sql": "SELECT ts, num FROM meters",
"Enabled": true,
"TestMode": false,
"TestQueriesDuringSync": true,
"InProgressCapsulesEnabled": false,
"Variables": null,
"Properties": [
{
"Name": "Name",
"Value": "Num",
"Sql": null,
"Uom": "string"
},
{
"Name": "Interpolation Method",
"Value": "linear",
"Sql": null,
"Uom": "string"
},
{
"Name": "Maximum Interpolation",
"Value": "2day",
"Sql": null,
"Uom": "string"
}
],
"CapsuleProperties": null
}
],
"Type": "GENERIC",
"Hostname": null,
"Port": 0,
"DatabaseName": null,
"Username": "root",
"Password": "taosdata",
"InitialSql": null,
"TimeZone": null,
"PrintRows": false,
"UseWindowsAuth": false,
"SqlFetchBatchSize": 100000,
"UseSSL": false,
"JdbcProperties": null,
"GenericDatabaseConfig": {
"DatabaseJdbcUrl": "jdbc:TAOS-RS://127.0.0.1:6041/power?user=root&password=taosdata",
"SqlDriverClassName": "com.taosdata.jdbc.rs.RestfulDriver",
"ResolutionInNanoseconds": 1000,
"ZonedColumnTypes": []
}
}
```
- Goods
```
{
"QueryDefinitions": [
{
"Name": "PowerGoods",
"Type": "CONDITION",
"Sql": "SELECT ts1, ts2, goods FROM power.goods",
"Enabled": true,
"TestMode": false,
"TestQueriesDuringSync": true,
"InProgressCapsulesEnabled": false,
"Variables": null,
"Properties": [
{
"Name": "Name",
"Value": "Goods",
"Sql": null,
"Uom": "string"
},
{
"Name": "Maximum Duration",
"Value": "10days",
"Sql": null,
"Uom": "string"
}
],
"CapsuleProperties": [
{
"Name": "goods",
"Value": "${columnResult}",
"Column": "goods",
"Uom": "string"
}
]
}
],
"Type": "GENERIC",
"Hostname": null,
"Port": 0,
"DatabaseName": null,
"Username": "root",
"Password": "taosdata",
"InitialSql": null,
"TimeZone": null,
"PrintRows": false,
"UseWindowsAuth": false,
"SqlFetchBatchSize": 100000,
"UseSSL": false,
"JdbcProperties": null,
"GenericDatabaseConfig": {
"DatabaseJdbcUrl": "jdbc:TAOS-RS://127.0.0.1:6041/power?user=root&password=taosdata",
"SqlDriverClassName": "com.taosdata.jdbc.rs.RestfulDriver",
"ResolutionInNanoseconds": 1000,
"ZonedColumnTypes": []
}
}
```
- Temperature
```
{
"QueryDefinitions": [
{
"Name": "PowerNum",
"Type": "SIGNAL",
"Sql": "SELECT ts, temperature FROM meters",
"Enabled": true,
"TestMode": false,
"TestQueriesDuringSync": true,
"InProgressCapsulesEnabled": false,
"Variables": null,
"Properties": [
{
"Name": "Name",
"Value": "Temperature",
"Sql": null,
"Uom": "string"
},
{
"Name": "Interpolation Method",
"Value": "linear",
"Sql": null,
"Uom": "string"
},
{
"Name": "Maximum Interpolation",
"Value": "2day",
"Sql": null,
"Uom": "string"
}
],
"CapsuleProperties": null
}
],
"Type": "GENERIC",
"Hostname": null,
"Port": 0,
"DatabaseName": null,
"Username": "root",
"Password": "taosdata",
"InitialSql": null,
"TimeZone": null,
"PrintRows": false,
"UseWindowsAuth": false,
"SqlFetchBatchSize": 100000,
"UseSSL": false,
"JdbcProperties": null,
"GenericDatabaseConfig": {
"DatabaseJdbcUrl": "jdbc:TAOS-RS://127.0.0.1:6041/power?user=root&password=taosdata",
"SqlDriverClassName": "com.taosdata.jdbc.rs.RestfulDriver",
"ResolutionInNanoseconds": 1000,
"ZonedColumnTypes": []
}
}
```
#### Launch Seeq Workbench
Please login to Seeq server with IP:port and create a new Seeq Workbench, then select data sources and choose the correct tools to do data visualization and analysis. Please refer to [the official documentation](https://support.seeq.com/space/KB/146440193/Seeq+Workbench) for the details.
![Seeq Workbench](./seeq/seeq-demo-workbench.webp)
#### Use Seeq Data Lab Server for advanced data analysis
Please login to the Seeq service with IP:port and create a new Seeq Data Lab. Then you can use advanced tools including Python environment and machine learning add-ons for more complex analysis.
```Python
from seeq import spy
spy.options.compatibility = 189
import pandas as pd
import matplotlib
import matplotlib.pyplot as plt
import mlforecast
import lightgbm as lgb
from mlforecast.target_transforms import Differences
from sklearn.linear_model import LinearRegression
ds = spy.search({'ID': "8C91A9C7-B6C2-4E18-AAAF-XXXXXXXXX"})
print(ds)
sig = ds.loc[ds['Name'].isin(['Num'])]
print(sig)
data = spy.pull(sig, start='2015-01-01', end='2022-12-31', grid=None)
print("data.info()")
data.info()
print(data)
#data.plot()
print("data[Num].info()")
data['Num'].info()
da = data['Num'].index.tolist()
#print(da)
li = data['Num'].tolist()
#print(li)
data2 = pd.DataFrame()
data2['ds'] = da
print('1st data2 ds info()')
data2['ds'].info()
#data2['ds'] = pd.to_datetime(data2['ds']).to_timestamp()
data2['ds'] = pd.to_datetime(data2['ds']).astype('int64')
data2['y'] = li
print('2nd data2 ds info()')
data2['ds'].info()
print(data2)
data2.insert(0, column = "unique_id", value="unique_id")
print("Forecasting ...")
forecast = mlforecast.MLForecast(
models = lgb.LGBMRegressor(),
freq = 1,
lags=[365],
target_transforms=[Differences([365])],
)
forecast.fit(data2)
predicts = forecast.predict(365)
pd.concat([data2, predicts]).set_index("ds").plot(title = "current data with forecast")
plt.show()
```
Example output:
![Seeq forecast result](./seeq/seeq-forecast-result.webp)
### How to configure Seeq data source to access TDengine Cloud
Configuring a Seeq data source connection to TDengine Cloud or a local installation instance does not have any essential differences. After logging in to TDengine Cloud, select "Programming - Java" and copy the JDBC URL string with the token provided. Then, use this JDBC URL string to fill in the DatabaseJdbcUrl value in the Seeq Data Source configuration.
Please note that when using TDengine Cloud, you need to specify the database name in your SQL commands.
#### The data source of TDengine Cloud example
```
{
"QueryDefinitions": [
{
"Name": "CloudVoltage",
"Type": "SIGNAL",
"Sql": "SELECT ts, voltage FROM test.meters",
"Enabled": true,
"TestMode": false,
"TestQueriesDuringSync": true,
"InProgressCapsulesEnabled": false,
"Variables": null,
"Properties": [
{
"Name": "Name",
"Value": "Voltage",
"Sql": null,
"Uom": "string"
},
{
"Name": "Interpolation Method",
"Value": "linear",
"Sql": null,
"Uom": "string"
},
{
"Name": "Maximum Interpolation",
"Value": "2day",
"Sql": null,
"Uom": "string"
}
],
"CapsuleProperties": null
}
],
"Type": "GENERIC",
"Hostname": null,
"Port": 0,
"DatabaseName": null,
"Username": "root",
"Password": "taosdata",
"InitialSql": null,
"TimeZone": null,
"PrintRows": false,
"UseWindowsAuth": false,
"SqlFetchBatchSize": 100000,
"UseSSL": false,
"JdbcProperties": null,
"GenericDatabaseConfig": {
"DatabaseJdbcUrl": "jdbc:TAOS-RS://gw.cloud.taosdata.com?useSSL=true&token=41ac9d61d641b6b334e8b76f45f5a8XXXXXXXXXX",
"SqlDriverClassName": "com.taosdata.jdbc.rs.RestfulDriver",
"ResolutionInNanoseconds": 1000,
"ZonedColumnTypes": []
}
}
```
#### Seeq Workbench with TDengine Cloud data source example
![Seeq workbench with TDengine Cloud](./seeq/seeq-workbench-with-tdengine-cloud.webp)
## Conclusion
By integrating Seeq and TDengine, it is possible to leverage the efficient storage and querying performance of TDengine while also benefiting from Seeq's powerful data visualization and analysis capabilities provided to users.
This integration allows users to take advantage of TDengine's high-performance time-series data storage and retrieval, ensuring efficient handling of large volumes of data. At the same time, Seeq provides advanced analytics features such as data visualization, anomaly detection, correlation analysis, and predictive modeling, enabling users to gain valuable insights and make data-driven decisions.
Together, Seeq and TDengine provide a comprehensive solution for time series data analysis in diverse industries such as manufacturing, IIoT, and power systems. The combination of efficient data storage and advanced analytics empowers users to unlock the full potential of their time series data, driving operational improvements, and enabling predictive and prescriptive analytics applications.
...@@ -11,7 +11,11 @@ taosBenchmark (曾用名 taosdemo ) 是一个用于测试 TDengine 产品性能 ...@@ -11,7 +11,11 @@ taosBenchmark (曾用名 taosdemo ) 是一个用于测试 TDengine 产品性能
## 安装 ## 安装
- 安装 TDengine 官方安装包的同时会自动安装 taosBenchmark taosBenchmark 有两种安装方式:
- 安装 TDengine 官方安装包的同时会自动安装 taosBenchmark, 详情请参考[ TDengine 安装](../../operation/pkg-install)
- 单独编译 taos-tools 并安装, 详情请参考 [taos-tools](https://github.com/taosdata/taos-tools) 仓库。
## 运行 ## 运行
......
...@@ -13,7 +13,12 @@ taosKeeper 是 TDengine 3.0 版本监控指标的导出工具,通过简单的 ...@@ -13,7 +13,12 @@ taosKeeper 是 TDengine 3.0 版本监控指标的导出工具,通过简单的
## 安装 ## 安装
- 安装 TDengine 官方安装包的同时会自动安装 taosKeeper taosKeeper 有两种安装方式:
taosKeeper 安装方式:
- 安装 TDengine 官方安装包的同时会自动安装 taosKeeper, 详情请参考[ TDengine 安装](../../operation/pkg-install)
- 单独编译 taosKeeper 并安装,详情请参考 [taosKeeper](https://github.com/taosdata/taoskeeper) 仓库。
## 配置和运行方式 ## 配置和运行方式
......
...@@ -23,7 +23,7 @@ TDengine Source Connector 用于把数据实时地从 TDengine 读出来发送 ...@@ -23,7 +23,7 @@ TDengine Source Connector 用于把数据实时地从 TDengine 读出来发送
1. Linux 操作系统 1. Linux 操作系统
2. 已安装 Java 8 和 Maven 2. 已安装 Java 8 和 Maven
3. 已安装 Git、curl、vi 3. 已安装 Git、curl、vi
4. 已安装并启动 TDengine。 4. 已安装并启动 TDengine。如果还没有可参考[安装和卸载](../../operation/pkg-install)
## 安装 Kafka ## 安装 Kafka
......
...@@ -14,7 +14,7 @@ Seeq 是制造业和工业互联网(IIOT)高级分析软件。Seeq 支持在 ...@@ -14,7 +14,7 @@ Seeq 是制造业和工业互联网(IIOT)高级分析软件。Seeq 支持在
### Seeq 安装方法 ### Seeq 安装方法
(Seeq 官网)[https://www.seeq.com/customer-download]下载相关软件,例如 Seeq Server 和 Seeq Data Lab 等。 [Seeq 官网](https://www.seeq.com/customer-download)下载相关软件,例如 Seeq Server 和 Seeq Data Lab 等。
### Seeq Server 安装和启动 ### Seeq Server 安装和启动
...@@ -29,7 +29,7 @@ sudo seeq start ...@@ -29,7 +29,7 @@ sudo seeq start
### Seeq Data Lab Server 安装和启动 ### Seeq Data Lab Server 安装和启动
Seeq Data Lab 需要安装在和 Seeq Server 不同的服务器上,并通过配置和 Seeq Server 互联。详细安装配置指令参见(Seeq 官方文档)[https://support.seeq.com/space/KB/1034059842] Seeq Data Lab 需要安装在和 Seeq Server 不同的服务器上,并通过配置和 Seeq Server 互联。详细安装配置指令参见[Seeq 官方文档](https://support.seeq.com/space/KB/1034059842)
``` ```
tar xvf seeq-data-lab-<version>-64bit-linux.tar.gz tar xvf seeq-data-lab-<version>-64bit-linux.tar.gz
...@@ -51,7 +51,7 @@ sudo seeq config set Network/DataLab/Port 34231 # the port of the Data Lab serve ...@@ -51,7 +51,7 @@ sudo seeq config set Network/DataLab/Port 34231 # the port of the Data Lab serve
## TDengine 本地实例安装方法 ## TDengine 本地实例安装方法
请参考(官网文档)[https://docs.taosdata.com/get-started/package/] 请参考[官网文档](../../get-started)
## TDengine Cloud 访问方法 ## TDengine Cloud 访问方法
如果使用 Seeq 连接 TDengine Cloud,请在 https://cloud.taosdata.com 申请帐号并登录查看如何访问 TDengine Cloud。 如果使用 Seeq 连接 TDengine Cloud,请在 https://cloud.taosdata.com 申请帐号并登录查看如何访问 TDengine Cloud。
...@@ -64,7 +64,7 @@ sudo seeq config set Network/DataLab/Port 34231 # the port of the Data Lab serve ...@@ -64,7 +64,7 @@ sudo seeq config set Network/DataLab/Port 34231 # the port of the Data Lab serve
sudo seeq config get Folders/Data sudo seeq config get Folders/Data
``` ```
2. 从 maven.org 下载 TDengine Java connector 包,目前最新版本为(3.2.4)[https://repo1.maven.org/maven2/com/taosdata/jdbc/taos-jdbcdriver/3.2.4/taos-jdbcdriver-3.2.4-dist.jar],并拷贝至 data 存储位置的 plugins\lib 中。 2. 从 maven.org 下载 TDengine Java connector 包,目前最新版本为[3.2.5](https://repo1.maven.org/maven2/com/taosdata/jdbc/taos-jdbcdriver/3.2.5/taos-jdbcdriver-3.2.5-dist.jar),并拷贝至 data 存储位置的 plugins\lib 中。
3. 重新启动 seeq server 3. 重新启动 seeq server
...@@ -91,7 +91,7 @@ CREATE STABLE meters (ts TIMESTAMP, num INT, temperature FLOAT, goods INT) TAGS ...@@ -91,7 +91,7 @@ CREATE STABLE meters (ts TIMESTAMP, num INT, temperature FLOAT, goods INT) TAGS
CREATE TABLE goods (ts1 TIMESTAMP, ts2 TIMESTAMP, goods FLOAT); CREATE TABLE goods (ts1 TIMESTAMP, ts2 TIMESTAMP, goods FLOAT);
``` ```
!(Seeq demo schema)[./seeq/seeq-demo-schema.webp] ![Seeq demo schema](./seeq/seeq-demo-schema.webp)
### 构造数据方法 ### 构造数据方法
...@@ -99,7 +99,8 @@ CREATE TABLE goods (ts1 TIMESTAMP, ts2 TIMESTAMP, goods FLOAT); ...@@ -99,7 +99,8 @@ CREATE TABLE goods (ts1 TIMESTAMP, ts2 TIMESTAMP, goods FLOAT);
python mockdata.py python mockdata.py
taos -s "insert into power.goods select _wstart, _wstart + 10d, avg(goods) from power.meters interval(10d);" taos -s "insert into power.goods select _wstart, _wstart + 10d, avg(goods) from power.meters interval(10d);"
``` ```
源代码托管在(github 仓库)[https://github.com/sangshuduo/td-forecasting]。
源代码托管在[GitHub 仓库](https://github.com/sangshuduo/td-forecasting)
### 使用 Seeq 进行数据分析 ### 使用 Seeq 进行数据分析
...@@ -287,9 +288,9 @@ taos -s "insert into power.goods select _wstart, _wstart + 10d, avg(goods) from ...@@ -287,9 +288,9 @@ taos -s "insert into power.goods select _wstart, _wstart + 10d, avg(goods) from
#### 使用 Seeq Workbench #### 使用 Seeq Workbench
登录 Seeq 服务页面并新建 Seeq Workbench,通过选择数据源搜索结果和根据需要选择不同的工具,可以进行数据展现或预测,详细使用方法参见(官方知识库)[https://support.seeq.com/space/KB/146440193/Seeq+Workbench] 登录 Seeq 服务页面并新建 Seeq Workbench,通过选择数据源搜索结果和根据需要选择不同的工具,可以进行数据展现或预测,详细使用方法参见[官方知识库](https://support.seeq.com/space/KB/146440193/Seeq+Workbench)
!(Seeq Workbench)[./seeq/seeq-demo-workbench.webp] ![Seeq Workbench](./seeq/seeq-demo-workbench.webp)
#### 用 Seeq Data Lab Server 进行进一步的数据分析 #### 用 Seeq Data Lab Server 进行进一步的数据分析
...@@ -358,7 +359,7 @@ plt.show() ...@@ -358,7 +359,7 @@ plt.show()
运行程序输出结果: 运行程序输出结果:
!(Seeq forecast result)[./seeq/seeq-forecast-result.webp] ![Seeq forecast result](./seeq/seeq-forecast-result.webp)
### 配置 Seeq 数据源连接 TDengine Cloud ### 配置 Seeq 数据源连接 TDengine Cloud
...@@ -426,7 +427,7 @@ plt.show() ...@@ -426,7 +427,7 @@ plt.show()
#### TDengine Cloud 作为数据源的 Seeq Workbench 界面示例 #### TDengine Cloud 作为数据源的 Seeq Workbench 界面示例
!(Seeq workbench with TDengine cloud)[./seeq/seeq-workbench-with-tdengine-cloud.webp] ![Seeq workbench with TDengine cloud](./seeq/seeq-workbench-with-tdengine-cloud.webp)
## 方案总结 ## 方案总结
......
...@@ -1255,12 +1255,12 @@ static int32_t mndProcessCreateStbReq(SRpcMsg *pReq) { ...@@ -1255,12 +1255,12 @@ static int32_t mndProcessCreateStbReq(SRpcMsg *pReq) {
sprintf(detail, "colVer:%d, delay1:%" PRId64 ", delay2:%" PRId64 ", deleteMark1:%" PRId64 ", " sprintf(detail, "colVer:%d, delay1:%" PRId64 ", delay2:%" PRId64 ", deleteMark1:%" PRId64 ", "
"deleteMark2:%" PRId64 ", igExists:%d, numOfColumns:%d, numOfFuncs:%d, numOfTags:%d, " "deleteMark2:%" PRId64 ", igExists:%d, numOfColumns:%d, numOfFuncs:%d, numOfTags:%d, "
"source:%d, suid:%" PRId64 ", tagVer:%d, ttl:%d, " "source:%d, suid:%" PRId64 ", tagVer:%d, ttl:%d, "
"watermark1:%" PRId64 ", watermark2:%" PRId64, "watermark1:%" PRId64 ", watermark2:%" PRId64,
createReq.colVer, createReq.delay1, createReq.delay2, createReq.deleteMark1, createReq.colVer, createReq.delay1, createReq.delay2, createReq.deleteMark1,
createReq.deleteMark2, createReq.igExists, createReq.numOfColumns, createReq.numOfFuncs, createReq.numOfTags, createReq.deleteMark2, createReq.igExists, createReq.numOfColumns, createReq.numOfFuncs, createReq.numOfTags,
createReq.source, createReq.suid, createReq.tagVer, createReq.ttl, createReq.source, createReq.suid, createReq.tagVer, createReq.ttl,
createReq.watermark1, createReq.watermark2); createReq.watermark1, createReq.watermark2);
mndAuditFieldStr(detail, createReq.pColumns, createReq.numOfColumns, AUDIT_DETAIL_MAX); mndAuditFieldStr(detail, createReq.pColumns, createReq.numOfColumns, AUDIT_DETAIL_MAX);
mndAuditFieldStr(detail, createReq.pTags, createReq.numOfTags, AUDIT_DETAIL_MAX); mndAuditFieldStr(detail, createReq.pTags, createReq.numOfTags, AUDIT_DETAIL_MAX);
...@@ -2610,7 +2610,7 @@ static int32_t mndProcessDropStbReq(SRpcMsg *pReq) { ...@@ -2610,7 +2610,7 @@ static int32_t mndProcessDropStbReq(SRpcMsg *pReq) {
char detail[2000] = {0}; char detail[2000] = {0};
sprintf(detail, "igNotExists:%d, source:%d" , sprintf(detail, "igNotExists:%d, source:%d" ,
dropReq.igNotExists, dropReq.source); dropReq.igNotExists, dropReq.source);
SName name = {0}; SName name = {0};
tNameFromString(&name, dropReq.name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE); tNameFromString(&name, dropReq.name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
......
...@@ -876,13 +876,8 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) { ...@@ -876,13 +876,8 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
char detail[2000] = {0}; char detail[2000] = {0};
sprintf(detail, sprintf(detail,
"checkpointFreq:%" PRId64 ", createStb:%d, deleteMark:%" PRId64 "checkpointFreq:%" PRId64 ", createStb:%d, deleteMark:%" PRId64
", " ", fillHistory:%d, igExists:%d, igExpired:%d, igUpdate:%d, lastTs:%" PRId64
"fillHistory:%d, igExists:%d, " ", maxDelay:%" PRId64 ", numOfTags:%d, sourceDB:%s, targetStbFullName:%s, triggerType:%d, watermark:%" PRId64,
"igExpired:%d, igUpdate:%d, lastTs:%" PRId64
", "
"maxDelay:%" PRId64
", numOfTags:%d, sourceDB:%s, "
"targetStbFullName:%s, triggerType:%d, watermark:%" PRId64,
createStreamReq.checkpointFreq, createStreamReq.createStb, createStreamReq.deleteMark, createStreamReq.checkpointFreq, createStreamReq.createStb, createStreamReq.deleteMark,
createStreamReq.fillHistory, createStreamReq.igExists, createStreamReq.igExpired, createStreamReq.igUpdate, createStreamReq.fillHistory, createStreamReq.igExists, createStreamReq.igExpired, createStreamReq.igUpdate,
createStreamReq.lastTs, createStreamReq.maxDelay, createStreamReq.numOfTags, createStreamReq.sourceDB, createStreamReq.lastTs, createStreamReq.maxDelay, createStreamReq.numOfTags, createStreamReq.sourceDB,
...@@ -2315,9 +2310,13 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) { ...@@ -2315,9 +2310,13 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
for (int32_t i = 0; i < req.numOfTasks; ++i) { for (int32_t i = 0; i < req.numOfTasks; ++i) {
STaskStatusEntry *p = taosArrayGet(req.pTaskStatus, i); STaskStatusEntry *p = taosArrayGet(req.pTaskStatus, i);
int64_t k[2] = {p->streamId, p->taskId}; int64_t k[2] = {p->streamId, p->taskId};
int32_t index = *(int32_t *)taosHashGet(execNodeList.pTaskMap, &k, sizeof(k));
STaskStatusEntry *pStatusEntry = taosArrayGet(execNodeList.pTaskList, index); int32_t *index = taosHashGet(execNodeList.pTaskMap, &k, sizeof(k));
if (index == NULL) {
continue;
}
STaskStatusEntry *pStatusEntry = taosArrayGet(execNodeList.pTaskList, *index);
pStatusEntry->status = p->status; pStatusEntry->status = p->status;
if (p->status != TASK_STATUS__NORMAL) { if (p->status != TASK_STATUS__NORMAL) {
mDebug("received s-task:0x%x not in ready status:%s", p->taskId, streamGetTaskStatusStr(p->status)); mDebug("received s-task:0x%x not in ready status:%s", p->taskId, streamGetTaskStatusStr(p->status));
......
...@@ -642,7 +642,7 @@ static int32_t mndProcessCreateTopicReq(SRpcMsg *pReq) { ...@@ -642,7 +642,7 @@ static int32_t mndProcessCreateTopicReq(SRpcMsg *pReq) {
SName tableName = {0}; SName tableName = {0};
tNameFromString(&tableName, createTopicReq.subStbName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE); tNameFromString(&tableName, createTopicReq.subStbName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
sprintf(detail, "igExists:%d, subStbName:%s, subType:%d, withMeta:%d, sql:%s", sprintf(detail, "igExists:%d, subStbName:%s, subType:%d, withMeta:%d, sql:%s",
createTopicReq.igExists, tableName.tname, createTopicReq.subType, createTopicReq.withMeta, sql); createTopicReq.igExists, tableName.tname, createTopicReq.subType, createTopicReq.withMeta, sql);
SName dbname = {0}; SName dbname = {0};
......
...@@ -657,7 +657,7 @@ static int32_t mndProcessCreateUserReq(SRpcMsg *pReq) { ...@@ -657,7 +657,7 @@ static int32_t mndProcessCreateUserReq(SRpcMsg *pReq) {
if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS; if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;
char detail[1000] = {0}; char detail[1000] = {0};
sprintf(detail, "createType:%d, enable:%d, superUser:%d, sysInfo:%d", sprintf(detail, "createType:%d, enable:%d, superUser:%d, sysInfo:%d",
createReq.createType, createReq.enable, createReq.superUser, createReq.sysInfo); createReq.createType, createReq.enable, createReq.superUser, createReq.sysInfo);
auditRecord(pReq, pMnode->clusterId, "createUser", createReq.user, "", detail); auditRecord(pReq, pMnode->clusterId, "createUser", createReq.user, "", detail);
...@@ -1039,16 +1039,16 @@ static int32_t mndProcessAlterUserReq(SRpcMsg *pReq) { ...@@ -1039,16 +1039,16 @@ static int32_t mndProcessAlterUserReq(SRpcMsg *pReq) {
if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS; if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;
char detail[1000] = {0}; char detail[1000] = {0};
sprintf(detail, "alterType:%s, enable:%d, superUser:%d, sysInfo:%d, tabName:%s, password:", sprintf(detail, "alterType:%s, enable:%d, superUser:%d, sysInfo:%d, tabName:%s, password:",
mndUserAuditTypeStr(alterReq.alterType), alterReq.enable, alterReq.superUser, alterReq.sysInfo, alterReq.tabName); mndUserAuditTypeStr(alterReq.alterType), alterReq.enable, alterReq.superUser, alterReq.sysInfo, alterReq.tabName);
if(alterReq.alterType == TSDB_ALTER_USER_PASSWD){ if(alterReq.alterType == TSDB_ALTER_USER_PASSWD){
sprintf(detail, "alterType:%s, enable:%d, superUser:%d, sysInfo:%d, tabName:%s, password:xxx", sprintf(detail, "alterType:%s, enable:%d, superUser:%d, sysInfo:%d, tabName:%s, password:xxx",
mndUserAuditTypeStr(alterReq.alterType), alterReq.enable, alterReq.superUser, alterReq.sysInfo, mndUserAuditTypeStr(alterReq.alterType), alterReq.enable, alterReq.superUser, alterReq.sysInfo,
alterReq.tabName); alterReq.tabName);
auditRecord(pReq, pMnode->clusterId, "alterUser", alterReq.user, "", detail); auditRecord(pReq, pMnode->clusterId, "alterUser", alterReq.user, "", detail);
} }
else if(alterReq.alterType == TSDB_ALTER_USER_SUPERUSER || else if(alterReq.alterType == TSDB_ALTER_USER_SUPERUSER ||
alterReq.alterType == TSDB_ALTER_USER_ENABLE || alterReq.alterType == TSDB_ALTER_USER_ENABLE ||
alterReq.alterType == TSDB_ALTER_USER_SYSINFO){ alterReq.alterType == TSDB_ALTER_USER_SYSINFO){
auditRecord(pReq, pMnode->clusterId, "alterUser", alterReq.user, "", detail); auditRecord(pReq, pMnode->clusterId, "alterUser", alterReq.user, "", detail);
......
...@@ -425,6 +425,20 @@ int32_t vnodeSnapWriterClose(SVSnapWriter *pWriter, int8_t rollback, SSnapshot * ...@@ -425,6 +425,20 @@ int32_t vnodeSnapWriterClose(SVSnapWriter *pWriter, int8_t rollback, SSnapshot *
if (code) goto _exit; if (code) goto _exit;
} }
if (pWriter->pStreamTaskWriter) {
code = streamTaskSnapWriterClose(pWriter->pStreamTaskWriter, rollback);
if (code) goto _exit;
}
if (pWriter->pStreamStateWriter) {
code = streamStateSnapWriterClose(pWriter->pStreamStateWriter, rollback);
if (code) goto _exit;
code = streamStateRebuildFromSnap(pWriter->pStreamStateWriter, 0);
pWriter->pStreamStateWriter = NULL;
if (code) goto _exit;
}
if (pWriter->pRsmaSnapWriter) { if (pWriter->pRsmaSnapWriter) {
code = rsmaSnapWriterClose(&pWriter->pRsmaSnapWriter, rollback); code = rsmaSnapWriterClose(&pWriter->pRsmaSnapWriter, rollback);
if (code) goto _exit; if (code) goto _exit;
......
...@@ -3734,7 +3734,7 @@ static int32_t translateInterp(STranslateContext* pCxt, SSelectStmt* pSelect) { ...@@ -3734,7 +3734,7 @@ static int32_t translateInterp(STranslateContext* pCxt, SSelectStmt* pSelect) {
static int32_t removeConstantValueFromList(SNodeList** pList) { static int32_t removeConstantValueFromList(SNodeList** pList) {
SNode* pNode = NULL; SNode* pNode = NULL;
WHERE_EACH(pNode, *pList) { WHERE_EACH(pNode, *pList) {
if (nodeType(pNode) == QUERY_NODE_VALUE || if (nodeType(pNode) == QUERY_NODE_VALUE ||
(nodeType(pNode) == QUERY_NODE_FUNCTION && fmIsConstantResFunc((SFunctionNode*)pNode) && fmIsScalarFunc(((SFunctionNode*)pNode)->funcId))) { (nodeType(pNode) == QUERY_NODE_FUNCTION && fmIsConstantResFunc((SFunctionNode*)pNode) && fmIsScalarFunc(((SFunctionNode*)pNode)->funcId))) {
ERASE_NODE(*pList); ERASE_NODE(*pList);
continue; continue;
...@@ -3753,7 +3753,11 @@ static int32_t removeConstantValueFromList(SNodeList** pList) { ...@@ -3753,7 +3753,11 @@ static int32_t removeConstantValueFromList(SNodeList** pList) {
static int32_t translatePartitionBy(STranslateContext* pCxt, SSelectStmt* pSelect) { static int32_t translatePartitionBy(STranslateContext* pCxt, SSelectStmt* pSelect) {
pCxt->currClause = SQL_CLAUSE_PARTITION_BY; pCxt->currClause = SQL_CLAUSE_PARTITION_BY;
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
if (pSelect->pPartitionByList) {
code = removeConstantValueFromList(&pSelect->pPartitionByList);
}
if (TSDB_CODE_SUCCESS == code && pSelect->pPartitionByList) { if (TSDB_CODE_SUCCESS == code && pSelect->pPartitionByList) {
int8_t typeType = getTableTypeFromTableNode(pSelect->pFromTable); int8_t typeType = getTableTypeFromTableNode(pSelect->pFromTable);
SNode* pPar = nodesListGetNode(pSelect->pPartitionByList, 0); SNode* pPar = nodesListGetNode(pSelect->pPartitionByList, 0);
......
...@@ -373,6 +373,11 @@ static bool tagScanNodeHasTbname(SNode* pKeys) { ...@@ -373,6 +373,11 @@ static bool tagScanNodeHasTbname(SNode* pKeys) {
static int32_t tagScanSetExecutionMode(SScanLogicNode* pScan) { static int32_t tagScanSetExecutionMode(SScanLogicNode* pScan) {
pScan->onlyMetaCtbIdx = false; pScan->onlyMetaCtbIdx = false;
if (pScan->tableType == TSDB_CHILD_TABLE) {
pScan->onlyMetaCtbIdx = false;
return TSDB_CODE_SUCCESS;
}
if (tagScanNodeListHasTbname(pScan->pScanPseudoCols)) { if (tagScanNodeListHasTbname(pScan->pScanPseudoCols)) {
pScan->onlyMetaCtbIdx = false; pScan->onlyMetaCtbIdx = false;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
...@@ -442,7 +447,7 @@ static int32_t createScanLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect ...@@ -442,7 +447,7 @@ static int32_t createScanLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect
pScan->pScanPseudoCols = pNewScanPseudoCols; pScan->pScanPseudoCols = pNewScanPseudoCols;
} }
} }
*/ */
} }
if (NULL != pScan->pScanCols) { if (NULL != pScan->pScanCols) {
...@@ -511,7 +516,7 @@ static int32_t createJoinLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect ...@@ -511,7 +516,7 @@ static int32_t createJoinLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect
pJoin->node.requireDataOrder = DATA_ORDER_LEVEL_GLOBAL; pJoin->node.requireDataOrder = DATA_ORDER_LEVEL_GLOBAL;
pJoin->node.resultDataOrder = DATA_ORDER_LEVEL_NONE; pJoin->node.resultDataOrder = DATA_ORDER_LEVEL_NONE;
pJoin->isLowLevelJoin = pJoinTable->isLowLevelJoin; pJoin->isLowLevelJoin = pJoinTable->isLowLevelJoin;
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
// set left and right node // set left and right node
...@@ -559,7 +564,7 @@ static int32_t createJoinLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect ...@@ -559,7 +564,7 @@ static int32_t createJoinLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect
code = createColumnByRewriteExprs(pColList, &pJoin->node.pTargets); code = createColumnByRewriteExprs(pColList, &pJoin->node.pTargets);
} }
} }
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
SNodeList* pColList = NULL; SNodeList* pColList = NULL;
if (QUERY_NODE_REAL_TABLE == nodeType(pJoinTable->pRight) && !pJoin->isLowLevelJoin) { if (QUERY_NODE_REAL_TABLE == nodeType(pJoinTable->pRight) && !pJoin->isLowLevelJoin) {
......
...@@ -539,7 +539,7 @@ static int32_t createTagScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSubpla ...@@ -539,7 +539,7 @@ static int32_t createTagScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSubpla
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
} }
vgroupInfoToNodeAddr(pScanLogicNode->pVgroupList->vgroups, &pSubplan->execNode); vgroupInfoToNodeAddr(pScanLogicNode->pVgroupList->vgroups, &pSubplan->execNode);
pScan->onlyMetaCtbIdx = pScanLogicNode->onlyMetaCtbIdx; pScan->onlyMetaCtbIdx = pScanLogicNode->onlyMetaCtbIdx;
return createScanPhysiNodeFinalize(pCxt, pSubplan, pScanLogicNode, (SScanPhysiNode*)pScan, pPhyNode); return createScanPhysiNodeFinalize(pCxt, pSubplan, pScanLogicNode, (SScanPhysiNode*)pScan, pPhyNode);
...@@ -726,7 +726,7 @@ static int32_t mergeEqCond(SNode** ppDst, SNode** ppSrc) { ...@@ -726,7 +726,7 @@ static int32_t mergeEqCond(SNode** ppDst, SNode** ppSrc) {
*ppSrc = NULL; *ppSrc = NULL;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
SLogicConditionNode* pLogicCond = (SLogicConditionNode*)nodesMakeNode(QUERY_NODE_LOGIC_CONDITION); SLogicConditionNode* pLogicCond = (SLogicConditionNode*)nodesMakeNode(QUERY_NODE_LOGIC_CONDITION);
if (NULL == pLogicCond) { if (NULL == pLogicCond) {
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
...@@ -754,7 +754,7 @@ static int32_t getJoinDataBlockDescNode(SNodeList* pChildren, int32_t idx, SData ...@@ -754,7 +754,7 @@ static int32_t getJoinDataBlockDescNode(SNodeList* pChildren, int32_t idx, SData
planError("Invalid join children num:%d or child type:%d", pChildren->length, nodeType(nodesListGetNode(pChildren, 0))); planError("Invalid join children num:%d or child type:%d", pChildren->length, nodeType(nodesListGetNode(pChildren, 0)));
return TSDB_CODE_PLAN_INTERNAL_ERROR; return TSDB_CODE_PLAN_INTERNAL_ERROR;
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -775,12 +775,12 @@ static int32_t createMergeJoinPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChi ...@@ -775,12 +775,12 @@ static int32_t createMergeJoinPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChi
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = getJoinDataBlockDescNode(pChildren, 1, &pRightDesc); code = getJoinDataBlockDescNode(pChildren, 1, &pRightDesc);
} }
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = setNodeSlotId(pCxt, pLeftDesc->dataBlockId, pRightDesc->dataBlockId, pJoinLogicNode->pPrimKeyEqCond, code = setNodeSlotId(pCxt, pLeftDesc->dataBlockId, pRightDesc->dataBlockId, pJoinLogicNode->pPrimKeyEqCond,
&pJoin->pPrimKeyCond); &pJoin->pPrimKeyCond);
} }
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = setListSlotId(pCxt, pLeftDesc->dataBlockId, pRightDesc->dataBlockId, pJoinLogicNode->node.pTargets, code = setListSlotId(pCxt, pLeftDesc->dataBlockId, pRightDesc->dataBlockId, pJoinLogicNode->node.pTargets,
&pJoin->pTargets); &pJoin->pTargets);
...@@ -869,7 +869,7 @@ static int32_t createHashJoinColList(int16_t lBlkId, int16_t rBlkId, SNode* pEq1 ...@@ -869,7 +869,7 @@ static int32_t createHashJoinColList(int16_t lBlkId, int16_t rBlkId, SNode* pEq1
if (NULL == pJoin->pOnLeft || NULL == pJoin->pOnRight) { if (NULL == pJoin->pOnLeft || NULL == pJoin->pOnRight) {
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
} }
code = extractHashJoinOnCols(lBlkId, rBlkId, pEq1, pJoin); code = extractHashJoinOnCols(lBlkId, rBlkId, pEq1, pJoin);
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = extractHashJoinOnCols(lBlkId, rBlkId, pEq2, pJoin); code = extractHashJoinOnCols(lBlkId, rBlkId, pEq2, pJoin);
...@@ -893,10 +893,10 @@ static int32_t sortHashJoinTargets(int16_t lBlkId, int16_t rBlkId, SHashJoinPhys ...@@ -893,10 +893,10 @@ static int32_t sortHashJoinTargets(int16_t lBlkId, int16_t rBlkId, SHashJoinPhys
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
} }
SNodeList* pNew = nodesMakeList(); SNodeList* pNew = nodesMakeList();
FOREACH(pNode, pJoin->pTargets) { FOREACH(pNode, pJoin->pTargets) {
SColumnNode* pCol = (SColumnNode*)pNode; SColumnNode* pCol = (SColumnNode*)pNode;
int32_t len = getSlotKey(pNode, NULL, name, TSDB_COL_FNAME_LEN); int32_t len = getSlotKey(pNode, NULL, name, TSDB_COL_FNAME_LEN);
tSimpleHashPut(pHash, name, len, &pCol, POINTER_BYTES); tSimpleHashPut(pHash, name, len, &pCol, POINTER_BYTES);
} }
...@@ -905,7 +905,7 @@ static int32_t sortHashJoinTargets(int16_t lBlkId, int16_t rBlkId, SHashJoinPhys ...@@ -905,7 +905,7 @@ static int32_t sortHashJoinTargets(int16_t lBlkId, int16_t rBlkId, SHashJoinPhys
FOREACH(pNode, pJoin->pOnLeft) { FOREACH(pNode, pJoin->pOnLeft) {
SColumnNode* pCol = (SColumnNode*)pNode; SColumnNode* pCol = (SColumnNode*)pNode;
int32_t len = getSlotKey(pNode, NULL, name, TSDB_COL_FNAME_LEN); int32_t len = getSlotKey(pNode, NULL, name, TSDB_COL_FNAME_LEN);
SNode** p = tSimpleHashGet(pHash, name, len); SNode** p = tSimpleHashGet(pHash, name, len);
if (p) { if (p) {
nodesListStrictAppend(pJoin->pTargets, *p); nodesListStrictAppend(pJoin->pTargets, *p);
...@@ -914,7 +914,7 @@ static int32_t sortHashJoinTargets(int16_t lBlkId, int16_t rBlkId, SHashJoinPhys ...@@ -914,7 +914,7 @@ static int32_t sortHashJoinTargets(int16_t lBlkId, int16_t rBlkId, SHashJoinPhys
} }
FOREACH(pNode, pJoin->pOnRight) { FOREACH(pNode, pJoin->pOnRight) {
SColumnNode* pCol = (SColumnNode*)pNode; SColumnNode* pCol = (SColumnNode*)pNode;
int32_t len = getSlotKey(pNode, NULL, name, TSDB_COL_FNAME_LEN); int32_t len = getSlotKey(pNode, NULL, name, TSDB_COL_FNAME_LEN);
SNode** p = tSimpleHashGet(pHash, name, len); SNode** p = tSimpleHashGet(pHash, name, len);
if (p) { if (p) {
nodesListStrictAppend(pJoin->pTargets, *p); nodesListStrictAppend(pJoin->pTargets, *p);
...@@ -930,7 +930,7 @@ static int32_t sortHashJoinTargets(int16_t lBlkId, int16_t rBlkId, SHashJoinPhys ...@@ -930,7 +930,7 @@ static int32_t sortHashJoinTargets(int16_t lBlkId, int16_t rBlkId, SHashJoinPhys
if (p == NULL) { if (p == NULL) {
break; break;
} }
nodesListStrictAppend(pJoin->pTargets, *p); nodesListStrictAppend(pJoin->pTargets, *p);
} }
} }
...@@ -958,10 +958,10 @@ static int32_t createHashJoinPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChil ...@@ -958,10 +958,10 @@ static int32_t createHashJoinPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChil
code = setNodeSlotId(pCxt, pLeftDesc->dataBlockId, pRightDesc->dataBlockId, pJoinLogicNode->pPrimKeyEqCond, &pJoin->pPrimKeyCond); code = setNodeSlotId(pCxt, pLeftDesc->dataBlockId, pRightDesc->dataBlockId, pJoinLogicNode->pPrimKeyEqCond, &pJoin->pPrimKeyCond);
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = setNodeSlotId(pCxt, pLeftDesc->dataBlockId, pRightDesc->dataBlockId, pJoinLogicNode->pColEqCond, &pJoin->pColEqCond); code = setNodeSlotId(pCxt, pLeftDesc->dataBlockId, pRightDesc->dataBlockId, pJoinLogicNode->pColEqCond, &pJoin->pColEqCond);
} }
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = setNodeSlotId(pCxt, pLeftDesc->dataBlockId, pRightDesc->dataBlockId, pJoinLogicNode->pTagEqCond, &pJoin->pTagEqCond); code = setNodeSlotId(pCxt, pLeftDesc->dataBlockId, pRightDesc->dataBlockId, pJoinLogicNode->pTagEqCond, &pJoin->pTagEqCond);
} }
if (TSDB_CODE_SUCCESS == code && NULL != pJoinLogicNode->pOtherOnCond) { if (TSDB_CODE_SUCCESS == code && NULL != pJoinLogicNode->pOtherOnCond) {
code = setNodeSlotId(pCxt, ((SPhysiNode*)pJoin)->pOutputDataBlockDesc->dataBlockId, -1, pJoinLogicNode->pOtherOnCond, &pJoin->pFilterConditions); code = setNodeSlotId(pCxt, ((SPhysiNode*)pJoin)->pOutputDataBlockDesc->dataBlockId, -1, pJoinLogicNode->pOtherOnCond, &pJoin->pFilterConditions);
} }
...@@ -973,10 +973,10 @@ static int32_t createHashJoinPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChil ...@@ -973,10 +973,10 @@ static int32_t createHashJoinPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChil
} }
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = createHashJoinColList(pLeftDesc->dataBlockId, pRightDesc->dataBlockId, pJoin->pPrimKeyCond, pJoin->pColEqCond, pJoin->pTagEqCond, pJoin); code = createHashJoinColList(pLeftDesc->dataBlockId, pRightDesc->dataBlockId, pJoin->pPrimKeyCond, pJoin->pColEqCond, pJoin->pTagEqCond, pJoin);
} }
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = sortHashJoinTargets(pLeftDesc->dataBlockId, pRightDesc->dataBlockId, pJoin); code = sortHashJoinTargets(pLeftDesc->dataBlockId, pRightDesc->dataBlockId, pJoin);
} }
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = addDataBlockSlots(pCxt, pJoin->pTargets, pJoin->node.pOutputDataBlockDesc); code = addDataBlockSlots(pCxt, pJoin->pTargets, pJoin->node.pOutputDataBlockDesc);
} }
...@@ -1001,7 +1001,7 @@ static int32_t createJoinPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren ...@@ -1001,7 +1001,7 @@ static int32_t createJoinPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren
planError("Invalid join algorithm:%d", pJoinLogicNode->joinAlgo); planError("Invalid join algorithm:%d", pJoinLogicNode->joinAlgo);
break; break;
} }
return TSDB_CODE_FAILED; return TSDB_CODE_FAILED;
} }
...@@ -1019,7 +1019,7 @@ static int32_t createGroupCachePhysiNode(SPhysiPlanContext* pCxt, SNodeList* pCh ...@@ -1019,7 +1019,7 @@ static int32_t createGroupCachePhysiNode(SPhysiPlanContext* pCxt, SNodeList* pCh
pGrpCache->batchFetch = pLogicNode->batchFetch; pGrpCache->batchFetch = pLogicNode->batchFetch;
SDataBlockDescNode* pChildDesc = ((SPhysiNode*)nodesListGetNode(pChildren, 0))->pOutputDataBlockDesc; SDataBlockDescNode* pChildDesc = ((SPhysiNode*)nodesListGetNode(pChildren, 0))->pOutputDataBlockDesc;
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
/* /*
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = setListSlotId(pCxt, pChildDesc->dataBlockId, -1, pLogicNode->pGroupCols, &pGrpCache->pGroupCols); code = setListSlotId(pCxt, pChildDesc->dataBlockId, -1, pLogicNode->pGroupCols, &pGrpCache->pGroupCols);
} }
...@@ -1045,7 +1045,7 @@ static int32_t updateDynQueryCtrlStbJoinInfo(SPhysiPlanContext* pCxt, SNodeList* ...@@ -1045,7 +1045,7 @@ static int32_t updateDynQueryCtrlStbJoinInfo(SPhysiPlanContext* pCxt, SNodeList*
} }
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
memcpy(pDynCtrl->stbJoin.srcScan, pLogicNode->stbJoin.srcScan, sizeof(pDynCtrl->stbJoin.srcScan)); memcpy(pDynCtrl->stbJoin.srcScan, pLogicNode->stbJoin.srcScan, sizeof(pDynCtrl->stbJoin.srcScan));
SNode* pNode = NULL; SNode* pNode = NULL;
int32_t i = 0; int32_t i = 0;
FOREACH(pNode, pVgList) { FOREACH(pNode, pVgList) {
...@@ -1062,7 +1062,7 @@ static int32_t updateDynQueryCtrlStbJoinInfo(SPhysiPlanContext* pCxt, SNodeList* ...@@ -1062,7 +1062,7 @@ static int32_t updateDynQueryCtrlStbJoinInfo(SPhysiPlanContext* pCxt, SNodeList*
return code; return code;
} }
static int32_t createDynQueryCtrlPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren, SDynQueryCtrlLogicNode* pLogicNode, static int32_t createDynQueryCtrlPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren, SDynQueryCtrlLogicNode* pLogicNode,
SPhysiNode** pPhyNode) { SPhysiNode** pPhyNode) {
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
......
...@@ -757,7 +757,6 @@ void metaHbToMnode(void* param, void* tmrId) { ...@@ -757,7 +757,6 @@ void metaHbToMnode(void* param, void* tmrId) {
SStreamHbMsg hbMsg = {0}; SStreamHbMsg hbMsg = {0};
SStreamMeta* pMeta = taosAcquireRef(streamMetaId, rid); SStreamMeta* pMeta = taosAcquireRef(streamMetaId, rid);
if (pMeta == NULL) { if (pMeta == NULL) {
// taosMemoryFree(param);
return; return;
} }
...@@ -779,6 +778,7 @@ void metaHbToMnode(void* param, void* tmrId) { ...@@ -779,6 +778,7 @@ void metaHbToMnode(void* param, void* tmrId) {
int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta); int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta);
SEpSet epset = {0}; SEpSet epset = {0};
bool hasValEpset = false;
hbMsg.vgId = pMeta->vgId; hbMsg.vgId = pMeta->vgId;
hbMsg.pTaskStatus = taosArrayInit(numOfTasks, sizeof(STaskStatusEntry)); hbMsg.pTaskStatus = taosArrayInit(numOfTasks, sizeof(STaskStatusEntry));
...@@ -797,51 +797,53 @@ void metaHbToMnode(void* param, void* tmrId) { ...@@ -797,51 +797,53 @@ void metaHbToMnode(void* param, void* tmrId) {
if (i == 0) { if (i == 0) {
epsetAssign(&epset, &(*pTask)->info.mnodeEpset); epsetAssign(&epset, &(*pTask)->info.mnodeEpset);
hasValEpset = true;
} }
} }
hbMsg.numOfTasks = taosArrayGetSize(hbMsg.pTaskStatus); hbMsg.numOfTasks = taosArrayGetSize(hbMsg.pTaskStatus);
taosRUnLockLatch(&pMeta->lock); taosRUnLockLatch(&pMeta->lock);
int32_t code = 0; if (hasValEpset) {
int32_t tlen = 0; int32_t code = 0;
int32_t tlen = 0;
tEncodeSize(tEncodeStreamHbMsg, &hbMsg, tlen, code); tEncodeSize(tEncodeStreamHbMsg, &hbMsg, tlen, code);
if (code < 0) { if (code < 0) {
qError("vgId:%d encode stream hb msg failed, code:%s", pMeta->vgId, tstrerror(code)); qError("vgId:%d encode stream hb msg failed, code:%s", pMeta->vgId, tstrerror(code));
taosArrayDestroy(hbMsg.pTaskStatus); taosArrayDestroy(hbMsg.pTaskStatus);
taosReleaseRef(streamMetaId, rid); taosReleaseRef(streamMetaId, rid);
return; return;
} }
void* buf = rpcMallocCont(tlen);
if (buf == NULL) {
qError("vgId:%d encode stream hb msg failed, code:%s", pMeta->vgId, tstrerror(TSDB_CODE_OUT_OF_MEMORY));
taosArrayDestroy(hbMsg.pTaskStatus);
taosReleaseRef(streamMetaId, rid);
return;
}
SEncoder encoder; void* buf = rpcMallocCont(tlen);
tEncoderInit(&encoder, buf, tlen); if (buf == NULL) {
if ((code = tEncodeStreamHbMsg(&encoder, &hbMsg)) < 0) { qError("vgId:%d encode stream hb msg failed, code:%s", pMeta->vgId, tstrerror(TSDB_CODE_OUT_OF_MEMORY));
rpcFreeCont(buf); taosArrayDestroy(hbMsg.pTaskStatus);
qError("vgId:%d encode stream hb msg failed, code:%s", pMeta->vgId, tstrerror(code)); taosReleaseRef(streamMetaId, rid);
taosArrayDestroy(hbMsg.pTaskStatus); return;
taosReleaseRef(streamMetaId, rid); }
return;
}
tEncoderClear(&encoder);
taosArrayDestroy(hbMsg.pTaskStatus); SEncoder encoder;
tEncoderInit(&encoder, buf, tlen);
if ((code = tEncodeStreamHbMsg(&encoder, &hbMsg)) < 0) {
rpcFreeCont(buf);
qError("vgId:%d encode stream hb msg failed, code:%s", pMeta->vgId, tstrerror(code));
taosArrayDestroy(hbMsg.pTaskStatus);
taosReleaseRef(streamMetaId, rid);
return;
}
tEncoderClear(&encoder);
SRpcMsg msg = {0}; SRpcMsg msg = {0};
initRpcMsg(&msg, TDMT_MND_STREAM_HEARTBEAT, buf, tlen); initRpcMsg(&msg, TDMT_MND_STREAM_HEARTBEAT, buf, tlen);
msg.info.noResp = 1; msg.info.noResp = 1;
qDebug("vgId:%d, build and send hb to mnode", pMeta->vgId); qDebug("vgId:%d, build and send hb to mnode", pMeta->vgId);
tmsgSendReq(&epset, &msg);
}
tmsgSendReq(&epset, &msg); taosArrayDestroy(hbMsg.pTaskStatus);
taosTmrReset(metaHbToMnode, META_HB_CHECK_INTERVAL, param, streamEnv.timer, &pMeta->hbInfo.hbTmr); taosTmrReset(metaHbToMnode, META_HB_CHECK_INTERVAL, param, streamEnv.timer, &pMeta->hbInfo.hbTmr);
taosReleaseRef(streamMetaId, rid); taosReleaseRef(streamMetaId, rid);
} }
...@@ -905,4 +907,4 @@ void streamMetaNotifyClose(SStreamMeta* pMeta) { ...@@ -905,4 +907,4 @@ void streamMetaNotifyClose(SStreamMeta* pMeta) {
int64_t el = taosGetTimestampMs() - st; int64_t el = taosGetTimestampMs() - st;
qDebug("vgId:%d all stream tasks are not in timer, continue close, elapsed time:%" PRId64 " ms", pMeta->vgId, el); qDebug("vgId:%d all stream tasks are not in timer, continue close, elapsed time:%" PRId64 " ms", pMeta->vgId, el);
} }
\ No newline at end of file
...@@ -2156,12 +2156,10 @@ static void cliSchedMsgToNextNode(SCliMsg* pMsg, SCliThrd* pThrd) { ...@@ -2156,12 +2156,10 @@ static void cliSchedMsgToNextNode(SCliMsg* pMsg, SCliThrd* pThrd) {
if (rpcDebugFlag & DEBUG_DEBUG) { if (rpcDebugFlag & DEBUG_DEBUG) {
STraceId* trace = &pMsg->msg.info.traceId; STraceId* trace = &pMsg->msg.info.traceId;
char* tbuf = taosMemoryCalloc(1, TSDB_FQDN_LEN * 5); char tbuf[512] = {0};
EPSET_TO_STR(&pCtx->epSet, tbuf); EPSET_TO_STR(&pCtx->epSet, tbuf);
tGDebug("%s retry on next node,use:%s, step: %d,timeout:%" PRId64 "", transLabel(pThrd->pTransInst), tbuf, tGDebug("%s retry on next node,use:%s, step: %d,timeout:%" PRId64 "", transLabel(pThrd->pTransInst), tbuf,
pCtx->retryStep, pCtx->retryNextInterval); pCtx->retryStep, pCtx->retryNextInterval);
taosMemoryFree(tbuf);
} }
STaskArg* arg = taosMemoryMalloc(sizeof(STaskArg)); STaskArg* arg = taosMemoryMalloc(sizeof(STaskArg));
...@@ -2387,7 +2385,7 @@ int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) { ...@@ -2387,7 +2385,7 @@ int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) {
bool hasEpSet = cliTryExtractEpSet(pResp, &pCtx->epSet); bool hasEpSet = cliTryExtractEpSet(pResp, &pCtx->epSet);
if (hasEpSet) { if (hasEpSet) {
if (rpcDebugFlag & DEBUG_TRACE) { if (rpcDebugFlag & DEBUG_TRACE) {
char tbuf[256] = {0}; char tbuf[512] = {0};
EPSET_TO_STR(&pCtx->epSet, tbuf); EPSET_TO_STR(&pCtx->epSet, tbuf);
tGTrace("%s conn %p extract epset from msg", CONN_GET_INST_LABEL(pConn), pConn); tGTrace("%s conn %p extract epset from msg", CONN_GET_INST_LABEL(pConn), pConn);
} }
......
...@@ -90,7 +90,7 @@ endi ...@@ -90,7 +90,7 @@ endi
sql select tags tbname,t,b from stt1 order by t sql select tags tbname,t,b from stt1 order by t
print $rows print $rows
print $data00 $data01 $data02 $data10 $data11 $data12 $data20 $data21 $data22 $data30 $data31 $data32 print $data00 $data01 $data02 $data10 $data11 $data12 $data20 $data21 $data22 $data30 $data31 $data32
if $rows != 4 then if $rows != 4 then
return -1 return -1
endi endi
...@@ -103,7 +103,7 @@ endi ...@@ -103,7 +103,7 @@ endi
sql select tags t,b from stt1 where t=1 sql select tags t,b from stt1 where t=1
print $rows print $rows
print $data00 $data01 print $data00 $data01
if $rows != 1 then if $rows != 1 then
return -1 return -1
endi endi
...@@ -116,7 +116,20 @@ endi ...@@ -116,7 +116,20 @@ endi
sql select tags t,b from stt1 where tbname='ctt11' sql select tags t,b from stt1 where tbname='ctt11'
print $rows print $rows
print $data00 $data01 print $data00 $data01
if $rows != 1 then
return -1
endi
if $data00 != @1@ then
return -1
endi
if $data01 != @1aa@ then
return -1
endi
sql select tags t,b from ctt11
print $rows
print $data00 $data01
if $rows != 1 then if $rows != 1 then
return -1 return -1
endi endi
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册