提交 961107d1 编写于 作者: X Xiaoyu Wang

merge main

......@@ -222,7 +222,7 @@ A database including one supertable and two subtables is created as follows:
```sql
DROP DATABASE IF EXISTS tmqdb;
CREATE DATABASE tmqdb;
CREATE TABLE tmqdb.stb (ts TIMESTAMP, c1 INT, c2 FLOAT, c3 VARCHAR(16) TAGS(t1 INT, t3 VARCHAR(16));
CREATE TABLE tmqdb.stb (ts TIMESTAMP, c1 INT, c2 FLOAT, c3 VARCHAR(16)) TAGS(t1 INT, t3 VARCHAR(16));
CREATE TABLE tmqdb.ctb0 USING tmqdb.stb TAGS(0, "subtable0");
CREATE TABLE tmqdb.ctb1 USING tmqdb.stb TAGS(1, "subtable1");
INSERT INTO tmqdb.ctb0 VALUES(now, 0, 0, 'a0')(now+1s, 0, 0, 'a00');
......
```java
{{#include docs/examples/java/src/main/java/com/taos/example/SubscribeDemo.java}}
{{#include docs/examples/java/src/main/java/com/taos/example/MetersDeserializer.java}}
{{#include docs/examples/java/src/main/java/com/taos/example/Meters.java}}
```
```java
{{#include docs/examples/java/src/main/java/com/taos/example/MetersDeserializer.java}}
```
```java
{{#include docs/examples/java/src/main/java/com/taos/example/Meters.java}}
```
\ No newline at end of file
<Tabs defaultValue="native">
<TabItem value="native" label="native connection">
```java
{{#include docs/examples/java/src/main/java/com/taos/example/SubscribeDemo.java}}
```
```java
{{#include docs/examples/java/src/main/java/com/taos/example/MetersDeserializer.java}}
```
```java
{{#include docs/examples/java/src/main/java/com/taos/example/Meters.java}}
```
</TabItem>
<TabItem value="ws" label="WebSocket connection">
```java
{{#include docs/examples/java/src/main/java/com/taos/example/WebsocketSubscribeDemo.java}}
```
```java
{{#include docs/examples/java/src/main/java/com/taos/example/MetersDeserializer.java}}
```
```java
{{#include docs/examples/java/src/main/java/com/taos/example/Meters.java}}
```
</TabItem>
</Tabs>
......@@ -696,6 +696,9 @@ TaosConsumer consumer = new TaosConsumer<>(config);
- enable.auto.commit: Specifies whether to commit automatically.
- group.id: consumer: Specifies the group that the consumer is in.
- value.deserializer: To deserialize the results, you can inherit `com.taosdata.jdbc.tmq.ReferenceDeserializer` and specify the result set bean. You can also inherit `com.taosdata.jdbc.tmq.Deserializer` and perform custom deserialization based on the SQL result set.
- td.connect.type: Specifies the type connect with TDengine, `jni` or `WebSocket`. default is `jni`
- httpConnectTimeout:WebSocket connection timeout in milliseconds, the default value is 5000 ms. It only takes effect when using WebSocket type.
- messageWaitTimeout:socket timeout in milliseconds, the default value is 10000 ms. It only takes effect when using WebSocket type.
- For more information, see [Consumer Parameters](../../../develop/tmq).
#### Subscribe to consume data
......@@ -724,6 +727,11 @@ For more information, see [Data Subscription](../../../develop/tmq).
### Usage examples
<Tabs defaultValue="native">
<TabItem value="native" label="native connection">
In addition to the native connection, the Java Connector also supports subscribing via websocket.
```java
public abstract class ConsumerLoop {
private final TaosConsumer<ResultBean> consumer;
......@@ -795,6 +803,87 @@ public abstract class ConsumerLoop {
}
```
</TabItem>
<TabItem value="ws" label="WebSocket connection">
```java
public abstract class ConsumerLoop {
private final TaosConsumer<ResultBean> consumer;
private final List<String> topics;
private final AtomicBoolean shutdown;
private final CountDownLatch shutdownLatch;
public ConsumerLoop() throws SQLException {
Properties config = new Properties();
config.setProperty("bootstrap.servers", "localhost:6041");
config.setProperty("td.connect.type", "ws");
config.setProperty("msg.with.table.name", "true");
config.setProperty("enable.auto.commit", "true");
config.setProperty("group.id", "group2");
config.setProperty("value.deserializer", "com.taosdata.jdbc.tmq.ConsumerTest.ConsumerLoop$ResultDeserializer");
this.consumer = new TaosConsumer<>(config);
this.topics = Collections.singletonList("topic_speed");
this.shutdown = new AtomicBoolean(false);
this.shutdownLatch = new CountDownLatch(1);
}
public abstract void process(ResultBean result);
public void pollData() throws SQLException {
try {
consumer.subscribe(topics);
while (!shutdown.get()) {
ConsumerRecords<ResultBean> records = consumer.poll(Duration.ofMillis(100));
for (ResultBean record : records) {
process(record);
}
}
consumer.unsubscribe();
} finally {
consumer.close();
shutdownLatch.countDown();
}
}
public void shutdown() throws InterruptedException {
shutdown.set(true);
shutdownLatch.await();
}
public static class ResultDeserializer extends ReferenceDeserializer<ResultBean> {
}
public static class ResultBean {
private Timestamp ts;
private int speed;
public Timestamp getTs() {
return ts;
}
public void setTs(Timestamp ts) {
this.ts = ts;
}
public int getSpeed() {
return speed;
}
public void setSpeed(int speed) {
this.speed = speed;
}
}
}
```
</TabItem>
</Tabs>
> **Note**: The value of value.deserializer should be adjusted based on the package path of the test environment.
### Use with connection pool
#### HikariCP
......@@ -878,8 +967,8 @@ The source code of the sample application is under `TDengine/examples/JDBC`:
| taos-jdbcdriver version | major changes |
| :---------------------: | :--------------------------------------------: |
| 3.0.3 | fix timestamp resolution error for REST connection in jdk17+ version |
| 3.0.1 - 3.0.2 | fix the resultSet data is parsed incorrectly sometimes. 3.0.1 is compiled on JDK 11, you are advised to use 3.0.2 in the JDK 8 environment |
| 3.1.0 | JDBC REST connection supports subscription over WebSocket |
| 3.0.1 - 3.0.4 | fix the resultSet data is parsed incorrectly sometimes. 3.0.1 is compiled on JDK 11, you are advised to use other version in the JDK 8 environment |
| 3.0.0 | Support for TDengine 3.0 |
| 2.0.42 | fix wasNull interface return value in WebSocket connection |
| 2.0.41 | fix decode method of username and password in REST connection |
......
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
......@@ -17,13 +18,13 @@
</properties>
<dependencies>
<!-- ANCHOR: dep-->
<!-- ANCHOR: dep-->
<dependency>
<groupId>com.taosdata.jdbc</groupId>
<artifactId>taos-jdbcdriver</artifactId>
<version>3.0.0</version>
<version>3.1.0</version>
</dependency>
<!-- ANCHOR_END: dep-->
<!-- ANCHOR_END: dep-->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
......@@ -32,4 +33,4 @@
</dependency>
</dependencies>
</project>
</project>
\ No newline at end of file
package com.taos.example;
import com.taosdata.jdbc.tmq.ConsumerRecords;
import com.taosdata.jdbc.tmq.TMQConstants;
import com.taosdata.jdbc.tmq.TaosConsumer;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.atomic.AtomicBoolean;
public class WebsocketSubscribeDemo {
private static final String TOPIC = "tmq_topic_ws";
private static final String DB_NAME = "meters_ws";
private static final AtomicBoolean shutdown = new AtomicBoolean(false);
public static void main(String[] args) {
Timer timer = new Timer();
timer.schedule(new TimerTask() {
public void run() {
shutdown.set(true);
}
}, 3_000);
try {
// prepare
Class.forName("com.taosdata.jdbc.rs.RestfulDriver");
String jdbcUrl = "jdbc:TAOS-RS://127.0.0.1:6041/?user=root&password=taosdata&batchfetch=true";
try (Connection connection = DriverManager.getConnection(jdbcUrl);
Statement statement = connection.createStatement()) {
statement.executeUpdate("drop topic if exists " + TOPIC);
statement.executeUpdate("drop database if exists " + DB_NAME);
statement.executeUpdate("create database " + DB_NAME);
statement.executeUpdate("use " + DB_NAME);
statement.executeUpdate(
"CREATE TABLE `meters` (`ts` TIMESTAMP, `current` FLOAT, `voltage` INT) TAGS (`groupid` INT, `location` BINARY(24))");
statement.executeUpdate("CREATE TABLE `d0` USING `meters` TAGS(0, 'California.LosAngles')");
statement.executeUpdate("INSERT INTO `d0` values(now - 10s, 0.32, 116)");
statement.executeUpdate("INSERT INTO `d0` values(now - 8s, NULL, NULL)");
statement.executeUpdate(
"INSERT INTO `d1` USING `meters` TAGS(1, 'California.SanFrancisco') values(now - 9s, 10.1, 119)");
statement.executeUpdate(
"INSERT INTO `d1` values (now-8s, 10, 120) (now - 6s, 10, 119) (now - 4s, 11.2, 118)");
// create topic
statement.executeUpdate("create topic " + TOPIC + " as select * from meters");
}
// create consumer
Properties properties = new Properties();
properties.setProperty(TMQConstants.BOOTSTRAP_SERVERS, "127.0.0.1:6041");
properties.setProperty(TMQConstants.CONNECT_TYPE, "ws");
properties.setProperty(TMQConstants.MSG_WITH_TABLE_NAME, "true");
properties.setProperty(TMQConstants.ENABLE_AUTO_COMMIT, "true");
properties.setProperty(TMQConstants.GROUP_ID, "test");
properties.setProperty(TMQConstants.VALUE_DESERIALIZER,
"com.taos.example.MetersDeserializer");
// poll data
try (TaosConsumer<Meters> consumer = new TaosConsumer<>(properties)) {
consumer.subscribe(Collections.singletonList(TOPIC));
while (!shutdown.get()) {
ConsumerRecords<Meters> meters = consumer.poll(Duration.ofMillis(100));
for (Meters meter : meters) {
System.out.println(meter);
}
}
consumer.unsubscribe();
}
} catch (ClassNotFoundException | SQLException e) {
e.printStackTrace();
}
timer.cancel();
}
}
......@@ -64,21 +64,15 @@ public class TestAll {
@Test
public void testSubscribe() {
Thread thread = new Thread(() -> {
try {
Thread.sleep(1000);
insertData();
} catch (SQLException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
});
thread.start();
SubscribeDemo.main(args);
}
@Test
public void testSubscribeOverWebsocket() {
WebsocketSubscribeDemo.main(args);
}
@Test
public void testSchemaless() throws SQLException {
LineProtocolExample.main(args);
......
<Tabs defaultValue="native">
<TabItem value="native" label="本地连接">
```java
{{#include docs/examples/java/src/main/java/com/taos/example/SubscribeDemo.java}}
```
......@@ -6,4 +8,17 @@
```
```java
{{#include docs/examples/java/src/main/java/com/taos/example/Meters.java}}
```
\ No newline at end of file
```
</TabItem>
<TabItem value="ws" label="WebSocket 连接">
```java
{{#include docs/examples/java/src/main/java/com/taos/example/WebsocketSubscribeDemo.java}}
```
```java
{{#include docs/examples/java/src/main/java/com/taos/example/MetersDeserializer.java}}
```
```java
{{#include docs/examples/java/src/main/java/com/taos/example/Meters.java}}
```
</TabItem>
</Tabs>
......@@ -699,7 +699,10 @@ TaosConsumer consumer = new TaosConsumer<>(config);
- enable.auto.commit: 是否允许自动提交。
- group.id: consumer: 所在的 group。
- value.deserializer: 结果集反序列化方法,可以继承 `com.taosdata.jdbc.tmq.ReferenceDeserializer`,并指定结果集 bean,实现反序列化。也可以继承 `com.taosdata.jdbc.tmq.Deserializer`,根据 SQL 的 resultSet 自定义反序列化方式。
- 其他参数请参考:[Consumer 参数列表](../../../develop/tmq#创建-consumer-以及consumer-group)
- td.connect.type: 连接方式。jni:表示使用动态库连接的方式,ws/WebSocket:表示使用 WebSocket 进行数据通信。默认为 jni 方式。
- httpConnectTimeout:创建连接超时参数,单位 ms,默认为 5000 ms。仅在 WebSocket 连接下有效。
- messageWaitTimeout:数据传输超时参数,单位 ms,默认为 10000 ms。仅在 WebSocket 连接下有效。
其他参数请参考:[Consumer 参数列表](../../../develop/tmq#创建-consumer-以及consumer-group)
#### 订阅消费数据
......@@ -727,6 +730,9 @@ consumer.close()
### 使用示例如下:
<Tabs defaultValue="native">
<TabItem value="native" label="原生连接">
```java
public abstract class ConsumerLoop {
private final TaosConsumer<ResultBean> consumer;
......@@ -798,6 +804,89 @@ public abstract class ConsumerLoop {
}
```
</TabItem>
<TabItem value="ws" label="WebSocket 连接">
除了原生的连接方式,Java 连接器还支持通过 WebSocket 订阅数据。
```java
public abstract class ConsumerLoop {
private final TaosConsumer<ResultBean> consumer;
private final List<String> topics;
private final AtomicBoolean shutdown;
private final CountDownLatch shutdownLatch;
public ConsumerLoop() throws SQLException {
Properties config = new Properties();
config.setProperty("bootstrap.servers", "localhost:6041");
config.setProperty("td.connect.type", "ws");
config.setProperty("msg.with.table.name", "true");
config.setProperty("enable.auto.commit", "true");
config.setProperty("group.id", "group2");
config.setProperty("value.deserializer", "com.taosdata.jdbc.tmq.ConsumerTest.ConsumerLoop$ResultDeserializer");
this.consumer = new TaosConsumer<>(config);
this.topics = Collections.singletonList("topic_speed");
this.shutdown = new AtomicBoolean(false);
this.shutdownLatch = new CountDownLatch(1);
}
public abstract void process(ResultBean result);
public void pollData() throws SQLException {
try {
consumer.subscribe(topics);
while (!shutdown.get()) {
ConsumerRecords<ResultBean> records = consumer.poll(Duration.ofMillis(100));
for (ResultBean record : records) {
process(record);
}
}
consumer.unsubscribe();
} finally {
consumer.close();
shutdownLatch.countDown();
}
}
public void shutdown() throws InterruptedException {
shutdown.set(true);
shutdownLatch.await();
}
public static class ResultDeserializer extends ReferenceDeserializer<ResultBean> {
}
public static class ResultBean {
private Timestamp ts;
private int speed;
public Timestamp getTs() {
return ts;
}
public void setTs(Timestamp ts) {
this.ts = ts;
}
public int getSpeed() {
return speed;
}
public void setSpeed(int speed) {
this.speed = speed;
}
}
}
```
</TabItem>
</Tabs>
> **注意**:这里的 value.deserializer 配置参数值应该根据测试环境的包路径做相应的调整。
### 与连接池使用
#### HikariCP
......@@ -881,8 +970,8 @@ public static void main(String[] args) throws Exception {
| taos-jdbcdriver 版本 | 主要变化 |
| :------------------: | :----------------------------: |
| 3.0.3 | 修复 REST 连接在 jdk17+ 版本时间戳解析错误问题 |
| 3.0.1 - 3.0.2 | 修复一些情况下结果集数据解析错误的问题。3.0.1 在 JDK 11 环境编译,JDK 8 环境下建议使用 3.0.2 版本 |
| 3.1.0 | WebSocket 连接支持订阅功能 |
| 3.0.1 - 3.0.4 | 修复一些情况下结果集数据解析错误的问题。3.0.1 在 JDK 11 环境编译,JDK 8 环境下建议使用其他版本 |
| 3.0.0 | 支持 TDengine 3.0 |
| 2.0.42 | 修在 WebSocket 连接中 wasNull 接口返回值 |
| 2.0.41 | 修正 REST 连接中用户名和密码转码方式 |
......
......@@ -205,8 +205,7 @@ typedef struct SDataBlockInfo {
STimeWindow calWin; // used for stream, do not serialize
TSKEY watermark; // used for stream
char parTbName[TSDB_TABLE_NAME_LEN]; // used for stream partition
STag* pTag; // used for stream partition
char parTbName[TSDB_TABLE_NAME_LEN]; // used for stream partition
} SDataBlockInfo;
typedef struct SSDataBlock {
......@@ -292,7 +291,6 @@ typedef struct STableBlockDistInfo {
uint16_t numOfFiles;
uint32_t numOfTables;
uint32_t numOfBlocks;
uint32_t numOfVgroups;
uint64_t totalSize;
uint64_t totalRows;
int32_t maxRows;
......@@ -302,6 +300,7 @@ typedef struct STableBlockDistInfo {
int32_t firstSeekTimeUs;
uint32_t numOfInmemRows;
uint32_t numOfSmallBlocks;
uint32_t numOfVgroups;
int32_t blockRowsHisto[20];
} STableBlockDistInfo;
......
......@@ -2850,7 +2850,7 @@ typedef struct {
char dbFName[TSDB_DB_FNAME_LEN];
char stbName[TSDB_TABLE_NAME_LEN];
char colName[TSDB_COL_NAME_LEN];
char idxName[TSDB_COL_NAME_LEN];
char idxName[TSDB_INDEX_FNAME_LEN];
int8_t idxType;
} SCreateTagIndexReq;
......
FROM ubuntu:18.04
WORKDIR /root
ARG pkgFile
ARG dirName
ARG cpuType
RUN echo ${pkgFile} && echo ${dirName}
RUN apt update
RUN apt install -y curl
COPY ${pkgFile} /root/
ENV TINI_VERSION v0.19.0
ENV TAOS_DISABLE_ADAPTER 1
ADD https://github.com/krallin/tini/releases/download/${TINI_VERSION}/tini-${cpuType} /tini
ENV DEBIAN_FRONTEND=noninteractive
WORKDIR /root/
RUN tar -zxf ${pkgFile} && cd /root/${dirName}/ && /bin/bash install.sh -e no && cd /root && rm /root/${pkgFile} && rm -rf /root/${dirName} && apt-get update && apt-get install -y locales tzdata netcat && locale-gen en_US.UTF-8 && apt-get clean && rm -rf /var/lib/apt/lists/ && chmod +x /tini
ENV LD_LIBRARY_PATH="$LD_LIBRARY_PATH:/usr/lib" \
LC_CTYPE=en_US.UTF-8 \
LANG=en_US.UTF-8 \
LC_ALL=en_US.UTF-8
COPY ./run.sh /usr/bin/
COPY ./bin/* /usr/bin/
ENTRYPOINT ["/tini", "--", "/usr/bin/entrypoint.sh"]
CMD ["bash", "-c", "/usr/bin/run.sh"]
VOLUME [ "/var/lib/taos", "/var/log/taos" ]
......@@ -4684,6 +4684,11 @@ int32_t tsdbGetFileBlocksDistInfo(STsdbReader* pReader, STableBlockDistInfo* pTa
pTableBlockInfo->numOfVgroups = 1;
// find the start data block in file
tsdbAcquireReader(pReader);
if (pReader->suspended) {
tsdbReaderResume(pReader);
}
SReaderStatus* pStatus = &pReader->status;
STsdbCfg* pc = &pReader->pTsdb->pVnode->config.tsdbCfg;
......@@ -4745,7 +4750,7 @@ int32_t tsdbGetFileBlocksDistInfo(STsdbReader* pReader, STableBlockDistInfo* pTa
// tsdbDebug("%p %d blocks found in file for %d table(s), fid:%d, %s", pReader, numOfBlocks, numOfTables,
// pReader->pFileGroup->fid, pReader->idStr);
}
tsdbReleaseReader(pReader);
return code;
}
......
......@@ -230,12 +230,19 @@ int32_t buildSubmitReqFromBlock(SDataInserterHandle* pInserter, SSubmitReq2** pp
case TSDB_DATA_TYPE_BLOB:
case TSDB_DATA_TYPE_JSON:
case TSDB_DATA_TYPE_MEDIUMBLOB:
uError("the column type %" PRIi16 " is defined but not implemented yet", pColInfoData->info.type);
ASSERT(0);
qError("the column type %" PRIi16 " is defined but not implemented yet", pColInfoData->info.type);
terrno = TSDB_CODE_APP_ERROR;
goto _end;
break;
default:
if (pColInfoData->info.type < TSDB_DATA_TYPE_MAX && pColInfoData->info.type > TSDB_DATA_TYPE_NULL) {
if (colDataIsNull_s(pColInfoData, j)) {
if (PRIMARYKEY_TIMESTAMP_COL_ID == pCol->colId) {
qError("NULL value for primary key");
terrno = TSDB_CODE_PAR_INCORRECT_TIMESTAMP_VAL;
goto _end;
}
SColVal cv = COL_VAL_NULL(pCol->colId, pCol->type); // should use pCol->type
taosArrayPush(pVals, &cv);
} else {
......@@ -256,7 +263,8 @@ int32_t buildSubmitReqFromBlock(SDataInserterHandle* pInserter, SSubmitReq2** pp
}
} else {
uError("the column type %" PRIi16 " is undefined\n", pColInfoData->info.type);
ASSERT(0);
terrno = TSDB_CODE_APP_ERROR;
goto _end;
}
break;
}
......@@ -296,7 +304,7 @@ _end:
tDestroySSubmitReq2(pReq, TSDB_MSG_FLG_ENCODE);
taosMemoryFree(pReq);
}
return TSDB_CODE_FAILED;
return terrno;
}
*ppReq = pReq;
return TSDB_CODE_SUCCESS;
......
......@@ -5423,7 +5423,6 @@ int32_t blockDistFunction(SqlFunctionCtx* pCtx) {
pDistInfo->numOfBlocks += p1.numOfBlocks;
pDistInfo->numOfTables += p1.numOfTables;
pDistInfo->numOfInmemRows += p1.numOfInmemRows;
pDistInfo->numOfVgroups += p1.numOfVgroups;
pDistInfo->totalSize += p1.totalSize;
pDistInfo->totalRows += p1.totalRows;
pDistInfo->numOfFiles += p1.numOfFiles;
......@@ -5441,6 +5440,7 @@ int32_t blockDistFunction(SqlFunctionCtx* pCtx) {
}
pDistInfo->numOfVgroups += (p1.numOfTables != 0 ? 1 : 0);
pDistInfo->numOfVgroups += (p1.numOfTables != 0 ? 1 : 0);
for (int32_t i = 0; i < tListLen(pDistInfo->blockRowsHisto); ++i) {
pDistInfo->blockRowsHisto[i] += p1.blockRowsHisto[i];
}
......@@ -5459,7 +5459,6 @@ int32_t tSerializeBlockDistInfo(void* buf, int32_t bufLen, const STableBlockDist
if (tEncodeU16(&encoder, pInfo->numOfFiles) < 0) return -1;
if (tEncodeU32(&encoder, pInfo->numOfBlocks) < 0) return -1;
if (tEncodeU32(&encoder, pInfo->numOfTables) < 0) return -1;
if (tEncodeU32(&encoder, pInfo->numOfVgroups) < 0) return -1;
if (tEncodeU64(&encoder, pInfo->totalSize) < 0) return -1;
if (tEncodeU64(&encoder, pInfo->totalRows) < 0) return -1;
......@@ -5492,7 +5491,6 @@ int32_t tDeserializeBlockDistInfo(void* buf, int32_t bufLen, STableBlockDistInfo
if (tDecodeU16(&decoder, &pInfo->numOfFiles) < 0) return -1;
if (tDecodeU32(&decoder, &pInfo->numOfBlocks) < 0) return -1;
if (tDecodeU32(&decoder, &pInfo->numOfTables) < 0) return -1;
if (tDecodeU32(&decoder, &pInfo->numOfVgroups) < 0) return -1;
if (tDecodeU64(&decoder, &pInfo->totalSize) < 0) return -1;
if (tDecodeU64(&decoder, &pInfo->totalRows) < 0) return -1;
......
......@@ -3120,9 +3120,8 @@ static FORCE_INLINE bool filterExecuteImplIsNull(void *pinfo, int32_t numOfRows,
for (int32_t i = 0; i < numOfRows; ++i) {
uint32_t uidx = info->groups[0].unitIdxs[0];
void *colData = colDataGetData((SColumnInfoData *)info->cunits[uidx].colData, i);
p[i] = ((colData == NULL) || colDataIsNull((SColumnInfoData *)info->cunits[uidx].colData, 0, i, NULL));
p[i] = colDataIsNull((SColumnInfoData *)info->cunits[uidx].colData, 0, i, NULL);
if (p[i] == 0) {
all = false;
} else {
......@@ -3146,9 +3145,8 @@ static FORCE_INLINE bool filterExecuteImplNotNull(void *pinfo, int32_t numOfRows
for (int32_t i = 0; i < numOfRows; ++i) {
uint32_t uidx = info->groups[0].unitIdxs[0];
void *colData = colDataGetData((SColumnInfoData *)info->cunits[uidx].colData, i);
p[i] = ((colData != NULL) && !colDataIsNull((SColumnInfoData *)info->cunits[uidx].colData, 0, i, NULL));
p[i] = !colDataIsNull((SColumnInfoData *)info->cunits[uidx].colData, 0, i, NULL);
if (p[i] == 0) {
all = false;
} else {
......@@ -3178,13 +3176,13 @@ bool filterExecuteImplRange(void *pinfo, int32_t numOfRows, SColumnInfoData *pRe
for (int32_t i = 0; i < numOfRows; ++i) {
SColumnInfoData *pData = info->cunits[0].colData;
void *colData = colDataGetData(pData, i);
if (colData == NULL || colDataIsNull_s(pData, i)) {
if (colDataIsNull_s(pData, i)) {
all = false;
p[i] = 0;
continue;
}
void *colData = colDataGetData(pData, i);
p[i] = (*rfunc)(colData, colData, valData, valData2, func);
if (p[i] == 0) {
......@@ -3210,13 +3208,13 @@ bool filterExecuteImplMisc(void *pinfo, int32_t numOfRows, SColumnInfoData *pRes
for (int32_t i = 0; i < numOfRows; ++i) {
uint32_t uidx = info->groups[0].unitIdxs[0];
void *colData = colDataGetData((SColumnInfoData *)info->cunits[uidx].colData, i);
if (colData == NULL || colDataIsNull_s((SColumnInfoData *)info->cunits[uidx].colData, i)) {
if (colDataIsNull_s((SColumnInfoData *)info->cunits[uidx].colData, i)) {
p[i] = 0;
all = false;
continue;
}
void *colData = colDataGetData((SColumnInfoData *)info->cunits[uidx].colData, i);
// match/nmatch for nchar type need convert from ucs4 to mbs
if (info->cunits[uidx].dataType == TSDB_DATA_TYPE_NCHAR &&
(info->cunits[uidx].optr == OP_TYPE_MATCH || info->cunits[uidx].optr == OP_TYPE_NMATCH)) {
......@@ -3274,7 +3272,7 @@ bool filterExecuteImpl(void *pinfo, int32_t numOfRows, SColumnInfoData *pRes, SC
if (!isNull) {
colData = colDataGetData((SColumnInfoData *)(cunit->colData), i);
}
if (colData == NULL || isNull) {
p[i] = optr == OP_TYPE_IS_NULL ? true : false;
} else {
......
......@@ -89,6 +89,8 @@ typedef struct dirent TdDirEntry;
#endif
#define TDDIRMAXLEN 1024
void taosRemoveDir(const char *dirname) {
TdDirPtr pDir = taosOpenDir(dirname);
if (pDir == NULL) return;
......@@ -133,8 +135,8 @@ int32_t taosMkDir(const char *dirname) {
}
int32_t taosMulMkDir(const char *dirname) {
if (dirname == NULL) return -1;
char temp[1024];
if (dirname == NULL || strlen(dirname) >= TDDIRMAXLEN) return -1;
char temp[TDDIRMAXLEN];
char *pos = temp;
int32_t code = 0;
#ifdef WINDOWS
......@@ -192,8 +194,8 @@ int32_t taosMulMkDir(const char *dirname) {
}
int32_t taosMulModeMkDir(const char *dirname, int mode) {
if (dirname == NULL) return -1;
char temp[1024];
if (dirname == NULL || strlen(dirname) >= TDDIRMAXLEN) return -1;
char temp[TDDIRMAXLEN];
char *pos = temp;
int32_t code = 0;
#ifdef WINDOWS
......@@ -204,8 +206,7 @@ int32_t taosMulModeMkDir(const char *dirname, int mode) {
#endif
if (taosDirExist(temp)) {
chmod(temp, mode);
return code;
return chmod(temp, mode);
}
if (strncmp(temp, TD_DIRSEP, 1) == 0) {
......@@ -247,12 +248,10 @@ int32_t taosMulModeMkDir(const char *dirname, int mode) {
}
if (code < 0 && errno == EEXIST) {
chmod(temp, mode);
return 0;
return chmod(temp, mode);
}
chmod(temp, mode);
return code;
return chmod(temp, mode);
}
void taosRemoveOldFiles(const char *dirname, int32_t keepDays) {
......
......@@ -132,15 +132,20 @@ int64_t taosCopyFile(const char *from, const char *to) {
if (bytes < sizeof(buffer)) break;
}
taosFsyncFile(pFileTo);
int code = taosFsyncFile(pFileTo);
taosCloseFile(&pFileFrom);
taosCloseFile(&pFileTo);
if (code != 0) {
return -1;
}
return size;
_err:
if (pFileFrom != NULL) taosCloseFile(&pFileFrom);
if (pFileTo != NULL) taosCloseFile(&pFileTo);
/* coverity[+retval] */
taosRemoveFile(to);
return -1;
#endif
......@@ -506,13 +511,13 @@ int64_t taosPWriteFile(TdFilePtr pFile, const void *buf, int64_t count, int64_t
}
int64_t taosLSeekFile(TdFilePtr pFile, int64_t offset, int32_t whence) {
if (pFile == NULL || pFile->fd < 0) {
return -1;
}
#if FILE_WITH_LOCK
taosThreadRwlockRdlock(&(pFile->rwlock));
#endif
ASSERT(pFile->fd >= 0); // Please check if you have closed the file.
if (pFile->fd < 0) {
return -1;
}
#ifdef WINDOWS
int64_t ret = _lseeki64(pFile->fd, offset, whence);
#else
......
......@@ -745,8 +745,10 @@ bool taosValidIpAndPort(uint32_t ip, uint16_t port) {
#endif
serverAdd.sin_port = (uint16_t)htons(port);
if ((fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP)) <= 2) {
// printf("failed to open TCP socket: %d (%s)", errno, strerror(errno));
fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
if (fd < 0) { // exception
return false;
} else if (fd <= 2) { // in, out, err
taosCloseSocketNoCheck1(fd);
return false;
}
......
......@@ -439,11 +439,14 @@ int32_t taosGetCpuInfo(char *cpuModel, int32_t maxLen, float *numOfCores) {
if (code != 0 && (done & 1) == 0) {
TdFilePtr pFile1 = taosOpenFile("/proc/device-tree/model", TD_FILE_READ | TD_FILE_STREAM);
if (pFile1 == NULL) return code;
taosGetsFile(pFile1, maxLen, cpuModel);
taosCloseFile(&pFile1);
code = 0;
done |= 1;
if (pFile1 != NULL) {
ssize_t bytes = taosGetsFile(pFile1, maxLen, cpuModel);
taosCloseFile(&pFile);
if (bytes > 0) {
code = 0;
done |= 1;
}
}
}
if (code != 0 && (done & 1) == 0) {
......@@ -498,7 +501,7 @@ void taosGetCpuUsage(double *cpu_system, double *cpu_engine) {
curSysTotal = curSysUsed + sysCpu.idle;
curProcTotal = procCpu.utime + procCpu.stime + procCpu.cutime + procCpu.cstime;
if (curSysTotal > lastSysTotal && curSysUsed >= lastSysUsed && curProcTotal >= lastProcTotal) {
if (curSysTotal - lastSysTotal > 0 && curSysUsed >= lastSysUsed && curProcTotal >= lastProcTotal) {
if (cpu_system != NULL) {
*cpu_system = (curSysUsed - lastSysUsed) / (double)(curSysTotal - lastSysTotal) * 100;
}
......@@ -610,12 +613,6 @@ int32_t taosGetProcMemory(int64_t *usedKB) {
}
}
if (strlen(line) < 0) {
// printf("read file:%s failed", tsProcMemFile);
taosCloseFile(&pFile);
return -1;
}
char tmp[10];
sscanf(line, "%s %" PRId64, tmp, usedKB);
......
......@@ -909,7 +909,7 @@ void taosGetSystemTimezone(char *outTimezoneStr, enum TdTimezone *tsTimezone) {
char buf[4096] = {0};
char *tz = NULL;
{
int n = readlink("/etc/localtime", buf, sizeof(buf));
int n = readlink("/etc/localtime", buf, sizeof(buf)-1);
if (n < 0) {
printf("read /etc/localtime error, reason:%s", strerror(errno));
......
......@@ -55,7 +55,7 @@ python_error=`cat ${LOG_DIR}/*.info | grep -w "stack" | wc -l`
# /home/chr/TDengine/source/libs/scalar/src/filter.c:3149:14: runtime error: applying non-zero offset 18446744073709551615 to null pointer
# /home/TDinternal/community/source/libs/scalar/src/sclvector.c:1109:66: runtime error: signed integer overflow: 9223372034707292160 + 1676867897049 cannot be represented in type 'long int'
runtime_error=`cat ${LOG_DIR}/*.asan | grep "runtime error" | grep -v "trees.c:873" | grep -v "sclfunc.c.*outside the range of representable values of type"| grep -v "signed integer overflow" |grep -v "strerror.c"| grep -v "asan_malloc_linux.cc" |grep -v "filter.c:3149:14" |wc -l`
runtime_error=`cat ${LOG_DIR}/*.asan | grep "runtime error" | grep -v "trees.c:873" | grep -v "sclfunc.c.*outside the range of representable values of type"| grep -v "signed integer overflow" |grep -v "strerror.c"| grep -v "asan_malloc_linux.cc" |wc -l`
echo -e "\033[44;32;1m"asan error_num: $error_num"\033[0m"
echo -e "\033[44;32;1m"asan memory_leak: $memory_leak"\033[0m"
......
......@@ -31,7 +31,8 @@ class TDTestCase:
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
'pollDelay': 20,
'showMsg': 1,
'showRow': 1}
'showRow': 1,
'snapshot': 1}
cdbName = 'cdb'
# some parameter to consumer processor
......@@ -42,7 +43,7 @@ class TDTestCase:
ifManualCommit = 1
groupId = 'group.id:cgrp1'
autoCommit = 'enable.auto.commit:true'
autoCommitInterval = 'auto.commit.interval.ms:1000'
autoCommitInterval = 'auto.commit.interval.ms:100'
autoOffset = 'auto.offset.reset:earliest'
pollDelay = 20
......@@ -86,7 +87,7 @@ class TDTestCase:
tmqCom.insertConsumerInfo(self.consumerId, self.expectrowcnt,topicList,keyList,self.ifcheckdata,self.ifManualCommit)
tdLog.info("start consume processor")
tmqCom.startTmqSimProcess(self.pollDelay,self.paraDict["dbName"],self.showMsg, self.showRow,self.cdbName)
tmqCom.startTmqSimProcess(self.pollDelay,self.paraDict["dbName"],self.showMsg, self.showRow,self.cdbName,0,0,self.paraDict["snapshot"])
tdLog.info("After waiting for a commit notify, drop one stable")
#time.sleep(3)
......
......@@ -151,7 +151,7 @@ class TMQCom:
if tdSql.getData(i, 1) == 0:
loopFlag = 0
break
time.sleep(0.1)
time.sleep(0.02)
return
def getStartCommitNotifyFromTmqsim(self,cdbName='cdb',rows=2):
......@@ -165,7 +165,7 @@ class TMQCom:
if tdSql.getData(i, 1) == 1:
loopFlag = 0
break
time.sleep(0.1)
time.sleep(0.02)
return
def create_database(self,tsql, dbName,dropFlag=1,vgroups=4,replica=1):
......
......@@ -845,6 +845,8 @@ void shellReadHistory() {
i = (i + SHELL_MAX_HISTORY_SIZE - 1) % SHELL_MAX_HISTORY_SIZE;
}
taosFprintfFile(pFile, "%s\n", pHistory->hist[endIndex]);
/* coverity[+retval] */
taosFsyncFile(pFile);
taosCloseFile(&pFile);
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册