Commit 47c891fb authored by dapan1121

Merge remote-tracking branch 'origin/main' into enh/TD-24586

......@@ -2,7 +2,7 @@
IF (DEFINED VERNUMBER)
SET(TD_VER_NUMBER ${VERNUMBER})
ELSE ()
SET(TD_VER_NUMBER "3.0.4.3")
SET(TD_VER_NUMBER "3.0.5.0")
ENDIF ()
IF (DEFINED VERCOMPATIBLE)
......
......@@ -265,7 +265,7 @@ if(${BUILD_WITH_ROCKSDB})
option(WITH_FALLOCATE "" OFF)
option(WITH_JEMALLOC "" OFF)
option(WITH_GFLAGS "" OFF)
option(PORTABLE "" OFF)
option(PORTABLE "" ON)
option(WITH_LIBURING "" OFF)
option(FAIL_ON_WARNINGS OFF)
......
......@@ -45,7 +45,7 @@ In TDengine, the data types below can be used when specifying a column or tag.
:::note
- Only ASCII visible characters are suggested to be used in a column or tag of BINARY type. Multi-byte characters must be stored in NCHAR type.
- The length of BINARY can be up to 16,374 bytes. The string value must be quoted with single quotes. You must specify a length in bytes for a BINARY value, for example binary(20) for up to twenty single-byte characters. If the data exceeds the specified length, an error will occur. The literal single quote inside the string must be preceded with back slash like `\'`
- The length of BINARY can be up to 16,374 bytes (since version 3.0.5.0, up to 65,517 bytes for data columns and 16,382 bytes for tag columns). The string value must be quoted with single quotes. You must specify a length in bytes for a BINARY value, for example binary(20) for up to twenty single-byte characters. If the data exceeds the specified length, an error will occur. A literal single quote inside the string must be escaped with a backslash, like `\'` (see the sketch after this note)
- Numeric values in SQL statements will be determined as integer or float type according to whether there is decimal point or whether scientific notation is used, so attention must be paid to avoid overflow. For example, 9999999999999999999 will be considered as overflow because it exceeds the upper limit of long integer, but 9999999999999999999.0 will be considered as a legal float number.
:::
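
To make the new limits concrete, here is a minimal sketch using plain JDBC. It assumes a TDengine 3.0.5.0 or later server reachable at 127.0.0.1:6041 through the REST adapter, the taos-jdbcdriver on the classpath, root/taosdata credentials, and an illustrative database name `demo` and column width; adjust all of these for your environment.

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

public class WideBinaryExample {
    public static void main(String[] args) throws Exception {
        // Assumed connection details; adjust host, port, and credentials for your setup.
        String url = "jdbc:TAOS-RS://127.0.0.1:6041/?user=root&password=taosdata";
        try (Connection conn = DriverManager.getConnection(url);
             Statement stmt = conn.createStatement()) {
            stmt.executeUpdate("create database if not exists demo");
            // A 60,000-byte BINARY data column is within the 65,517-byte limit of 3.0.5.0+,
            // but would be rejected on earlier versions where BINARY is capped at 16,374 bytes.
            stmt.executeUpdate("create table demo.wide (ts timestamp, payload binary(60000))");
        }
    }
}
```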
......
......@@ -45,7 +45,7 @@ table_option: {
1. The first column of a table MUST be of type TIMESTAMP. It is automatically set as the primary key.
2. The maximum length of the table name is 192 bytes.
3. The maximum length of each row is 48k bytes, please note that the extra 2 bytes used by each BINARY/NCHAR column are also counted.
3. The maximum length of each row is 48 KB (64 KB since version 3.0.5.0). Note that the extra 2 bytes used by each BINARY/NCHAR column are also counted.
4. The name of the subtable can only consist of characters from the English alphabet, digits and underscore. Table names can't start with a digit. Table names are case insensitive.
5. The maximum length in bytes must be specified when using BINARY or NCHAR types.
6. Escape character "\`" can be used to avoid the conflict between table names and reserved keywords, above rules will be bypassed when using escape character on table names, but the upper limit for the name length is still valid. The table names specified using escape character are case sensitive.
......
......@@ -26,7 +26,7 @@ The following characters cannot occur in a password: single quotation marks ('),
- Maximum length of database name is 64 bytes
- Maximum length of table name is 192 bytes, excluding the database name prefix and the separator.
- Maximum length of each data row is 48K bytes. Note that the upper limit includes the extra 2 bytes consumed by each column of BINARY/NCHAR type.
- Maximum length of each data row is 48 KB (64 KB since version 3.0.5.0). Note that the upper limit includes the extra 2 bytes consumed by each column of BINARY/NCHAR type.
- The maximum length of a column name is 64 bytes.
- Maximum number of columns is 4096. There must be at least 2 columns, and the first column must be timestamp.
- The maximum length of a tag name is 64 bytes
......
......@@ -1256,6 +1256,7 @@ The source code of the sample application is under `TDengine/examples/JDBC`:
- connectionPools: using taos-jdbcdriver in connection pools such as HikariCP, Druid, dbcp, c3p0, etc.
- SpringJdbcTemplate: using taos-jdbcdriver in Spring JdbcTemplate.
- mybatisplus-demo: using taos-jdbcdriver in Springboot + Mybatis.
- consumer-demo: an example of consuming TDengine data; the consumption rate can be controlled via parameters (see the sketch after this list).
[JDBC example](https://github.com/taosdata/TDengine/tree/3.0/examples/JDBC)
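
For readers who want to see the shape of the new consumer-demo without opening the repository, below is a condensed sketch of its pattern. The property keys, the `TaosConsumer` calls, and the `Bean`/`BeanDeserializer` classes are taken from the demo itself; the server address, topic name, and group id are placeholders to adapt.

```java
package com.taosdata; // assumed to live in the demo module next to Bean and BeanDeserializer

import com.taosdata.jdbc.tmq.ConsumerRecord;
import com.taosdata.jdbc.tmq.ConsumerRecords;
import com.taosdata.jdbc.tmq.TMQConstants;
import com.taosdata.jdbc.tmq.TaosConsumer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class MinimalConsumer {
    public static void main(String[] args) throws Exception {
        Properties prop = new Properties();
        prop.setProperty(TMQConstants.CONNECT_TYPE, "ws");                  // "ws" or "jni"
        prop.setProperty(TMQConstants.BOOTSTRAP_SERVERS, "127.0.0.1:6041"); // placeholder address
        prop.setProperty(TMQConstants.CONNECT_USER, "root");
        prop.setProperty(TMQConstants.CONNECT_PASS, "taosdata");
        prop.setProperty(TMQConstants.ENABLE_AUTO_COMMIT, "true");
        prop.setProperty(TMQConstants.GROUP_ID, "demo-group");
        // Bean and BeanDeserializer are the POJO and deserializer shipped with the consumer-demo.
        prop.setProperty(TMQConstants.VALUE_DESERIALIZER, "com.taosdata.BeanDeserializer");

        TaosConsumer<Bean> consumer = new TaosConsumer<>(prop);
        consumer.subscribe(Collections.singletonList("test_consumer")); // topic created by the demo
        while (true) {
            ConsumerRecords<Bean> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<Bean> record : records) {
                System.out.println(record.value());
            }
        }
    }
}
```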
......
......@@ -90,7 +90,7 @@ You can configure smlChildTableName in taos.cfg to specify table names, for exam
Note: TDengine 3.0.3.0 and later automatically detect whether order is consistent. This parameter is no longer used.
:::tip
All processing logic of schemaless will still follow TDengine's underlying restrictions on data structures, such as the total length of each row of data cannot exceed 48 KB and the total length of a tag value cannot exceed 16 KB. See [TDengine SQL Boundary Limits](/taos-sql/limit) for specific constraints in this area.
All schemaless processing logic still follows TDengine's underlying restrictions on data structures; for example, the total length of each row of data cannot exceed 48 KB (64 KB since version 3.0.5.0) and the total length of a tag value cannot exceed 16 KB. See [TDengine SQL Boundary Limits](/taos-sql/limit) for the specific constraints in this area (a rough client-side size check is sketched after this tip).
:::
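
Because the row-size cap also applies to schemaless writes, a simple client-side sanity check can catch oversized records before they are sent. This is only a rough sketch under the assumption that the UTF-8 length of the line-protocol record is a reasonable proxy for the resulting row size; the server enforces the limit on the stored row, not on the text, and the limit values are the ones quoted in the tip above.

```java
import java.nio.charset.StandardCharsets;

public class RowSizeCheck {
    // Limits quoted from the tip above: 48 KB before 3.0.5.0, 64 KB from 3.0.5.0 on.
    static final int MAX_ROW_BYTES_LEGACY = 48 * 1024;
    static final int MAX_ROW_BYTES_3_0_5_0 = 64 * 1024;

    public static void main(String[] args) {
        // Sample InfluxDB line-protocol record, same shape as the examples in this document.
        String line = "st,t1=3,t2=4,t3=t3 c1=3i64,c3=\"passit\",c2=false,c4=4f64 1626006833639000000";
        int bytes = line.getBytes(StandardCharsets.UTF_8).length;
        System.out.println("line protocol record is roughly " + bytes + " bytes");
        if (bytes > MAX_ROW_BYTES_3_0_5_0) {
            System.out.println("record would exceed the row limit even on 3.0.5.0 and later");
        } else if (bytes > MAX_ROW_BYTES_LEGACY) {
            System.out.println("record needs TDengine 3.0.5.0 or later");
        }
    }
}
```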
## Time resolution recognition
......
......@@ -10,6 +10,10 @@ For TDengine 2.x installation packages by version, please visit [here](https://w
import Release from "/components/ReleaseV3";
## 3.0.5.0
<Release type="tdengine" version="3.0.5.0" />
## 3.0.4.2
<Release type="tdengine" version="3.0.4.2" />
......
......@@ -10,6 +10,10 @@ For other historical version installers, please visit [here](https://www.taosdat
import Release from "/components/ReleaseV3";
## 2.5.1
<Release type="tools" version="2.5.1" />
## 2.5.0
<Release type="tools" version="2.5.0" />
......
......@@ -1258,6 +1258,7 @@ public static void main(String[] args) throws Exception {
- connectionPools: using taos-jdbcdriver in connection pools such as HikariCP, Druid, dbcp, and c3p0.
- SpringJdbcTemplate: using taos-jdbcdriver in Spring JdbcTemplate.
- mybatisplus-demo: using taos-jdbcdriver in Spring Boot + MyBatis.
- consumer-demo: an example of consuming TDengine data; the consumption rate can be controlled via parameters.
See: [JDBC example](https://github.com/taosdata/TDengine/tree/3.0/examples/JDBC)
......
......@@ -45,9 +45,9 @@ CREATE DATABASE db_name PRECISION 'ns';
:::note
- The length of each row in a table cannot exceed 48 KB. (Note: each BINARY/NCHAR column consumes an extra 2 bytes of storage.)
- The length of each row in a table cannot exceed 48 KB (64 KB since version 3.0.5.0). (Note: each BINARY/NCHAR column consumes an extra 2 bytes of storage.)
- Although the BINARY type supports byte-oriented binary characters at the storage layer, different programming languages do not guarantee consistent handling of binary data, so it is recommended to store only visible ASCII characters in BINARY columns and to avoid invisible characters. Multi-byte data such as Chinese characters must be stored using the NCHAR type. If Chinese characters are forced into a BINARY column, reads and writes may sometimes succeed, but the data carries no character-set information and can easily become garbled or even corrupted.
- The BINARY type can theoretically hold up to 16,374 bytes. BINARY accepts only string input, and the string must be enclosed in single quotes. A size must be specified when the type is used; for example, BINARY(20) defines a string of at most 20 single-byte characters, each occupying 1 byte of storage for a fixed total of 20 bytes, and an error is reported if the user's string exceeds 20 bytes. A single quote inside the string can be written with the escape sequence backslash plus single quote, i.e. `\'`
- The BINARY type can theoretically hold up to 16,374 bytes (since version 3.0.5.0, 65,517 bytes for data columns and 16,382 bytes for tag columns). BINARY accepts only string input, and the string must be enclosed in single quotes. A size must be specified when the type is used; for example, BINARY(20) defines a string of at most 20 single-byte characters, each occupying 1 byte of storage for a fixed total of 20 bytes, and an error is reported if the user's string exceeds 20 bytes. A single quote inside the string can be written with the escape sequence backslash plus single quote, i.e. `\'`
- Numeric values in SQL statements are treated as integer or floating point depending on whether a decimal point or scientific notation is present, so beware of overflowing the corresponding type. For example, 9999999999999999999 is considered to exceed the upper bound of a long integer and overflows, while 9999999999999999999.0 is treated as a valid floating-point number.
:::
......
......@@ -43,7 +43,7 @@ table_option: {
1. The first column of a table must be of type TIMESTAMP, and the system automatically sets it as the primary key;
2. The maximum length of a table name is 192 bytes;
3. The length of each row cannot exceed 48 KB; (note: each BINARY/NCHAR column consumes an extra 2 bytes of storage)
3. The length of each row cannot exceed 48 KB (64 KB since version 3.0.5.0); (note: each BINARY/NCHAR column consumes an extra 2 bytes of storage)
4. A subtable name may only consist of letters, digits, and underscores, must not start with a digit, and is case insensitive
5. When the binary or nchar data types are used, their maximum length in bytes must be specified, e.g. binary(20) means 20 bytes;
6. To support more forms of table names, TDengine introduces the escape character "\`", which lets table names avoid conflicts with keywords and exempts them from the validity checks above; the length limit, however, still applies. Once the escape character is used, the content inside it is no longer normalized to a single case.
......
......@@ -26,7 +26,7 @@ description: Restrictions on valid character sets and naming
- The maximum length of a database name is 64 bytes
- The maximum length of a table name is 192 bytes, excluding the database name prefix and the separator
- The maximum length of each data row is 48 KB (note: each BINARY/NCHAR column in a row consumes an extra 2 bytes of storage)
- The maximum length of each data row is 48 KB (64 KB since version 3.0.5.0) (note: each BINARY/NCHAR column in a row consumes an extra 2 bytes of storage)
- The maximum length of a column name is 64 bytes
- At most 4096 columns are allowed; at least 2 columns are required, and the first column must be a timestamp.
- The maximum length of a tag name is 64 bytes
......
......@@ -87,7 +87,7 @@ st,t1=3,t2=4,t3=t3 c1=3i64,c3="passit",c2=false,c4=4f64 1626006833639000000
:::tip
All schemaless processing logic still follows TDengine's underlying restrictions on data structures; for example, the total length of each row of data cannot exceed
48 KB and the total length of a tag value cannot exceed 16 KB. See [TDengine SQL Boundary Limits](/taos-sql/limit) for the specific constraints in this area.
48 KB (64 KB since version 3.0.5.0) and the total length of a tag value cannot exceed 16 KB. See [TDengine SQL Boundary Limits](/taos-sql/limit) for the specific constraints in this area.
:::
......
......@@ -10,6 +10,10 @@ For TDengine 2.x installation packages by version, please visit [here](https://www.taosdata.com/all-do
import Release from "/components/ReleaseV3";
## 3.0.5.0
<Release type="tdengine" version="3.0.5.0" />
## 3.0.4.2
<Release type="tdengine" version="3.0.4.2" />
......
......@@ -10,6 +10,10 @@ Download links for taosTools by version are listed below:
import Release from "/components/ReleaseV3";
## 2.5.1
<Release type="tools" version="2.5.1" />
## 2.5.0
<Release type="tools" version="2.5.0" />
......
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.taosdata</groupId>
<artifactId>consumer</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>com.taosdata.jdbc</groupId>
<artifactId>taos-jdbcdriver</artifactId>
<version>3.2.1</version>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>30.1.1-jre</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>3.3.0</version>
<executions>
<execution>
<id>ConsumerDemo</id>
<configuration>
<finalName>ConsumerDemo</finalName>
<archive>
<manifest>
<mainClass>com.taosdata.ConsumerDemo</mainClass>
</manifest>
</archive>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>8</source>
<target>8</target>
<encoding>UTF-8</encoding>
</configuration>
</plugin>
</plugins>
</build>
</project>
\ No newline at end of file
# How to Run the Consumer Demo Code on Linux
TDengine's Consumer demo project is organized as a Maven project so that users can easily compile, package, and run it. If Maven is not installed on your server, you can install it with:
```
sudo apt-get install maven
```
## Install TDengine Client and taosAdapter
Make sure the TDengine client is already installed in your development environment.
Download the TDengine package from ``https://www.taosdata.com/cn/all-downloads/`` and install the client.
## Run Consumer Demo using the Maven plugin
Run the command:
```
mvn clean compile exec:java -Dexec.mainClass="com.taosdata.ConsumerDemo"
```
## Custom configuration
```shell
# the host of TDengine server
export TAOS_HOST="127.0.0.1"
# the port of TDengine server
export TAOS_PORT="6041"
# the consumer type, can be "ws" or "jni"
export TAOS_TYPE="ws"
# the number of consumers
export TAOS_JDBC_CONSUMER_NUM="1"
# the number of processors used to consume the data
export TAOS_JDBC_PROCESSOR_NUM="2"
# the number of records to be consumed per processor per second
export TAOS_JDBC_RATE_PER_PROCESSOR="1000"
# poll wait time in ms
export TAOS_JDBC_POLL_SLEEP="100"
```
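
A minimal sketch of how these variables end up as demo configuration, mirroring the fallback logic of the demo's `Config.getFromENV`; the defaults shown are the demo's own, and the class name here is hypothetical.

```java
public class EnvConfigExample {
    static String env(String key, String fallback) {
        String v = System.getenv(key);
        return v != null ? v : fallback;
    }

    public static void main(String[] args) {
        // Each value falls back to the demo's default when the variable is unset.
        String host = env("TAOS_HOST", "127.0.0.1");
        String port = env("TAOS_PORT", "6041");
        String type = env("TAOS_TYPE", "ws");
        int consumerNum = Integer.parseInt(env("TAOS_JDBC_CONSUMER_NUM", "1"));
        int processorNum = Integer.parseInt(env("TAOS_JDBC_PROCESSOR_NUM", "2"));
        int ratePerProcessor = Integer.parseInt(env("TAOS_JDBC_RATE_PER_PROCESSOR", "1000"));
        int pollSleepMs = Integer.parseInt(env("TAOS_JDBC_POLL_SLEEP", "100"));
        System.out.printf("type=%s host=%s port=%s consumers=%d processors=%d rate=%d poll=%dms%n",
                type, host, port, consumerNum, processorNum, ratePerProcessor, pollSleepMs);
    }
}
```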
## Run Consumer Demo using jar
To compile the demo project, go to the source directory ``TDengine/tests/examples/JDBC/consumer-demo`` and execute
```
mvn clean package assembly:single
```
To run ConsumerDemo.jar, go to ``TDengine/tests/examples/JDBC/consumer-demo`` and execute
```
java -jar target/ConsumerDemo-jar-with-dependencies.jar
```
package com.taosdata;
import java.sql.Timestamp;
public class Bean {
private Timestamp ts;
private Integer c1;
private String c2;
public Timestamp getTs() {
return ts;
}
public void setTs(Timestamp ts) {
this.ts = ts;
}
public Integer getC1() {
return c1;
}
public void setC1(Integer c1) {
this.c1 = c1;
}
public String getC2() {
return c2;
}
public void setC2(String c2) {
this.c2 = c2;
}
@Override
public String toString() {
final StringBuilder sb = new StringBuilder("Bean {");
sb.append("ts=").append(ts);
sb.append(", c1=").append(c1);
sb.append(", c2='").append(c2).append('\'');
sb.append('}');
return sb.toString();
}
}
package com.taosdata;
import com.taosdata.jdbc.tmq.ReferenceDeserializer;
public class BeanDeserializer extends ReferenceDeserializer<Bean> {
}
package com.taosdata;
public class Config {
public static final String TOPIC = "test_consumer";
public static final String TAOS_HOST = "127.0.0.1";
public static final String TAOS_PORT = "6041";
public static final String TAOS_TYPE = "ws";
public static final int TAOS_JDBC_CONSUMER_NUM = 1;
public static final int TAOS_JDBC_PROCESSOR_NUM = 2;
public static final int TAOS_JDBC_RATE_PER_PROCESSOR = 1000;
public static final int TAOS_JDBC_POLL_SLEEP = 100;
private final int consumerNum;
private final int processCapacity;
private final int rate;
private final int pollSleep;
private final String type;
private final String host;
private final String port;
public Config(String type, String host, String port, int consumerNum, int processCapacity, int rate, int pollSleep) {
this.type = type;
this.consumerNum = consumerNum;
this.processCapacity = processCapacity;
this.rate = rate;
this.pollSleep = pollSleep;
this.host = host;
this.port = port;
}
public int getConsumerNum() {
return consumerNum;
}
public int getProcessCapacity() {
return processCapacity;
}
public int getRate() {
return rate;
}
public int getPollSleep() {
return pollSleep;
}
public String getHost() {
return host;
}
public String getPort() {
return port;
}
public String getType() {
return type;
}
public static Config getFromENV() {
String host = System.getenv("TAOS_HOST") != null ? System.getenv("TAOS_HOST") : TAOS_HOST;
String port = System.getenv("TAOS_PORT") != null ? System.getenv("TAOS_PORT") : TAOS_PORT;
String type = System.getenv("TAOS_TYPE") != null ? System.getenv("TAOS_TYPE") : TAOS_TYPE;
String c = System.getenv("TAOS_JDBC_CONSUMER_NUM");
int num = c != null ? Integer.parseInt(c) : TAOS_JDBC_CONSUMER_NUM;
String p = System.getenv("TAOS_JDBC_PROCESSOR_NUM");
int capacity = p != null ? Integer.parseInt(p) : TAOS_JDBC_PROCESSOR_NUM;
String r = System.getenv("TAOS_JDBC_RATE_PER_PROCESSOR");
int rate = r != null ? Integer.parseInt(r) : TAOS_JDBC_RATE_PER_PROCESSOR;
String s = System.getenv("TAOS_JDBC_POLL_SLEEP");
int sleep = s != null ? Integer.parseInt(s) : TAOS_JDBC_POLL_SLEEP;
return new Config(type, host, port, num, capacity, rate, sleep);
}
}
package com.taosdata;
import com.taosdata.jdbc.tmq.TMQConstants;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Properties;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import static com.taosdata.Config.*;
public class ConsumerDemo {
public static void main(String[] args) throws SQLException {
// Config
Config config = Config.getFromENV();
// Generate mock data
mockData();
Properties prop = new Properties();
prop.setProperty(TMQConstants.CONNECT_TYPE, config.getType());
prop.setProperty(TMQConstants.BOOTSTRAP_SERVERS, config.getHost() + ":" + config.getPort());
prop.setProperty(TMQConstants.CONNECT_USER, "root");
prop.setProperty(TMQConstants.CONNECT_PASS, "taosdata");
prop.setProperty(TMQConstants.MSG_WITH_TABLE_NAME, "true");
prop.setProperty(TMQConstants.ENABLE_AUTO_COMMIT, "true");
prop.setProperty(TMQConstants.GROUP_ID, "gId");
prop.setProperty(TMQConstants.VALUE_DESERIALIZER, "com.taosdata.BeanDeserializer");
for (int i = 0; i < config.getConsumerNum() - 1; i++) {
new Thread(new Worker(prop, config)).start();
}
new Worker(prop, config).run();
}
public static void mockData() throws SQLException {
String dbName = "test_consumer";
String tableName = "st";
String url = "jdbc:TAOS-RS://" + TAOS_HOST + ":" + TAOS_PORT + "/?user=root&password=taosdata&batchfetch=true";
Connection connection = DriverManager.getConnection(url);
Statement statement = connection.createStatement();
statement.executeUpdate("create database if not exists " + dbName + " WAL_RETENTION_PERIOD 3650");
statement.executeUpdate("use " + dbName);
statement.executeUpdate("create table if not exists " + tableName + " (ts timestamp, c1 int, c2 nchar(100)) ");
statement.executeUpdate("create topic if not exists " + TOPIC + " as select ts, c1, c2 from " + tableName);
ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(r -> {
Thread t = new Thread(r);
t.setName("mock-data-thread-" + t.getId());
return t;
});
AtomicInteger atomic = new AtomicInteger();
scheduledExecutorService.scheduleWithFixedDelay(() -> {
int i = atomic.getAndIncrement();
try {
statement.executeUpdate("insert into " + tableName + " values(now, " + i + ",'" + i + "')");
} catch (SQLException e) {
// ignore
}
}, 0, 10, TimeUnit.MILLISECONDS);
}
}
package com.taosdata;
import com.google.common.util.concurrent.RateLimiter;
import com.taosdata.jdbc.tmq.ConsumerRecord;
import com.taosdata.jdbc.tmq.ConsumerRecords;
import com.taosdata.jdbc.tmq.TaosConsumer;
import java.sql.SQLException;
import java.time.Duration;
import java.time.LocalDateTime;
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Semaphore;
public class Worker implements Runnable {
int sleepTime;
int rate;
ForkJoinPool pool = new ForkJoinPool();
Semaphore semaphore;
TaosConsumer<Bean> consumer;
public Worker(Properties prop, Config config) throws SQLException {
consumer = new TaosConsumer<>(prop);
consumer.subscribe(Collections.singletonList(Config.TOPIC));
semaphore = new Semaphore(config.getProcessCapacity());
sleepTime = config.getPollSleep();
rate = config.getRate();
}
@Override
public void run() {
while (!Thread.interrupted()) {
try {
// control the request frequency
if (semaphore.tryAcquire()) {
ConsumerRecords<Bean> records = consumer.poll(Duration.ofMillis(sleepTime));
pool.submit(() -> {
RateLimiter limiter = RateLimiter.create(rate);
try {
for (ConsumerRecord<Bean> record : records) {
// flow control
limiter.acquire();
// business logic: process the data
System.out.println("[" + LocalDateTime.now() + "] Thread id:" + Thread.currentThread().getId() + " -> " + record.value());
}
} finally {
semaphore.release();
}
});
}
} catch (SQLException e) {
e.printStackTrace();
}
}
}
}
......@@ -163,6 +163,7 @@ typedef struct {
int64_t checkPointId;
int32_t taskId;
int64_t streamId;
int64_t streamBackendRid;
} SStreamState;
typedef struct SFunctionStateStore {
......
......@@ -344,7 +344,6 @@ typedef struct SStreamMeta {
SRWLatch lock;
int32_t walScanCounter;
void* streamBackend;
int32_t streamBackendId;
int64_t streamBackendRid;
SHashObj* pTaskBackendUnique;
} SStreamMeta;
......
......@@ -22,21 +22,20 @@ extern "C" {
// If the error is in a third-party library, place this header file under the third-party library header file.
// When you want to use this feature, you should find or add the same function in the following sectio
// #if !defined(WINDOWS)
#if !defined(WINDOWS)
// #ifndef ALLOW_FORBID_FUNC
// #define malloc MALLOC_FUNC_TAOS_FORBID
// #define calloc CALLOC_FUNC_TAOS_FORBID
// #define realloc REALLOC_FUNC_TAOS_FORBID
// #define free FREE_FUNC_TAOS_FORBID
// #ifdef strdup
// #undef strdup
// #define strdup STRDUP_FUNC_TAOS_FORBID
// #endif
// #endif // ifndef ALLOW_FORBID_FUNC
// #endif // if !defined(WINDOWS)
#ifndef ALLOW_FORBID_FUNC
#define malloc MALLOC_FUNC_TAOS_FORBID
#define calloc CALLOC_FUNC_TAOS_FORBID
#define realloc REALLOC_FUNC_TAOS_FORBID
#define free FREE_FUNC_TAOS_FORBID
#ifdef strdup
#undef strdup
#define strdup STRDUP_FUNC_TAOS_FORBID
#endif
#endif // ifndef ALLOW_FORBID_FUNC
#endif // if !defined(WINDOWS)
// // #define taosMemoryFree malloc
// #define taosMemoryMalloc malloc
// #define taosMemoryCalloc calloc
// #define taosMemoryRealloc realloc
......
......@@ -691,6 +691,7 @@ static int32_t tsdbCacheLoadFromRaw(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArr
.colVal = COL_VAL_NONE(idxKey->key.cid, pr->pSchema->columns[slotIds[i]].type)};
if (!pLastCol) {
pLastCol = &noneCol;
reallocVarData(&pLastCol->colVal);
}
taosArraySet(pLastArray, idxKey->idx, pLastCol);
......@@ -2848,14 +2849,16 @@ static int32_t mergeLastCid(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray, SC
tsdbRowGetColVal(pRow, pTSchema, slotIds[iCol], pColVal);
*pCol = (SLastCol){.ts = rowTs, .colVal = *pColVal};
if (IS_VAR_DATA_TYPE(pColVal->type) && pColVal->value.nData > 0) {
if (IS_VAR_DATA_TYPE(pColVal->type) /*&& pColVal->value.nData > 0*/) {
pCol->colVal.value.pData = taosMemoryMalloc(pCol->colVal.value.nData);
if (pCol->colVal.value.pData == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
memcpy(pCol->colVal.value.pData, pColVal->value.pData, pColVal->value.nData);
if (pColVal->value.nData > 0) {
memcpy(pCol->colVal.value.pData, pColVal->value.pData, pColVal->value.nData);
}
}
if (!COL_VAL_IS_VALUE(pColVal)) {
......@@ -3016,14 +3019,16 @@ static int32_t mergeLastRowCid(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray,
tsdbRowGetColVal(pRow, pTSchema, slotIds[iCol], pColVal);
*pCol = (SLastCol){.ts = rowTs, .colVal = *pColVal};
if (IS_VAR_DATA_TYPE(pColVal->type) && pColVal->value.nData > 0) {
if (IS_VAR_DATA_TYPE(pColVal->type) /*&& pColVal->value.nData > 0*/) {
pCol->colVal.value.pData = taosMemoryMalloc(pCol->colVal.value.nData);
if (pCol->colVal.value.pData == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
memcpy(pCol->colVal.value.pData, pColVal->value.pData, pColVal->value.nData);
if (pColVal->value.nData > 0) {
memcpy(pCol->colVal.value.pData, pColVal->value.pData, pColVal->value.nData);
}
}
/*if (COL_VAL_IS_NONE(pColVal)) {
......
......@@ -457,6 +457,7 @@ typedef struct SStreamIntervalOperatorInfo {
int64_t dataVersion;
SStateStore statestore;
bool recvGetAll;
SHashObj* pFinalPullDataMap;
} SStreamIntervalOperatorInfo;
typedef struct SDataGroupInfo {
......
......@@ -1865,7 +1865,7 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
TSKEY maxTs = pAPI->stateStore.updateInfoFillBlockData(pInfo->pUpdateInfo, pInfo->pRecoverRes, pInfo->primaryTsIndex);
pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, maxTs);
} else {
pInfo->pUpdateInfo->maxDataVersion = pTaskInfo->streamInfo.fillHistoryVer2;
pInfo->pUpdateInfo->maxDataVersion = TMAX(pInfo->pUpdateInfo->maxDataVersion, pTaskInfo->streamInfo.fillHistoryVer2);
doCheckUpdate(pInfo, pInfo->pRecoverRes->info.window.ekey, pInfo->pRecoverRes);
}
}
......
......@@ -1306,6 +1306,8 @@ static bool doDeleteWindow(SOperatorInfo* pOperator, TSKEY ts, uint64_t groupId)
return true;
}
static int32_t getChildIndex(SSDataBlock* pBlock) { return pBlock->info.childId; }
static void doDeleteWindows(SOperatorInfo* pOperator, SInterval* pInterval, SSDataBlock* pBlock, SArray* pUpWins,
SSHashObj* pUpdatedMap) {
SStreamIntervalOperatorInfo* pInfo = pOperator->info;
......@@ -1340,8 +1342,14 @@ static void doDeleteWindows(SOperatorInfo* pOperator, SInterval* pInterval, SSDa
SWinKey winRes = {.ts = win.skey, .groupId = winGpId};
void* chIds = taosHashGet(pInfo->pPullDataMap, &winRes, sizeof(SWinKey));
if (chIds) {
getNextTimeWindow(pInterval, &win, TSDB_ORDER_ASC);
continue;
int32_t childId = getChildIndex(pBlock);
SArray* chArray = *(void**)chIds;
int32_t index = taosArraySearchIdx(chArray, &childId, compareInt32Val, TD_EQ);
if (index != -1) {
qDebug("===stream===try push delete window%" PRId64 "chId:%d ,continue", win.skey, childId);
getNextTimeWindow(pInterval, &win, TSDB_ORDER_ASC);
continue;
}
}
bool res = doDeleteWindow(pOperator, win.skey, winGpId);
if (pUpWins && res) {
......@@ -1497,6 +1505,7 @@ void destroyStreamFinalIntervalOperatorInfo(void* param) {
taosArrayDestroy(*(void**)pIte);
}
taosHashCleanup(pInfo->pPullDataMap);
taosHashCleanup(pInfo->pFinalPullDataMap);
taosArrayDestroy(pInfo->pPullWins);
blockDataDestroy(pInfo->pPullDataRes);
taosArrayDestroy(pInfo->pDelWins);
......@@ -2067,8 +2076,6 @@ void addPullWindow(SHashObj* pMap, SWinKey* pWinRes, int32_t size) {
taosHashPut(pMap, pWinRes, sizeof(SWinKey), &childIds, sizeof(void*));
}
static int32_t getChildIndex(SSDataBlock* pBlock) { return pBlock->info.childId; }
static void clearStreamIntervalOperator(SStreamIntervalOperatorInfo* pInfo) {
tSimpleHashClear(pInfo->aggSup.pResultRowHashTable);
clearDiskbasedBuf(pInfo->aggSup.pResultBuf);
......@@ -2112,7 +2119,7 @@ static void doBuildPullDataBlock(SArray* array, int32_t* pIndex, SSDataBlock* pB
blockDataUpdateTsWindow(pBlock, 0);
}
void processPullOver(SSDataBlock* pBlock, SHashObj* pMap, SInterval* pInterval) {
void processPullOver(SSDataBlock* pBlock, SHashObj* pMap, SHashObj* pFinalMap, SInterval* pInterval, SArray* pPullWins, int32_t numOfCh, SOperatorInfo* pOperator) {
SColumnInfoData* pStartCol = taosArrayGet(pBlock->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX);
TSKEY* tsData = (TSKEY*)pStartCol->pData;
SColumnInfoData* pEndCol = taosArrayGet(pBlock->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX);
......@@ -2136,6 +2143,22 @@ void processPullOver(SSDataBlock* pBlock, SHashObj* pMap, SInterval* pInterval)
taosArrayDestroy(chArray);
taosHashRemove(pMap, &winRes, sizeof(SWinKey));
qDebug("===stream===retrive pull data over.window %" PRId64 , winRes.ts);
void* pFinalCh = taosHashGet(pFinalMap, &winRes, sizeof(SWinKey));
if (pFinalCh) {
taosHashRemove(pFinalMap, &winRes, sizeof(SWinKey));
doDeleteWindow(pOperator, winRes.ts, winRes.groupId);
STimeWindow nextWin = getFinalTimeWindow(winRes.ts, pInterval);
SPullWindowInfo pull = {.window = nextWin,
.groupId = winRes.groupId,
.calWin.skey = nextWin.skey,
.calWin.ekey = nextWin.skey};
// add pull data request
if (savePullWindow(&pull, pPullWins) == TSDB_CODE_SUCCESS) {
addPullWindow(pMap, &winRes, numOfCh);
qDebug("===stream===prepare final retrive for delete %" PRId64 ", size:%d", winRes.ts, numOfCh);
}
}
}
}
}
......@@ -2144,7 +2167,7 @@ void processPullOver(SSDataBlock* pBlock, SHashObj* pMap, SInterval* pInterval)
}
}
static void addRetriveWindow(SArray* wins, SStreamIntervalOperatorInfo* pInfo) {
static void addRetriveWindow(SArray* wins, SStreamIntervalOperatorInfo* pInfo, int32_t childId) {
int32_t size = taosArrayGetSize(wins);
for (int32_t i = 0; i < size; i++) {
SWinKey* winKey = taosArrayGet(wins, i);
......@@ -2161,6 +2184,14 @@ static void addRetriveWindow(SArray* wins, SStreamIntervalOperatorInfo* pInfo) {
addPullWindow(pInfo->pPullDataMap, winKey, pInfo->numOfChild);
qDebug("===stream===prepare retrive for delete %" PRId64 ", size:%d", winKey->ts, pInfo->numOfChild);
}
} else {
SArray* chArray = *(void**)chIds;
int32_t index = taosArraySearchIdx(chArray, &childId, compareInt32Val, TD_EQ);
qDebug("===stream===check final retrive %" PRId64",chid:%d", winKey->ts, index);
if (index == -1) {
qDebug("===stream===add final retrive %" PRId64, winKey->ts);
taosHashPut(pInfo->pFinalPullDataMap, winKey, sizeof(SWinKey), NULL, 0);
}
}
}
}
......@@ -2554,7 +2585,8 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
SArray* delWins = taosArrayInit(8, sizeof(SWinKey));
doDeleteWindows(pOperator, &pInfo->interval, pBlock, delWins, pInfo->pUpdatedMap);
if (IS_FINAL_OP(pInfo)) {
addRetriveWindow(delWins, pInfo);
int32_t chId = getChildIndex(pBlock);
addRetriveWindow(delWins, pInfo, chId);
if (pBlock->info.type != STREAM_CLEAR) {
taosArrayAddAll(pInfo->pDelWins, delWins);
}
......@@ -2589,7 +2621,7 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
}
continue;
} else if (pBlock->info.type == STREAM_PULL_OVER && IS_FINAL_OP(pInfo)) {
processPullOver(pBlock, pInfo->pPullDataMap, &pInfo->interval);
processPullOver(pBlock, pInfo->pPullDataMap, pInfo->pFinalPullDataMap, &pInfo->interval, pInfo->pPullWins, pInfo->numOfChild, pOperator);
continue;
} else if (pBlock->info.type == STREAM_CREATE_CHILD_TABLE) {
return pBlock;
......@@ -2772,6 +2804,7 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream,
pInfo->pullIndex = 0;
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
pInfo->pPullDataMap = taosHashInit(64, hashFn, false, HASH_NO_LOCK);
pInfo->pFinalPullDataMap = taosHashInit(64, hashFn, false, HASH_NO_LOCK);
pInfo->pPullDataRes = createSpecialDataBlock(STREAM_RETRIEVE);
pInfo->ignoreExpiredData = pIntervalPhyNode->window.igExpired;
pInfo->ignoreExpiredDataSaved = false;
......@@ -4963,6 +4996,7 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhys
pInfo->pPhyNode = NULL; // create new child
pInfo->pPullDataMap = NULL;
pInfo->pFinalPullDataMap = NULL;
pInfo->pPullWins = NULL; // SPullWindowInfo
pInfo->pullIndex = 0;
pInfo->pPullDataRes = NULL;
......
......@@ -36,8 +36,9 @@ static SStreamGlobalEnv streamEnv;
int32_t streamDispatchStreamBlock(SStreamTask* pTask);
SStreamDataBlock* createStreamDataFromDispatchMsg(const SStreamDispatchReq* pReq, int32_t blockType, int32_t srcVg);
SStreamDataBlock* createStreamBlockFromResults(SStreamQueueItem* pItem, SStreamTask* pTask, int64_t resultSize, SArray* pRes);
void destroyStreamDataBlock(SStreamDataBlock* pBlock);
SStreamDataBlock* createStreamBlockFromResults(SStreamQueueItem* pItem, SStreamTask* pTask, int64_t resultSize,
SArray* pRes);
void destroyStreamDataBlock(SStreamDataBlock* pBlock);
int32_t streamRetrieveReqToData(const SStreamRetrieveReq* pReq, SStreamDataBlock* pData);
int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* data);
......@@ -53,6 +54,8 @@ int32_t streamDispatchOneRecoverFinishReq(SStreamTask* pTask, const SStreamRecov
SStreamQueueItem* streamMergeQueueItem(SStreamQueueItem* dst, SStreamQueueItem* pElem);
extern int32_t streamBackendId;
#ifdef __cplusplus
}
#endif
......
......@@ -16,7 +16,9 @@
#include "streamBackendRocksdb.h"
#include "executor.h"
#include "query.h"
#include "streamInc.h"
#include "tcommon.h"
#include "tref.h"
typedef struct SCompactFilteFactory {
void* status;
......@@ -79,8 +81,8 @@ const char* compareParKeyName(void* name);
const char* comparePartagKeyName(void* name);
void* streamBackendInit(const char* path) {
qDebug("init stream backend");
SBackendHandle* pHandle = calloc(1, sizeof(SBackendHandle));
qDebug("start to init stream backend at %s", path);
SBackendHandle* pHandle = taosMemoryCalloc(1, sizeof(SBackendHandle));
pHandle->list = tdListNew(sizeof(SCfComparator));
taosThreadMutexInit(&pHandle->mutex, NULL);
taosThreadMutexInit(&pHandle->cfMutex, NULL);
......@@ -119,6 +121,7 @@ void* streamBackendInit(const char* path) {
if (err != NULL) {
qError("failed to open rocksdb, path:%s, reason:%s", path, err);
taosMemoryFreeClear(err);
goto _EXIT;
}
} else {
/*
......@@ -129,6 +132,7 @@ void* streamBackendInit(const char* path) {
if (cfs != NULL) {
rocksdb_list_column_families_destroy(cfs, nCf);
}
qDebug("succ to init stream backend at %s, backend:%p", path, pHandle);
return (void*)pHandle;
_EXIT:
......@@ -140,7 +144,8 @@ _EXIT:
taosHashCleanup(pHandle->cfInst);
rocksdb_compactionfilterfactory_destroy(pHandle->filterFactory);
tdListFree(pHandle->list);
free(pHandle);
taosMemoryFree(pHandle);
qDebug("failed to init stream backend at %s", path);
return NULL;
}
void streamBackendCleanup(void* arg) {
......@@ -168,19 +173,20 @@ void streamBackendCleanup(void* arg) {
rocksdb_env_destroy(pHandle->env);
rocksdb_cache_destroy(pHandle->cache);
taosThreadMutexDestroy(&pHandle->mutex);
SListNode* head = tdListPopHead(pHandle->list);
while (head != NULL) {
streamStateDestroyCompar(head->data);
taosMemoryFree(head);
head = tdListPopHead(pHandle->list);
}
// rocksdb_compactionfilterfactory_destroy(pHandle->filterFactory);
tdListFree(pHandle->list);
taosThreadMutexDestroy(&pHandle->mutex);
taosThreadMutexDestroy(&pHandle->cfMutex);
taosMemoryFree(pHandle);
qDebug("destroy stream backend backend:%p", pHandle);
return;
}
SListNode* streamBackendAddCompare(void* backend, void* arg) {
......@@ -803,7 +809,8 @@ int32_t streamStateOpenBackendCf(void* backend, char* name, char** cfs, int32_t
return 0;
}
int streamStateOpenBackend(void* backend, SStreamState* pState) {
qInfo("start to open backend, %p 0x%" PRIx64 "-%d", pState, pState->streamId, pState->taskId);
qInfo("start to open state %p on backend %p 0x%" PRIx64 "-%d", pState, backend, pState->streamId, pState->taskId);
taosAcquireRef(streamBackendId, pState->streamBackendRid);
SBackendHandle* handle = backend;
sprintf(pState->pTdbState->idstr, "0x%" PRIx64 "-%d", pState->streamId, pState->taskId);
......@@ -866,7 +873,7 @@ int streamStateOpenBackend(void* backend, SStreamState* pState) {
SCfComparator compare = {.comp = pCompare, .numOfComp = cfLen};
pState->pTdbState->pComparNode = streamBackendAddCompare(handle, &compare);
// rocksdb_writeoptions_disable_WAL(pState->pTdbState->writeOpts, 1);
qInfo("succ to open backend, %p, 0x%" PRIx64 "-%d", pState, pState->streamId, pState->taskId);
qInfo("succ to open state %p on backend, %p, 0x%" PRIx64 "-%d", pState, handle, pState->streamId, pState->taskId);
return 0;
}
......@@ -882,8 +889,8 @@ void streamStateCloseBackend(SStreamState* pState, bool remove) {
taosThreadMutexUnlock(&pHandle->cfMutex);
char* status[] = {"close", "drop"};
qInfo("start to %s backend, %p, 0x%" PRIx64 "-%d", status[remove == false ? 0 : 1], pState, pState->streamId,
pState->taskId);
qInfo("start to close %s state %p on backend %p 0x%" PRIx64 "-%d", status[remove == false ? 0 : 1], pState, pHandle,
pState->streamId, pState->taskId);
if (pState->pTdbState->rocksdb == NULL) {
return;
}
......@@ -938,6 +945,7 @@ void streamStateCloseBackend(SStreamState* pState, bool remove) {
taosThreadRwlockDestroy(&pState->pTdbState->rwLock);
pState->pTdbState->rocksdb = NULL;
taosReleaseRef(streamBackendId, pState->streamBackendRid);
}
void streamStateDestroyCompar(void* arg) {
SCfComparator* comp = (SCfComparator*)arg;
......
......@@ -20,7 +20,7 @@
#include "ttimer.h"
static TdThreadOnce streamMetaModuleInit = PTHREAD_ONCE_INIT;
static int32_t streamBackendId = 0;
int32_t streamBackendId = 0;
static void streamMetaEnvInit() { streamBackendId = taosOpenRef(20, streamBackendCleanup); }
void streamMetaInit() { taosThreadOnce(&streamMetaModuleInit, streamMetaEnvInit); }
......@@ -79,7 +79,6 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF
pMeta->vgId = vgId;
pMeta->ahandle = ahandle;
pMeta->expandFunc = expandFunc;
pMeta->streamBackendId = streamBackendId;
memset(streamPath, 0, len);
sprintf(streamPath, "%s/%s", pMeta->path, "state");
......@@ -90,6 +89,9 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF
}
pMeta->streamBackend = streamBackendInit(streamPath);
if (pMeta->streamBackend == NULL) {
goto _err;
}
pMeta->streamBackendRid = taosAddRef(streamBackendId, pMeta->streamBackend);
taosMemoryFree(streamPath);
......
......@@ -106,7 +106,7 @@ SStreamState* streamStateOpen(char* path, void* pTask, bool specPath, int32_t sz
}
SStreamTask* pStreamTask = pTask;
char statePath[1024];
char statePath[1024];
if (!specPath) {
sprintf(statePath, "%s/%d", path, pStreamTask->id.taskId);
} else {
......@@ -119,10 +119,10 @@ SStreamState* streamStateOpen(char* path, void* pTask, bool specPath, int32_t sz
#ifdef USE_ROCKSDB
SStreamMeta* pMeta = pStreamTask->pMeta;
taosAcquireRef(pMeta->streamBackendId, pMeta->streamBackendRid);
pState->streamBackendRid = pMeta->streamBackendRid;
int code = streamStateOpenBackend(pMeta->streamBackend, pState);
if (code == -1) {
taosReleaseRef(pMeta->streamBackendId, pMeta->streamBackendRid);
taosReleaseRef(streamBackendId, pMeta->streamBackendRid);
taosMemoryFree(pState);
pState = NULL;
}
......@@ -222,9 +222,7 @@ _err:
void streamStateClose(SStreamState* pState, bool remove) {
SStreamTask* pTask = pState->pTdbState->pOwner;
#ifdef USE_ROCKSDB
// streamStateCloseBackend(pState);
streamStateDestroy(pState, remove);
taosReleaseRef(pTask->pMeta->streamBackendId, pTask->pMeta->streamBackendRid);
#else
tdbCommit(pState->pTdbState->db, pState->pTdbState->txn);
tdbPostCommit(pState->pTdbState->db, pState->pTdbState->txn);
......@@ -278,10 +276,10 @@ int32_t streamStateCommit(SStreamState* pState) {
int32_t streamStateFuncPut(SStreamState* pState, const SWinKey* key, const void* value, int32_t vLen) {
#ifdef USE_ROCKSDB
void* pVal = NULL;
int32_t len = 0;
int32_t code = getRowBuff(pState->pFileState, (void*)key, sizeof(SWinKey), &pVal, &len);
char* buf = ((SRowBuffPos*)pVal)->pRowBuff;
void* pVal = NULL;
int32_t len = 0;
int32_t code = getRowBuff(pState->pFileState, (void*)key, sizeof(SWinKey), &pVal, &len);
char* buf = ((SRowBuffPos*)pVal)->pRowBuff;
uint32_t rowSize = streamFileStateGeSelectRowSize(pState->pFileState);
memcpy(buf + len - rowSize, value, vLen);
return code;
......@@ -291,10 +289,10 @@ int32_t streamStateFuncPut(SStreamState* pState, const SWinKey* key, const void*
}
int32_t streamStateFuncGet(SStreamState* pState, const SWinKey* key, void** ppVal, int32_t* pVLen) {
#ifdef USE_ROCKSDB
void* pVal = NULL;
int32_t len = 0;
int32_t code = getRowBuff(pState->pFileState, (void*)key, sizeof(SWinKey), (void**)(&pVal), &len);
char* buf = ((SRowBuffPos*)pVal)->pRowBuff;
void* pVal = NULL;
int32_t len = 0;
int32_t code = getRowBuff(pState->pFileState, (void*)key, sizeof(SWinKey), (void**)(&pVal), &len);
char* buf = ((SRowBuffPos*)pVal)->pRowBuff;
uint32_t rowSize = streamFileStateGeSelectRowSize(pState->pFileState);
*ppVal = buf + len - rowSize;
return code;
......
......@@ -419,7 +419,7 @@ int32_t deleteExpiredCheckPoint(SStreamFileState* pFileState, TSKEY mark) {
if (code != 0 || len == 0 || val == NULL) {
return TSDB_CODE_FAILED;
}
memcpy(val, buf, len);
memcpy(buf, val, len);
buf[len] = 0;
maxCheckPointId = atol((char*)buf);
taosMemoryFree(val);
......@@ -433,7 +433,7 @@ int32_t deleteExpiredCheckPoint(SStreamFileState* pFileState, TSKEY mark) {
if (code != 0) {
return TSDB_CODE_FAILED;
}
memcpy(val, buf, len);
memcpy(buf, val, len);
buf[len] = 0;
taosMemoryFree(val);
......
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
system sh/deploy.sh -n dnode2 -i 2
system sh/exec.sh -n dnode1 -s start
sleep 50
sql connect
sql create dnode $hostname2 port 7200
system sh/exec.sh -n dnode2 -s start
print ===== step1
$x = 0
step1:
$x = $x + 1
sleep 1000
if $x == 10 then
print ====> dnode not ready!
return -1
endi
sql select * from information_schema.ins_dnodes
print ===> $data00 $data01 $data02 $data03 $data04 $data05
print ===> $data10 $data11 $data12 $data13 $data14 $data15
if $rows != 2 then
return -1
endi
if $data(1)[4] != ready then
goto step1
endi
if $data(2)[4] != ready then
goto step1
endi
print ===== step2
sql drop stream if exists stream_t1;
......@@ -248,10 +223,56 @@ sql insert into ts3 values(1648791223002,2,2,3,1.1);
sql insert into ts4 values(1648791233003,3,2,3,2.1);
sql insert into ts3 values(1648791243004,4,2,43,73.1);
sql insert into ts4 values(1648791213002,24,22,23,4.1);
$loop_count = 0
loop032:
$loop_count = $loop_count + 1
if $loop_count == 30 then
return -1
endi
sleep 1000
print 6-0 select * from streamtST1;
sql select * from streamtST1;
if $rows != 4 then
print =====rows=$rows
goto loop032
endi
if $data01 != 8 then
print =6====data01=$data01
goto loop032
endi
sql insert into ts3 values(1648791243005,4,20,3,3.1);
sql insert into ts4 values(1648791243006,4,2,3,3.1) (1648791243007,4,2,3,3.1) ;
sql insert into ts3 values(1648791243008,4,2,30,3.1) (1648791243009,4,2,3,3.1) (1648791243010,4,2,3,3.1) ;
sql insert into ts4 values(1648791243011,4,2,3,3.1) (1648791243012,34,32,33,3.1) (1648791243013,4,2,3,3.1) (1648791243014,4,2,13,3.1);
$loop_count = 0
loop033:
$loop_count = $loop_count + 1
if $loop_count == 30 then
return -1
endi
sleep 1000
print 6-1 select * from streamtST1;
sql select * from streamtST1;
if $rows != 4 then
print =====rows=$rows
goto loop033
endi
if $data01 != 8 then
print =6====data01=$data01
goto loop033
endi
sql insert into ts3 values(1648791243005,4,42,3,3.1) (1648791243003,4,2,33,3.1) (1648791243006,4,2,3,3.1) (1648791213001,1,52,13,1.0) (1648791223001,22,22,83,1.1) ;
$loop_count = 0
......
......@@ -116,7 +116,7 @@ class TDTestCase:
tdSql.checkRows(1000)
tdLog.info("================= step3")
tdSql.execute('drop database test')
for i in range(50):
for i in range(10):
tdSql.execute("create database test%d duration 1" %(i))
tdSql.execute("use test%d" %(i))
tdSql.execute("create table tb (ts timestamp,i int)")
......