Commit 36d6b1e5 authored by Cary Xu

Merge branch '3.0' into feature/3.0_interval_hash_optimize

...@@ -114,7 +114,9 @@ The above process can be repeated to add more dnodes in the cluster.
Any node that is in the cluster and online can be the firstEp of new nodes.
Nodes use the firstEp parameter only when joining a cluster for the first time. After a node has joined the cluster, it stores the latest mnode in its end point list and no longer makes use of firstEp.
However, firstEp is used by clients that connect to the cluster. For example, if you run the TDengine CLI `taos` without arguments, it connects to the firstEp by default.
Two dnodes that are launched without a firstEp value operate independently of each other. It is not possible to add one dnode to the other dnode and form a cluster. It is also not possible to form two independent clusters into a new cluster.
:::
......
...@@ -57,7 +57,7 @@ table_option: {
3. MAX_DELAY: specifies the maximum latency for pushing computation results. The default value is 15 minutes or the value of the INTERVAL parameter, whichever is smaller. Enter a value between 0 and 15 minutes in milliseconds, seconds, or minutes. You can enter multiple values separated by commas (,). Note: Retain the default value if possible. Configuring a small MAX_DELAY may cause results to be frequently pushed, affecting storage and query performance. This parameter applies only to supertables and takes effect only when the RETENTIONS parameter has been specified for the database.
4. ROLLUP: specifies aggregate functions to roll up. Rolling up a function provides downsampled results based on multiple axes. This parameter applies only to supertables and takes effect only when the RETENTIONS parameter has been specified for the database. You can specify only one function to roll up. The rollup takes effect on all columns except TS. Enter one of the following values: avg, sum, min, max, last, or first.
5. SMA: specifies functions on which to enable small materialized aggregates (SMA). SMA is user-defined precomputation of aggregates based on data blocks. Enter one of the following values: max, min, or sum. This parameter can be used with supertables and standard tables.
6. TTL: specifies the time to live (TTL) for the table. If TTL is specified when a table is created, TDengine automatically deletes the table once it has existed longer than the TTL period. Note that the system may not delete the table at the exact moment that the TTL expires; it only guarantees that such a mechanism exists and that the table will eventually be deleted. Enter a value in days. The default value is 0, meaning the table never expires.
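For illustration, a minimal sketch of this option (the table name and schema are hypothetical):

```sql
-- The table becomes eligible for automatic deletion roughly 30 days after creation.
CREATE TABLE sensor_data (ts TIMESTAMP, val FLOAT) TTL 30;
```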
## Create Subtables
......
...@@ -1139,7 +1139,7 @@ SELECT STATECOUNT(field_name, oper, val) FROM { tb_name | stb_name } [WHERE clau
**Applicable parameter values**:
- oper : Can be one of `'LT'` (lower than), `'GT'` (greater than), `'LE'` (lower than or equal to), `'GE'` (greater than or equal to), `'NE'` (not equal to), or `'EQ'` (equal to). The value is case-insensitive but must be enclosed in quotes.
- val : Numeric types
**Return value type**: Integer
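As a quick illustration, assuming the smart-meter schema used elsewhere in these docs (a table `d1001` with a `voltage` column), the following counts consecutive rows whose voltage stays above 205:

```sql
SELECT STATECOUNT(voltage, 'GT', 205) FROM d1001;
```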
...@@ -1166,7 +1166,7 @@ SELECT stateDuration(field_name, oper, val, unit) FROM { tb_name | stb_name } [W
**Applicable parameter values**:
- oper : Can be one of `'LT'` (lower than), `'GT'` (greater than), `'LE'` (lower than or equal to), `'GE'` (greater than or equal to), `'NE'` (not equal to), or `'EQ'` (equal to). The value is case-insensitive but must be enclosed in quotes.
- val : Numeric types
- unit: The unit of time interval. Enter one of the following options: 1b (nanoseconds), 1u (microseconds), 1a (milliseconds), 1s (seconds), 1m (minutes), 1h (hours), 1d (days), or 1w (weeks). If you do not enter a unit of time, the precision of the current database is used by default.
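A matching sketch for this function, under the same assumed `d1001` schema, reports how long the voltage has stayed below 200, in seconds:

```sql
SELECT stateDuration(voltage, 'LT', 200, 1s) FROM d1001;
```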
......
...@@ -27,4 +27,4 @@ The number of dnodes in a TDengine cluster must NOT be lower than the number of
As long as the dnodes of a TDengine cluster are deployed on different physical machines and the replica number is higher than 1, high availability can be achieved without any other assistance. For disaster recovery, dnodes of a TDengine cluster should be deployed in geographically different data centers.
Alternatively, you can use taosX to synchronize the data from one TDengine cluster to another cluster in a remote location. Note that taosX is available only in the TDengine Enterprise edition; for more information, contact tdengine.com.
...@@ -72,7 +72,7 @@ Next, ensure the hostname "tdengine" is resolvable in `/etc/hosts`.
echo 127.0.0.1 tdengine |sudo tee -a /etc/hosts
```
Finally, the TDengine service can be accessed from the TDengine CLI or any connector with "tdengine" as the server address.
```shell
taos -h tdengine -P 6030
......
...@@ -344,7 +344,7 @@ The charset that takes effect is UTF-8.
| Attribute | Description |
| -------- | --------------------------------- |
| Applicable | Server and Client |
| Meaning | The interval for TDengine CLI to send heartbeat to mnode |
| Unit | second |
| Value Range | 1-120 |
| Default Value | 3 |
......
...@@ -16,14 +16,14 @@ public class RestInsertExample {
    private static List<String> getRawData() {
        return Arrays.asList(
                "d1001,2018-10-03 14:38:05.000,10.30000,219,0.31000,'California.SanFrancisco',2",
                "d1001,2018-10-03 14:38:15.000,12.60000,218,0.33000,'California.SanFrancisco',2",
                "d1001,2018-10-03 14:38:16.800,12.30000,221,0.31000,'California.SanFrancisco',2",
                "d1002,2018-10-03 14:38:16.650,10.30000,218,0.25000,'California.SanFrancisco',3",
                "d1003,2018-10-03 14:38:05.500,11.80000,221,0.28000,'California.LosAngeles',2",
                "d1003,2018-10-03 14:38:16.600,13.40000,223,0.29000,'California.LosAngeles',2",
                "d1004,2018-10-03 14:38:05.000,10.80000,223,0.29000,'California.LosAngeles',3",
                "d1004,2018-10-03 14:38:06.500,11.50000,221,0.35000,'California.LosAngeles',3"
        );
    }
......
...@@ -57,7 +57,7 @@ public class SubscribeDemo {
            properties.setProperty(TMQConstants.ENABLE_AUTO_COMMIT, "true");
            properties.setProperty(TMQConstants.GROUP_ID, "test");
            properties.setProperty(TMQConstants.VALUE_DESERIALIZER,
                    "com.taos.example.MetersDeserializer");
            // poll data
            try (TaosConsumer<Meters> consumer = new TaosConsumer<>(properties)) {
......
package com.taos.example.highvolume;
import java.sql.*;
/**
* Prepare target database.
* Count total records in database periodically so that we can estimate the writing speed.
*/
public class DataBaseMonitor {
private Connection conn;
private Statement stmt;
public DataBaseMonitor init() throws SQLException {
if (conn == null) {
String jdbcURL = System.getenv("TDENGINE_JDBC_URL");
conn = DriverManager.getConnection(jdbcURL);
stmt = conn.createStatement();
}
return this;
}
public void close() {
try {
stmt.close();
        } catch (SQLException e) {
            // ignore: failures while closing are not fatal here
        }
        try {
            conn.close();
        } catch (SQLException e) {
            // ignore: failures while closing are not fatal here
        }
}
public void prepareDatabase() throws SQLException {
stmt.execute("DROP DATABASE IF EXISTS test");
stmt.execute("CREATE DATABASE test");
stmt.execute("CREATE STABLE test.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (location BINARY(64), groupId INT)");
}
public Long count() throws SQLException {
if (!stmt.isClosed()) {
ResultSet result = stmt.executeQuery("SELECT count(*) from test.meters");
result.next();
return result.getLong(1);
}
return null;
}
/**
* show test.stables;
*
* name | created_time | columns | tags | tables |
* ============================================================================================
* meters | 2022-07-20 08:39:30.902 | 4 | 2 | 620000 |
*/
public Long getTableCount() throws SQLException {
if (!stmt.isClosed()) {
ResultSet result = stmt.executeQuery("show test.stables");
result.next();
return result.getLong(5);
}
return null;
}
}
package com.taos.example.highvolume;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.*;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
public class FastWriteExample {
final static Logger logger = LoggerFactory.getLogger(FastWriteExample.class);
final static int taskQueueCapacity = 1000000;
final static List<BlockingQueue<String>> taskQueues = new ArrayList<>();
final static List<ReadTask> readTasks = new ArrayList<>();
final static List<WriteTask> writeTasks = new ArrayList<>();
final static DataBaseMonitor databaseMonitor = new DataBaseMonitor();
public static void stopAll() {
logger.info("shutting down");
readTasks.forEach(task -> task.stop());
writeTasks.forEach(task -> task.stop());
databaseMonitor.close();
}
public static void main(String[] args) throws InterruptedException, SQLException {
int readTaskCount = args.length > 0 ? Integer.parseInt(args[0]) : 1;
int writeTaskCount = args.length > 1 ? Integer.parseInt(args[1]) : 3;
int tableCount = args.length > 2 ? Integer.parseInt(args[2]) : 1000;
int maxBatchSize = args.length > 3 ? Integer.parseInt(args[3]) : 3000;
logger.info("readTaskCount={}, writeTaskCount={} tableCount={} maxBatchSize={}",
readTaskCount, writeTaskCount, tableCount, maxBatchSize);
databaseMonitor.init().prepareDatabase();
        // Create task queues, writing tasks, and start writing threads.
for (int i = 0; i < writeTaskCount; ++i) {
BlockingQueue<String> queue = new ArrayBlockingQueue<>(taskQueueCapacity);
taskQueues.add(queue);
WriteTask task = new WriteTask(queue, maxBatchSize);
Thread t = new Thread(task);
t.setName("WriteThread-" + i);
t.start();
}
// create reading tasks and start reading threads
int tableCountPerTask = tableCount / readTaskCount;
for (int i = 0; i < readTaskCount; ++i) {
ReadTask task = new ReadTask(i, taskQueues, tableCountPerTask);
Thread t = new Thread(task);
t.setName("ReadThread-" + i);
t.start();
}
Runtime.getRuntime().addShutdownHook(new Thread(FastWriteExample::stopAll));
long lastCount = 0;
while (true) {
Thread.sleep(10000);
long numberOfTable = databaseMonitor.getTableCount();
long count = databaseMonitor.count();
logger.info("numberOfTable={} count={} speed={}", numberOfTable, count, (count - lastCount) / 10);
lastCount = count;
}
}
}
package com.taos.example.highvolume;
import java.util.Iterator;
/**
* Generate test data
*/
class MockDataSource implements Iterator {
private String tbNamePrefix;
private int tableCount;
private long maxRowsPerTable = 1000000000L;
// 100 milliseconds between two neighbouring rows.
long startMs = System.currentTimeMillis() - maxRowsPerTable * 100;
private int currentRow = 0;
private int currentTbId = -1;
// mock values
String[] location = {"LosAngeles", "SanDiego", "Hollywood", "Compton", "San Francisco"};
float[] current = {8.8f, 10.7f, 9.9f, 8.9f, 9.4f};
int[] voltage = {119, 116, 111, 113, 118};
float[] phase = {0.32f, 0.34f, 0.33f, 0.329f, 0.141f};
public MockDataSource(String tbNamePrefix, int tableCount) {
this.tbNamePrefix = tbNamePrefix;
this.tableCount = tableCount;
}
@Override
public boolean hasNext() {
currentTbId += 1;
if (currentTbId == tableCount) {
currentTbId = 0;
currentRow += 1;
}
return currentRow < maxRowsPerTable;
}
@Override
public String next() {
long ts = startMs + 100 * currentRow;
int groupId = currentTbId % 5 == 0 ? currentTbId / 5 : currentTbId / 5 + 1;
StringBuilder sb = new StringBuilder(tbNamePrefix + "_" + currentTbId + ","); // tbName
sb.append(ts).append(','); // ts
sb.append(current[currentRow % 5]).append(','); // current
sb.append(voltage[currentRow % 5]).append(','); // voltage
sb.append(phase[currentRow % 5]).append(','); // phase
sb.append(location[currentRow % 5]).append(','); // location
sb.append(groupId); // groupID
return sb.toString();
}
}
package com.taos.example.highvolume;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.BlockingQueue;
class ReadTask implements Runnable {
private final static Logger logger = LoggerFactory.getLogger(ReadTask.class);
private final int taskId;
private final List<BlockingQueue<String>> taskQueues;
private final int queueCount;
private final int tableCount;
private boolean active = true;
public ReadTask(int readTaskId, List<BlockingQueue<String>> queues, int tableCount) {
this.taskId = readTaskId;
this.taskQueues = queues;
this.queueCount = queues.size();
this.tableCount = tableCount;
}
/**
* Assign data received to different queues.
* Here we use the suffix number in table name.
* You are expected to define your own rule in practice.
*
* @param line record received
* @return which queue to use
*/
public int getQueueId(String line) {
String tbName = line.substring(0, line.indexOf(',')); // For example: tb1_101
String suffixNumber = tbName.split("_")[1];
return Integer.parseInt(suffixNumber) % this.queueCount;
}
@Override
public void run() {
logger.info("started");
Iterator<String> it = new MockDataSource("tb" + this.taskId, tableCount);
try {
while (it.hasNext() && active) {
String line = it.next();
int queueId = getQueueId(line);
taskQueues.get(queueId).put(line);
}
} catch (Exception e) {
logger.error("Read Task Error", e);
}
}
public void stop() {
logger.info("stop");
this.active = false;
}
}
package com.taos.example.highvolume;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.*;
import java.util.HashMap;
import java.util.Map;
/**
 * A helper class that encapsulates the logic of writing with SQL.
 * <p>
 * The main interface consists of two methods:
 * <ol>
 * <li>{@link SQLWriter#processLine}, which receives raw lines from WriteTask and groups them by table name.</li>
 * <li>{@link SQLWriter#flush}, which assembles the INSERT statement and executes it.</li>
 * </ol>
 * <p>
 * One technique is worth mentioning: instead of creating tables automatically with the "INSERT INTO tb USING stb" syntax,
 * we create tables on demand when a "table does not exist" error occurs.
 * This ensures that checking table existence is a one-time-only operation.
 * </p>
 */
public class SQLWriter {
final static Logger logger = LoggerFactory.getLogger(SQLWriter.class);
private Connection conn;
private Statement stmt;
/**
* current number of buffered records
*/
private int bufferedCount = 0;
/**
* Maximum number of buffered records.
     * Flush action will be triggered if bufferedCount reaches this value.
*/
private int maxBatchSize;
/**
* Maximum SQL length.
*/
private int maxSQLLength;
/**
* Map from table name to column values. For example:
* "tb001" -> "(1648432611249,2.1,114,0.09) (1648432611250,2.2,135,0.2)"
*/
private Map<String, String> tbValues = new HashMap<>();
/**
* Map from table name to tag values in the same order as creating stable.
* Used for creating table.
*/
private Map<String, String> tbTags = new HashMap<>();
public SQLWriter(int maxBatchSize) {
this.maxBatchSize = maxBatchSize;
}
/**
* Get Database Connection
*
* @return Connection
* @throws SQLException
*/
private static Connection getConnection() throws SQLException {
String jdbcURL = System.getenv("TDENGINE_JDBC_URL");
return DriverManager.getConnection(jdbcURL);
}
/**
* Create Connection and Statement
*
* @throws SQLException
*/
public void init() throws SQLException {
conn = getConnection();
stmt = conn.createStatement();
stmt.execute("use test");
ResultSet rs = stmt.executeQuery("show variables");
while (rs.next()) {
String configName = rs.getString(1);
if ("maxSQLLength".equals(configName)) {
maxSQLLength = Integer.parseInt(rs.getString(2));
logger.info("maxSQLLength={}", maxSQLLength);
}
}
}
/**
* Convert raw data to SQL fragments, group them by table name and cache them in a HashMap.
     * Trigger writing when the number of buffered records reaches maxBatchSize.
     *
     * @param line raw data taken from the task queue, in the format: tbName,ts,current,voltage,phase,location,groupId
*/
public void processLine(String line) throws SQLException {
bufferedCount += 1;
int firstComma = line.indexOf(',');
String tbName = line.substring(0, firstComma);
int lastComma = line.lastIndexOf(',');
int secondLastComma = line.lastIndexOf(',', lastComma - 1);
String value = "(" + line.substring(firstComma + 1, secondLastComma) + ") ";
if (tbValues.containsKey(tbName)) {
tbValues.put(tbName, tbValues.get(tbName) + value);
} else {
tbValues.put(tbName, value);
}
if (!tbTags.containsKey(tbName)) {
String location = line.substring(secondLastComma + 1, lastComma);
String groupId = line.substring(lastComma + 1);
String tagValues = "('" + location + "'," + groupId + ')';
tbTags.put(tbName, tagValues);
}
if (bufferedCount == maxBatchSize) {
flush();
}
}
/**
* Assemble INSERT statement using buffered SQL fragments in Map {@link SQLWriter#tbValues} and execute it.
     * In case of a "Table does not exist" exception, create all tables in the SQL and retry it.
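     * For example, an assembled statement may look like the following (values are illustrative, reusing the sample fragments above):
     * INSERT INTO tb001 values (1648432611249,2.1,114,0.09) (1648432611250,2.2,135,0.2) tb002 values (1648432611249,2.1,114,0.09)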
*/
public void flush() throws SQLException {
StringBuilder sb = new StringBuilder("INSERT INTO ");
for (Map.Entry<String, String> entry : tbValues.entrySet()) {
String tableName = entry.getKey();
String values = entry.getValue();
String q = tableName + " values " + values + " ";
if (sb.length() + q.length() > maxSQLLength) {
executeSQL(sb.toString());
logger.warn("increase maxSQLLength or decrease maxBatchSize to gain better performance");
sb = new StringBuilder("INSERT INTO ");
}
sb.append(q);
}
executeSQL(sb.toString());
tbValues.clear();
bufferedCount = 0;
}
private void executeSQL(String sql) throws SQLException {
try {
stmt.executeUpdate(sql);
} catch (SQLException e) {
// convert to error code defined in taoserror.h
int errorCode = e.getErrorCode() & 0xffff;
if (errorCode == 0x362 || errorCode == 0x218) {
// Table does not exist
createTables();
executeSQL(sql);
} else {
logger.error("Execute SQL: {}", sql);
throw e;
}
} catch (Throwable throwable) {
logger.error("Execute SQL: {}", sql);
throw throwable;
}
}
/**
* Create tables in batch using syntax:
* <p>
* CREATE TABLE [IF NOT EXISTS] tb_name1 USING stb_name TAGS (tag_value1, ...) [IF NOT EXISTS] tb_name2 USING stb_name TAGS (tag_value2, ...) ...;
* </p>
*/
private void createTables() throws SQLException {
StringBuilder sb = new StringBuilder("CREATE TABLE ");
for (String tbName : tbValues.keySet()) {
String tagValues = tbTags.get(tbName);
sb.append("IF NOT EXISTS ").append(tbName).append(" USING meters TAGS ").append(tagValues).append(" ");
}
String sql = sb.toString();
try {
stmt.executeUpdate(sql);
} catch (Throwable throwable) {
logger.error("Execute SQL: {}", sql);
throw throwable;
}
}
public boolean hasBufferedValues() {
return bufferedCount > 0;
}
public int getBufferedCount() {
return bufferedCount;
}
public void close() {
try {
stmt.close();
        } catch (SQLException e) {
            // ignore: failures while closing are not fatal here
        }
        try {
            conn.close();
        } catch (SQLException e) {
            // ignore: failures while closing are not fatal here
        }
}
}
package com.taos.example.highvolume;
public class StmtWriter {
}
package com.taos.example.highvolume;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.BlockingQueue;
class WriteTask implements Runnable {
private final static Logger logger = LoggerFactory.getLogger(WriteTask.class);
private final int maxBatchSize;
    // the queue from which this writing task gets raw data.
    private final BlockingQueue<String> queue;
    // A flag indicating whether to continue.
private boolean active = true;
public WriteTask(BlockingQueue<String> taskQueue, int maxBatchSize) {
this.queue = taskQueue;
this.maxBatchSize = maxBatchSize;
}
@Override
public void run() {
logger.info("started");
        String line = null; // the line most recently taken from the queue
SQLWriter writer = new SQLWriter(maxBatchSize);
try {
writer.init();
while (active) {
line = queue.poll();
if (line != null) {
// parse raw data and buffer the data.
writer.processLine(line);
} else if (writer.hasBufferedValues()) {
// write data immediately if no more data in the queue
writer.flush();
} else {
                    // sleep a while to avoid high CPU usage when the queue is empty and there are no buffered records
Thread.sleep(100);
}
}
if (writer.hasBufferedValues()) {
writer.flush();
}
} catch (Exception e) {
String msg = String.format("line=%s, bufferedCount=%s", line, writer.getBufferedCount());
logger.error(msg, e);
} finally {
writer.close();
}
}
public void stop() {
logger.info("stop");
this.active = false;
}
}
...@@ -23,16 +23,16 @@ public class TestAll {
        String jdbcUrl = "jdbc:TAOS://localhost:6030?user=root&password=taosdata";
        try (Connection conn = DriverManager.getConnection(jdbcUrl)) {
            try (Statement stmt = conn.createStatement()) {
                String sql = "INSERT INTO power.d1001 USING power.meters TAGS('California.SanFrancisco', 2) VALUES('2018-10-03 14:38:05.000',10.30000,219,0.31000)\n" +
                        " power.d1001 USING power.meters TAGS('California.SanFrancisco', 2) VALUES('2018-10-03 15:38:15.000',12.60000,218,0.33000)\n" +
                        " power.d1001 USING power.meters TAGS('California.SanFrancisco', 2) VALUES('2018-10-03 15:38:16.800',12.30000,221,0.31000)\n" +
                        " power.d1002 USING power.meters TAGS('California.SanFrancisco', 3) VALUES('2018-10-03 15:38:16.650',10.30000,218,0.25000)\n" +
                        " power.d1003 USING power.meters TAGS('California.LosAngeles', 2) VALUES('2018-10-03 15:38:05.500',11.80000,221,0.28000)\n" +
                        " power.d1003 USING power.meters TAGS('California.LosAngeles', 2) VALUES('2018-10-03 15:38:16.600',13.40000,223,0.29000)\n" +
                        " power.d1004 USING power.meters TAGS('California.LosAngeles', 3) VALUES('2018-10-03 15:38:05.000',10.80000,223,0.29000)\n" +
                        " power.d1004 USING power.meters TAGS('California.LosAngeles', 3) VALUES('2018-10-03 15:38:06.000',10.80000,223,0.29000)\n" +
                        " power.d1004 USING power.meters TAGS('California.LosAngeles', 3) VALUES('2018-10-03 15:38:07.000',10.80000,223,0.29000)\n" +
                        " power.d1004 USING power.meters TAGS('California.LosAngeles', 3) VALUES('2018-10-03 15:38:08.500',11.50000,221,0.35000)";
                stmt.execute(sql);
            }
......
# install dependencies:
# recommend python >= 3.8
# pip3 install faster-fifo
#
import logging
import math
import sys
import time
import os
from multiprocessing import Process
from faster_fifo import Queue
from mockdatasource import MockDataSource
from queue import Empty
from typing import List
logging.basicConfig(stream=sys.stdout, level=logging.DEBUG, format="%(asctime)s [%(name)s] - %(message)s")
READ_TASK_COUNT = 1
WRITE_TASK_COUNT = 1
TABLE_COUNT = 1000
QUEUE_SIZE = 1000000
MAX_BATCH_SIZE = 3000
read_processes = []
write_processes = []
def get_connection():
"""
    If the variable TDENGINE_FIRST_EP is provided, it will be used. If not, firstEP in /etc/taos/taos.cfg will be used.
    You can also override the default username and password by supplying the variables TDENGINE_USER and TDENGINE_PASSWORD.
"""
import taos
firstEP = os.environ.get("TDENGINE_FIRST_EP")
if firstEP:
host, port = firstEP.split(":")
else:
host, port = None, 0
user = os.environ.get("TDENGINE_USER", "root")
password = os.environ.get("TDENGINE_PASSWORD", "taosdata")
return taos.connect(host=host, port=int(port), user=user, password=password)
# ANCHOR: read
def run_read_task(task_id: int, task_queues: List[Queue]):
table_count_per_task = TABLE_COUNT // READ_TASK_COUNT
data_source = MockDataSource(f"tb{task_id}", table_count_per_task)
try:
for batch in data_source:
for table_id, rows in batch:
# hash data to different queue
i = table_id % len(task_queues)
# block putting forever when the queue is full
task_queues[i].put_many(rows, block=True, timeout=-1)
except KeyboardInterrupt:
pass
# ANCHOR_END: read
# ANCHOR: write
def run_write_task(task_id: int, queue: Queue):
from sql_writer import SQLWriter
log = logging.getLogger(f"WriteTask-{task_id}")
writer = SQLWriter(get_connection)
lines = None
try:
while True:
try:
# get as many as possible
lines = queue.get_many(block=False, max_messages_to_get=MAX_BATCH_SIZE)
writer.process_lines(lines)
except Empty:
time.sleep(0.01)
except KeyboardInterrupt:
pass
except BaseException as e:
log.debug(f"lines={lines}")
raise e
# ANCHOR_END: write
def set_global_config():
argc = len(sys.argv)
if argc > 1:
global READ_TASK_COUNT
READ_TASK_COUNT = int(sys.argv[1])
if argc > 2:
global WRITE_TASK_COUNT
WRITE_TASK_COUNT = int(sys.argv[2])
if argc > 3:
global TABLE_COUNT
TABLE_COUNT = int(sys.argv[3])
if argc > 4:
global QUEUE_SIZE
QUEUE_SIZE = int(sys.argv[4])
if argc > 5:
global MAX_BATCH_SIZE
MAX_BATCH_SIZE = int(sys.argv[5])
# ANCHOR: monitor
def run_monitor_process():
log = logging.getLogger("DataBaseMonitor")
conn = get_connection()
conn.execute("DROP DATABASE IF EXISTS test")
conn.execute("CREATE DATABASE test")
conn.execute("CREATE STABLE test.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) "
"TAGS (location BINARY(64), groupId INT)")
def get_count():
res = conn.query("SELECT count(*) FROM test.meters")
rows = res.fetch_all()
return rows[0][0] if rows else 0
last_count = 0
while True:
time.sleep(10)
count = get_count()
log.info(f"count={count} speed={(count - last_count) / 10}")
last_count = count
# ANCHOR_END: monitor
# ANCHOR: main
def main():
set_global_config()
logging.info(f"READ_TASK_COUNT={READ_TASK_COUNT}, WRITE_TASK_COUNT={WRITE_TASK_COUNT}, "
f"TABLE_COUNT={TABLE_COUNT}, QUEUE_SIZE={QUEUE_SIZE}, MAX_BATCH_SIZE={MAX_BATCH_SIZE}")
monitor_process = Process(target=run_monitor_process)
monitor_process.start()
    time.sleep(3)  # wait for the database to be ready
task_queues: List[Queue] = []
# create task queues
for i in range(WRITE_TASK_COUNT):
queue = Queue(max_size_bytes=QUEUE_SIZE)
task_queues.append(queue)
# create write processes
for i in range(WRITE_TASK_COUNT):
p = Process(target=run_write_task, args=(i, task_queues[i]))
p.start()
logging.debug(f"WriteTask-{i} started with pid {p.pid}")
write_processes.append(p)
# create read processes
for i in range(READ_TASK_COUNT):
queues = assign_queues(i, task_queues)
p = Process(target=run_read_task, args=(i, queues))
p.start()
logging.debug(f"ReadTask-{i} started with pid {p.pid}")
read_processes.append(p)
try:
monitor_process.join()
except KeyboardInterrupt:
monitor_process.terminate()
[p.terminate() for p in read_processes]
[p.terminate() for p in write_processes]
[q.close() for q in task_queues]
def assign_queues(read_task_id, task_queues):
"""
Compute target queues for a specific read task.
"""
ratio = WRITE_TASK_COUNT / READ_TASK_COUNT
from_index = math.floor(read_task_id * ratio)
end_index = math.ceil((read_task_id + 1) * ratio)
return task_queues[from_index:end_index]
if __name__ == '__main__':
main()
# ANCHOR_END: main
import time
class MockDataSource:
samples = [
"8.8,119,0.32,LosAngeles,0",
"10.7,116,0.34,SanDiego,1",
"9.9,111,0.33,Hollywood,2",
"8.9,113,0.329,Compton,3",
"9.4,118,0.141,San Francisco,4"
]
def __init__(self, tb_name_prefix, table_count):
self.table_name_prefix = tb_name_prefix + "_"
self.table_count = table_count
self.max_rows = 10000000
self.current_ts = round(time.time() * 1000) - self.max_rows * 100
# [(tableId, tableName, values),]
self.data = self._init_data()
def _init_data(self):
lines = self.samples * (self.table_count // 5 + 1)
data = []
for i in range(self.table_count):
table_name = self.table_name_prefix + str(i)
data.append((i, table_name, lines[i])) # tableId, row
return data
def __iter__(self):
self.row = 0
return self
def __next__(self):
"""
        next 1000 rows for each table.
        return: [(tableId, [row, ...]), ...]
"""
# generate 1000 timestamps
ts = []
for _ in range(1000):
self.current_ts += 100
ts.append(str(self.current_ts))
# add timestamp to each row
# [(tableId, ["tableName,ts,current,voltage,phase,location,groupId"])]
result = []
for table_id, table_name, values in self.data:
rows = [table_name + ',' + t + ',' + values for t in ts]
result.append((table_id, rows))
return result
import logging
import taos
class SQLWriter:
log = logging.getLogger("SQLWriter")
def __init__(self, get_connection_func):
self._tb_values = {}
self._tb_tags = {}
self._conn = get_connection_func()
self._max_sql_length = self.get_max_sql_length()
self._conn.execute("USE test")
def get_max_sql_length(self):
rows = self._conn.query("SHOW variables").fetch_all()
for r in rows:
name = r[0]
if name == "maxSQLLength":
return int(r[1])
return 1024 * 1024
    def process_lines(self, lines):
"""
:param lines: [[tbName,ts,current,voltage,phase,location,groupId]]
"""
for line in lines:
ps = line.split(",")
table_name = ps[0]
value = '(' + ",".join(ps[1:-2]) + ') '
if table_name in self._tb_values:
self._tb_values[table_name] += value
else:
self._tb_values[table_name] = value
if table_name not in self._tb_tags:
location = ps[-2]
group_id = ps[-1]
tag_value = f"('{location}',{group_id})"
self._tb_tags[table_name] = tag_value
self.flush()
def flush(self):
"""
Assemble INSERT statement and execute it.
When the sql length grows close to MAX_SQL_LENGTH, the sql will be executed immediately, and a new INSERT statement will be created.
        In case of a "Table does not exist" exception, the tables in the sql will be created and the sql will be re-executed.
"""
sql = "INSERT INTO "
sql_len = len(sql)
buf = []
for tb_name, values in self._tb_values.items():
q = tb_name + " VALUES " + values
if sql_len + len(q) >= self._max_sql_length:
sql += " ".join(buf)
self.execute_sql(sql)
sql = "INSERT INTO "
sql_len = len(sql)
buf = []
buf.append(q)
sql_len += len(q)
sql += " ".join(buf)
self.execute_sql(sql)
self._tb_values.clear()
def execute_sql(self, sql):
try:
self._conn.execute(sql)
except taos.Error as e:
error_code = e.errno & 0xffff
            # Table does not exist: create the missing tables, then retry once
            if error_code == 9731:
                self.create_tables()
                self.execute_sql(sql)
else:
self.log.error("Execute SQL: %s", sql)
raise e
except BaseException as baseException:
self.log.error("Execute SQL: %s", sql)
raise baseException
def create_tables(self):
sql = "CREATE TABLE "
for tb in self._tb_values.keys():
tag_values = self._tb_tags[tb]
sql += "IF NOT EXISTS " + tb + " USING meters TAGS " + tag_values + " "
try:
self._conn.execute(sql)
except BaseException as e:
self.log.error("Execute SQL: %s", sql)
raise e
...@@ -23,7 +23,7 @@ import PhpStmt from "./_php_stmt.mdx";
## Introduction to SQL Writing
Applications insert data by executing INSERT statements through connectors; users can also manually enter INSERT statements in the TDengine CLI to insert data.
### Write One Record at a Time
The following INSERT statement writes a single record into table d1001:
......
...@@ -52,7 +52,7 @@ Query OK, 2 row(s) in set (0.001100s)
### Example 1
In the TDengine CLI, find the average voltage collected by all smart meters in California, grouped by location.
```
taos> SELECT AVG(voltage), location FROM meters GROUP BY location;
...@@ -65,7 +65,7 @@ Query OK, 2 rows in database (0.005995s)
### Example 2
In the TDengine CLI, find the number of records and the maximum current of all smart meters whose groupId is 2.
```
taos> SELECT count(*), max(current) FROM meters where groupId = 2;
......
...@@ -71,7 +71,7 @@ serverPort 6030
## Start the Cluster
Following the steps in "Get Started", start the first data node, for example h1.taosdata.com, then run taos to start the TDengine CLI and execute the command "SHOW DNODES" in it, as shown below:
```
taos> show dnodes;
...@@ -115,7 +115,7 @@ SHOW DNODES;
Any data node that has already joined the cluster and is online can serve as the firstEp of nodes that join later.
The firstEp parameter only takes effect when a data node joins the cluster for the first time; after joining, the node saves the latest mnode End Point list and no longer relies on this parameter.
After that, the firstEp parameter in the configuration file is mainly used for client connections; for example, if the TDengine CLI is started without arguments, it connects to the node specified by firstEp by default.
Two dnodes started without the firstEp parameter will run independently. In that case, one data node cannot be added to the other to form a cluster, nor can two independent clusters be merged into a new cluster.
:::
......
...@@ -366,7 +366,7 @@ kubectl scale statefulsets tdengine --replicas=1
```
All database operations in the TDengine CLI will fail.
```
taos> show dnodes;
......
...@@ -10,27 +10,27 @@ description: Various management operations on tables
```sql
CREATE TABLE [IF NOT EXISTS] [db_name.]tb_name (create_definition [, create_definition] ...) [table_options]

CREATE TABLE create_subtable_clause

CREATE TABLE [IF NOT EXISTS] [db_name.]tb_name (create_definition [, create_definition] ...)
    [TAGS (create_definition [, create_definition] ...)]
    [table_options]

create_subtable_clause: {
    create_subtable_clause [create_subtable_clause] ...
  | [IF NOT EXISTS] [db_name.]tb_name USING [db_name.]stb_name [(tag_name [, tag_name] ...)] TAGS (tag_value [, tag_value] ...)
}

create_definition:
    col_name column_definition

column_definition:
    type_name [comment 'string_value']

table_options:
    table_option ...

table_option: {
    COMMENT 'string_value'
  | WATERMARK duration[,duration]
...@@ -54,12 +54,13 @@ table_option: {
Note that the content inside escape characters must be printable characters.
**Parameter description**

1. COMMENT: Table comment. Can be used with supertables, subtables, and standard tables.
2. WATERMARK: Specifies the closing time of a window. The default value is 5 seconds, the minimum unit is milliseconds, and the range is 0 to 15 minutes; multiple values are separated by commas. This option can only be used with supertables, and only takes effect when the database uses the RETENTIONS parameter.
3. MAX_DELAY: Controls the maximum latency for pushing computation results. The default value is the value of interval (but no more than the maximum), the minimum unit is milliseconds, and the range is 1 millisecond to 15 minutes; multiple values are separated by commas. Note: Do not set MAX_DELAY too small, otherwise results will be pushed too frequently and storage and query performance will suffer; use the default value unless there is a special need. This option can only be used with supertables, and only takes effect when the database uses the RETENTIONS parameter.
4. ROLLUP: The aggregate function used for rollup, providing downsampled aggregate results across multiple levels. Can only be used with supertables, and only takes effect when the database uses the RETENTIONS parameter. It applies to all columns of the supertable except the TS column, and only one aggregate function can be defined. Supported aggregate functions: avg, sum, min, max, last, first.
5. SMA: Small Materialized Aggregates, user-defined precomputation based on data blocks. Precomputation types include MAX, MIN, and SUM. Can be used with supertables and standard tables.
6. TTL: Time to Live, a parameter used to specify the lifetime of a table. If this parameter is specified when a table is created, TDengine automatically deletes the table once it has existed longer than the TTL. The TTL time is approximate: the system does not guarantee that the table is deleted at the exact moment, only that such a mechanism exists and that the table will eventually be deleted. The unit of TTL is days; the default is 0, meaning no limit. The expiration time is the table creation time plus the TTL.
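A short sketch with hypothetical names (the ALTER TABLE form uses the TTL option that appears in the ALTER syntax later in this section):

```sql
-- The table becomes eligible for automatic deletion roughly 30 days after creation.
CREATE TABLE sensor_data (ts TIMESTAMP, val FLOAT) TTL 30;
-- The TTL of an existing table can be adjusted afterwards.
ALTER TABLE sensor_data TTL 60;
```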
## Create Subtables
...@@ -89,7 +90,7 @@ CREATE TABLE [IF NOT EXISTS] tb_name1 USING stb_name TAGS (tag_value1, ...) [IF
```sql
ALTER TABLE [db_name.]tb_name alter_table_clause

alter_table_clause: {
    alter_table_options
  | ADD COLUMN col_name column_type
...@@ -97,10 +98,10 @@ alter_table_clause: {
  | MODIFY COLUMN col_name column_type
  | RENAME COLUMN old_col_name new_col_name
}

alter_table_options:
    alter_table_option ...

alter_table_option: {
    TTL value
  | COMMENT 'string_value'
...@@ -110,6 +111,7 @@ alter_table_option: {
**Usage notes**

The following modifications can be performed on standard tables:

1. ADD COLUMN: add a column.
2. DROP COLUMN: drop a column.
3. MODIFY COLUMN: modify the column definition; if the data column is of a variable-length type, this command can only increase its width, not decrease it.
...@@ -143,15 +145,15 @@ ALTER TABLE tb_name RENAME COLUMN old_col_name new_col_name
```sql
ALTER TABLE [db_name.]tb_name alter_table_clause

alter_table_clause: {
    alter_table_options
  | SET TAG tag_name = new_tag_value
}

alter_table_options:
    alter_table_option ...

alter_table_option: {
    TTL value
  | COMMENT 'string_value'
...@@ -159,6 +161,7 @@ alter_table_option: {
```
**Usage notes**

1. A subtable's columns and tags, except for changing tag values, can only be modified through the supertable.
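For example, with hypothetical names where d1001 is a subtable of the supertable meters:

```sql
-- Column changes must be made through the supertable.
ALTER TABLE meters ADD COLUMN power FLOAT;
-- Only tag values can be changed directly on the subtable.
ALTER TABLE d1001 SET TAG location='California.SanFrancisco';
```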
### Change the Tag Value of a Subtable
...@@ -169,7 +172,7 @@ ALTER TABLE tb_name SET TAG tag_name=new_tag_value;
## Drop Tables
One or more standard tables or subtables can be dropped in a single SQL statement.
```sql
DROP TABLE [IF EXISTS] [db_name.]tb_name [, [IF EXISTS] [db_name.]tb_name] ...
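-- For example, the following drops two subtables in one statement (hypothetical names):
DROP TABLE IF EXISTS d1001, d1002;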
...@@ -179,7 +182,7 @@ DROP TABLE [IF EXISTS] [db_name.]tb_name [, [IF EXISTS] [db_name.]tb_name] ...
### Show All Tables
The following SQL statement lists all table names in the current database.
```sql
SHOW TABLES [LIKE tb_name_wildchar];
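-- For example, the following lists tables whose names start with "d100" (hypothetical pattern):
SHOW TABLES LIKE 'd100%';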
......
...@@ -1167,7 +1167,7 @@ SELECT stateDuration(field_name, oper, val, unit) FROM { tb_name | stb_name } [W
**Applicable parameter values**:
- oper : Can be one of `'LT'` (less than), `'GT'` (greater than), `'LE'` (less than or equal to), `'GE'` (greater than or equal to), `'NE'` (not equal to), or `'EQ'` (equal to). The value is case-insensitive but must be enclosed in quotes.
- val : Numeric types
- unit : The unit of the time interval. Options: 1b (nanoseconds), 1u (microseconds), 1a (milliseconds), 1s (seconds), 1m (minutes), 1h (hours), 1d (days), 1w (weeks). If omitted, the precision of the current database is used by default.
......
...@@ -32,7 +32,7 @@ taos> show databases;
Query OK, 2 rows in database (0.033802s)
```
Because the TDengine server running in a container uses the container's hostname to establish connections, it is relatively complex to access TDengine inside the container from the outside with the TDengine CLI or the various connectors (such as JDBC-JNI). So the approach above is the simplest way to access the TDengine service in the container and is suitable for simple scenarios. If, in more complex scenarios, you want to use the TDengine CLI or the various connectors to access the TDengine service in the container from other containers, please refer to the next section.
## Start TDengine on the Host Network
...@@ -75,7 +75,7 @@ docker run -d \
echo 127.0.0.1 tdengine |sudo tee -a /etc/hosts
```
Finally, the TDengine service can be accessed from the TDengine CLI or any connector using "tdengine" as the server address.
```shell
taos -h tdengine -P 6030
...@@ -354,7 +354,7 @@ test-docker_td-2_1 /tini -- /usr/bin/entrypoi ... Up
test-docker_td-3_1 /tini -- /usr/bin/entrypoi ... Up
```
4. Check dnodes with the TDengine CLI
```shell
......
...@@ -7,7 +7,7 @@ description: How to export data from TDengine
## Export CSV Files by Table
To export the data of a table or an STable, run the following in the TDengine CLI:
```sql
select * from <tb_name> >> data.csv;
......
...@@ -116,7 +116,7 @@ charset UTF-8
### 9. Table names are not displayed in full
Because the TDengine CLI has a limited display width in the terminal, long table names may not be displayed in full, and operating on such a truncated table name will cause a "Table does not exist" error. This can be worked around by changing the maxBinaryDisplayWidth setting in the taos.cfg file, by entering the command set max_binary_display_width 100 directly, or by adding \G at the end of the command to change how the result is displayed.
### 10. 如何进行数据迁移? ### 10. 如何进行数据迁移?
......
...@@ -129,7 +129,7 @@ https://www.taosdata.com/cn/all-downloads/
192.168.236.136 td01
```
After the configuration is complete, use the TDengine CLI on the command line to connect to the server:
```shell
C:\TDengine>taos -h td01
......
...@@ -35,7 +35,7 @@ Python 2.7.18
Download from: https://www.taosdata.com/cn/all-downloads/ and choose a suitable windows-client (the client version should match the server version as closely as possible)
Use the client's TDengine CLI to connect to the server:
```shell
>taos -h node5
......
...@@ -256,8 +256,9 @@ static FORCE_INLINE int32_t udfColDataSet(SUdfColumn* pColumn, uint32_t currentR
typedef int32_t (*TUdfScalarProcFunc)(SUdfDataBlock* block, SUdfColumn *resultCol);
typedef int32_t (*TUdfAggStartFunc)(SUdfInterBuf *buf);
typedef int32_t (*TUdfAggProcessFunc)(SUdfDataBlock *block, SUdfInterBuf *interBuf, SUdfInterBuf *newInterBuf);
typedef int32_t (*TUdfAggMergeFunc)(SUdfInterBuf *inputBuf1, SUdfInterBuf *inputBuf2, SUdfInterBuf *outputBuf);
typedef int32_t (*TUdfAggFinishFunc)(SUdfInterBuf *buf, SUdfInterBuf *resultData);
#ifdef __cplusplus
}
......
...@@ -79,6 +79,7 @@ extern "C" {
#include <wchar.h>
#include <wctype.h>

#include "taoserror.h"
#include "osAtomic.h"
#include "osDef.h"
#include "osDir.h"
......
def sync_source(branch_name) {
sh '''
hostname
env
echo ''' + branch_name + '''
'''
sh '''
cd ${TDINTERNAL_ROOT_DIR}
git reset --hard
git fetch || git fetch
git checkout ''' + branch_name + ''' -f
git branch
git pull || git pull
git log | head -n 20
cd ${TDENGINE_ROOT_DIR}
git reset --hard
git fetch || git fetch
git checkout ''' + branch_name + ''' -f
git branch
git pull || git pull
git log | head -n 20
git submodule update --init --recursive
'''
return 1
}
def run_test() {
sh '''
cd ${TDENGINE_ROOT_DIR}/packaging
'''
sh '''
export LD_LIBRARY_PATH=${TDINTERNAL_ROOT_DIR}/debug/build/lib
./fulltest.sh
'''
sh '''
cd ${TDENGINE_ROOT_DIR}/tests
./test-all.sh b1fq
'''
}
def build_run() {
sync_source("${BRANCH_NAME}")
}
pipeline {
agent none
parameters {
string (
name:'version',
defaultValue:'3.0.0.1',
            description: 'release version number, e.g. 3.0.0.1 or 3.0.0.'
)
string (
name:'baseVersion',
defaultValue:'3.0.0.1',
            description: 'The baseVersion number is generally not modified. Currently it is 3.0.0.1'
)
}
environment{
WORK_DIR = '/var/lib/jenkins/workspace'
TDINTERNAL_ROOT_DIR = '/var/lib/jenkins/workspace/TDinternal'
TDENGINE_ROOT_DIR = '/var/lib/jenkins/workspace/TDinternal/community'
BRANCH_NAME = '3.0'
TD_SERVER_TAR = "TDengine-server-${version}-Linux-x64.tar.gz"
BASE_TD_SERVER_TAR = "TDengine-server-${baseVersion}-arm64-x64.tar.gz"
TD_SERVER_ARM_TAR = "TDengine-server-${version}-Linux-arm64.tar.gz"
BASE_TD_SERVER_ARM_TAR = "TDengine-server-${baseVersion}-Linux-arm64.tar.gz"
TD_SERVER_LITE_TAR = "TDengine-server-${version}-Linux-x64-Lite.tar.gz"
BASE_TD_SERVER_LITE_TAR = "TDengine-server-${baseVersion}-Linux-x64-Lite.tar.gz"
TD_CLIENT_TAR = "TDengine-client-${version}-Linux-x64.tar.gz"
BASE_TD_CLIENT_TAR = "TDengine-client-${baseVersion}-arm64-x64.tar.gz"
TD_CLIENT_ARM_TAR = "TDengine-client-${version}-Linux-arm64.tar.gz"
BASE_TD_CLIENT_ARM_TAR = "TDengine-client-${baseVersion}-Linux-arm64.tar.gz"
TD_CLIENT_LITE_TAR = "TDengine-client-${version}-Linux-x64-Lite.tar.gz"
BASE_TD_CLIENT_LITE_TAR = "TDengine-client-${baseVersion}-Linux-x64-Lite.tar.gz"
TD_SERVER_RPM = "TDengine-server-${version}-Linux-x64.rpm"
TD_SERVER_DEB = "TDengine-server-${version}-Linux-x64.deb"
TD_SERVER_EXE = "TDengine-server-${version}-Windows-x64.exe"
TD_CLIENT_EXE = "TDengine-client-${version}-Windows-x64.exe"
}
stages {
stage ('RUN') {
        stage('get check package scripts'){
agent{label 'ubuntu18'}
steps {
catchError(buildResult: 'FAILURE', stageResult: 'FAILURE') {
script{
sync_source("${BRANCH_NAME}")
}
}
}
}
parallel {
stage('ubuntu16') {
agent{label " ubuntu16 "}
steps {
timeout(time: 3, unit: 'MINUTES'){
sh '''
cd ${TDENGINE_ROOT_DIR}/packaging
bash testpackage.sh ${TD_SERVER_TAR} ${version} ${BASE_TD_SERVER_TAR} ${baseVersion} server
python3 checkPackageRuning.py
rmtaos
'''
sh '''
cd ${TDENGINE_ROOT_DIR}/packaging
bash testpackage.sh ${TD_SERVER_LITE_TAR} ${version} ${BASE_TD_SERVER_LITE_TAR} ${baseVersion} server
python3 checkPackageRuning.py
'''
sh '''
cd ${TDENGINE_ROOT_DIR}/packaging
bash testpackage.sh ${TD_SERVER_DEB} ${version} ${BASE_TD_SERVER_TAR} ${baseVersion} server
python3 checkPackageRuning.py
'''
}
}
}
stage('ubuntu18') {
agent{label " ubuntu18 "}
steps {
timeout(time: 3, unit: 'MINUTES'){
sh '''
cd ${TDENGINE_ROOT_DIR}/packaging
bash testpackage.sh ${TD_SERVER_TAR} ${version} ${BASE_TD_SERVER_TAR} ${baseVersion} server
python3 checkPackageRuning.py
rmtaos
'''
sh '''
cd ${TDENGINE_ROOT_DIR}/packaging
bash testpackage.sh ${TD_SERVER_LITE_TAR} ${version} ${BASE_TD_SERVER_LITE_TAR} ${baseVersion} server
python3 checkPackageRuning.py
'''
sh '''
cd ${TDENGINE_ROOT_DIR}/packaging
bash testpackage.sh ${TD_SERVER_DEB} ${version} ${BASE_TD_SERVER_DEB} ${baseVersion} server
python3 checkPackageRuning.py
'''
}
}
}
stage('centos7') {
agent{label " centos7_9 "}
steps {
timeout(time: 240, unit: 'MINUTES'){
sh '''
cd ${TDENGINE_ROOT_DIR}/packaging
bash testpackage.sh ${TD_SERVER_TAR} ${version} ${BASE_TD_SERVER_TAR} ${baseVersion} server
python3 checkPackageRuning.py
rmtaos
'''
sh '''
cd ${TDENGINE_ROOT_DIR}/packaging
bash testpackage.sh ${TD_SERVER_LITE_TAR} ${version} ${BASE_TD_SERVER_LITE_TAR} ${baseVersion} server
python3 checkPackageRuning.py
'''
sh '''
cd ${TDENGINE_ROOT_DIR}/packaging
bash testpackage.sh ${TD_SERVER_RPM} ${version} ${BASE_TD_SERVER_TAR} ${baseVersion} server
python3 checkPackageRuning.py
'''
}
}
}
stage('centos8') {
agent{label " centos8_3 "}
steps {
timeout(time: 240, unit: 'MINUTES'){
sh '''
cd ${TDENGINE_ROOT_DIR}/packaging
bash testpackage.sh ${TD_SERVER_TAR} ${version} ${BASE_TD_SERVER_TAR} ${baseVersion} server
python3 checkPackageRuning.py
rmtaos
'''
sh '''
cd ${TDENGINE_ROOT_DIR}/packaging
bash testpackage.sh ${TD_SERVER_LITE_TAR} ${version} ${BASE_TD_SERVER_LITE_TAR} ${baseVersion} server
python3 checkPackageRuning.py
'''
sh '''
cd ${TDENGINE_ROOT_DIR}/packaging
bash testpackage.sh ${TD_SERVER_RPM} ${version} ${BASE_TD_SERVER_TAR} ${baseVersion} server
python3 checkPackageRuning.py
'''
}
}
}
}
}
}
}
...@@ -38,7 +38,7 @@
# The interval of dnode reporting status to mnode
# statusInterval 1

# The interval for TDengine CLI to send heartbeat to mnode
# shellActivityTimer 3

# The minimum sliding window time, milli-second
......
#!/usr/bin/python
###################################################################
# Copyright (c) 2016 by TAOS Technologies, Inc.
# All rights reserved.
#
# This file is proprietary and confidential to TAOS Technologies.
# No part of this file may be reproduced, stored, transmitted,
# disclosed or used in any form or by any means other than as
# expressly provided by the written permission from Jianhui Tao
#
###################################################################
# install pip
# pip install src/connector/python/
# -*- coding: utf-8 -*-
import sys, os
import getopt
import subprocess
# from this import d
import time
# install taospy
out = subprocess.getoutput("pip3 show taospy|grep Version| awk -F ':' '{print $2}' ")
print(out)
if out == "":
    os.system("pip3 install git+https://github.com/taosdata/taos-connector-python.git")
    print("installed taos python connector")
# prepare a clean data directory and start taosd
os.system("rm -rf /var/lib/taos/*")
os.system("systemctl restart taosd ")
# wait a moment, at least 5 seconds, for taosd to finish starting
time.sleep(5)
# prepare data by taosBenchmark
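# -t 100 creates 100 child tables and -n 100 writes 100 rows per table, so test.meters should contain 10000 rows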
os.system("taosBenchmark -y -n 100 -t 100")
import taos
conn = taos.connect(host="localhost",
user="root",
password="taosdata",
database="test",
port=6030,
config="/etc/taos", # for windows the default value is C:\TDengine\cfg
timezone="Asia/Shanghai") # default your host's timezone
server_version = conn.server_info
print("server_version", server_version)
client_version = conn.client_info
print("client_version", client_version) # 3.0.0.0
# Execute a SQL statement and get its result set; useful for SELECT statements
result: taos.TaosResult = conn.query("SELECT count(*) from test.meters")
data = result.fetch_all()
if data[0][0] != 10000:
    print("taosBenchmark did not work as expected")
    sys.exit(1)
else:
    print("taosBenchmark works as expected")
# test that taosdump can dump data out and restore it
os.system("taosdump --version")
os.system("mkdir -p /tmp/dumpdata")
os.system("rm -rf /tmp/dumpdata/*")
# dump data out
print("taosdump dump out data")
os.system("taosdump -o /tmp/dumpdata -D test -y ")
# drop database test
print("drop database test")
os.system(" taos -s ' drop database test ;' ")
# load the dumped data back in
print("taosdump load data back in")
os.system("taosdump -i /tmp/dumpdata -y ")
result = conn.query("SELECT count(*) from test.meters")
data = result.fetch_all()
if data[0][0] != 10000:
    print("taosdump did not work as expected")
    sys.exit(1)
else:
    print("taosdump works as expected")
conn.close()
\ No newline at end of file
#!/bin/bash
if [ $1 -eq "abort-upgrade" ]; then
if [ "$1"x = "abort-upgrade"x ]; then
exit 0
fi
......
...@@ -47,7 +47,7 @@ taos> show databases;
Query OK, 1 row(s) in set (0.002843s)
```
Since TDengine uses the container hostname to establish connections, it's a bit more complex to use the TDengine CLI and native connectors (such as JDBC-JNI) with a TDengine container instance. This is the recommended way to expose ports and use TDengine with Docker in simple cases. If you want to use the TDengine CLI or taosc/connectors smoothly outside the `tdengine` container, see the following use cases that match your needs.
### Start with host network
...@@ -87,7 +87,7 @@ docker run -d \
This command starts a docker container with TDengine server running and maps the container's TCP ports 6030 to 6049 to the host's TCP ports 6030 to 6049, and the container's UDP ports 6030-6039 to the host's UDP ports 6030-6039. If the host is already running TDengine server and occupying the same port(s), you need to map the container's ports to a different unused port segment. (Please see TDengine 2.0 Port Description for details.) In order to support TDengine clients accessing TDengine server services, both TCP and UDP ports need to be exposed by default (unless `rpcForceTcp` is set to `1`).
If you want to use the TDengine CLI or native connectors ([JDBC-JNI](https://www.taosdata.com/cn/documentation/connector/java), or [driver-go](https://github.com/taosdata/driver-go)), you need to make sure the `TAOS_FQDN` is resolvable at `/etc/hosts` or with a custom DNS service.
If you set `TAOS_FQDN` to the host's hostname, it works the same as using the `hosts` network in the previous use case. Otherwise, as in `-e TAOS_FQDN=tdengine`, you can add the hostname record `tdengine` to `/etc/hosts` (use `127.0.0.1` here in the host path; if the TDengine client/application runs on other hosts, set the right host IP, e.g. `192.168.10.1` (check the real IP on the host with `hostname -i` or `ip route list default`), to make the TDengine endpoint resolvable):
...@@ -391,7 +391,7 @@ test_td-1_1 /usr/bin/entrypoint.sh taosd Up 6030/tcp, 6031/tcp,
test_td-2_1 /usr/bin/entrypoint.sh taosd Up 6030/tcp, 6031/tcp, 6032/tcp, 6033/tcp, 6034/tcp, 6035/tcp, 6036/tcp, 6037/tcp, 6038/tcp, 6039/tcp, 6040/tcp, 6041/tcp, 6042/tcp
```
Check dnodes with TDengine CLI:
```bash
$ docker-compose exec td-1 taos -s "show dnodes"
......
#!/bin/sh
# function installPkgAndCheckFile{
echo "Download package"
packgeName=$1
version=$2
originPackageName=$3
originversion=$4
testFile=$5
subFile="taos.tar.gz"
if [ ${testFile} = "server" ];then
tdPath="TDengine-server-${version}"
originTdpPath="TDengine-server-${originversion}"
installCmd="install.sh"
elif [ ${testFile} = "client" ];then
tdPath="TDengine-client-${version}"
originTdpPath="TDengine-client-${originversion}"
installCmd="install_client.sh"
elif [ ${testFile} = "tools" ];then
tdPath="taosTools-${version}"
originTdpPath="taosTools-${originversion}"
installCmd="install-taostools.sh"
fi
echo "Uninstall all components of TDeingne"
if command -v rmtaos ;then
echo "uninstall all components of TDeingne:rmtaos"
echo " "
else
echo "os doesn't include TDengine "
fi
if command -v rmtaostools ;then
echo "uninstall all components of TDeingne:rmtaostools"
echo " "
else
echo "os doesn't include rmtaostools "
fi
echo "new workroom path"
installPath="/usr/local/src/packageTest"
oriInstallPath="/usr/local/src/packageTest/3.1"
if [ ! -d ${installPath} ] ;then
mkdir -p ${installPath}
else
echo "${installPath} already exists"
fi
if [ ! -d ${oriInstallPath} ] ;then
mkdir -p ${oriInstallPath}
else
echo "${oriInstallPath} already exists"
fi
echo "decompress installPackage"
cd ${installPath}
wget https://www.taosdata.com/assets-download/3.0/${packgeName}
cd ${oriInstallPath}
wget https://www.taosdata.com/assets-download/3.0/${originPackageName}
if [[ ${packgeName} =~ "deb" ]];then
echo "dpkg ${packgeName}" && dpkg -i ${packgeName}
elif [[ ${packgeName} =~ "rpm" ]];then
echo "rpm ${packgeName}" && rpm -ivh ${packgeName}
elif [[ ${packgeName} =~ "tar" ]];then
echo "tar ${packgeName}" && tar -xvf ${packgeName}
cd ${oriInstallPath}
echo "tar -xvf ${originPackageName}" && tar -xvf ${originPackageName}
cd ${installPath}
echo "tar -xvf ${packgeName}" && tar -xvf ${packgeName}
if [ ${testFile} != "tools" ] ;then
cd ${installPath}/${tdPath} && tar vxf ${subFile}
cd ${oriInstallPath}/${originTdpPath} && tar vxf ${subFile}
fi
echo "check installPackage File"
cd ${installPath}
tree ${oriInstallPath}/${originTdpPath} > ${originPackageName}_checkfile
tree ${installPath}/${tdPath} > ${packgeName}_checkfile
diff ${packgeName}_checkfile ${originPackageName}_checkfile > ${installPath}/diffFile.log
diffNumbers=`cat ${installPath}/diffFile.log |wc -l `
if [ ${diffNumbers} != 0 ];then
echo "The number and names of files have changed from the previous installation package"
echo `cat ${installPath}/diffFile.log`
exit 1
fi
cd ${installPath}/${tdPath}
if [ ${testFile} = "server" ];then
bash ${installCmd} -e no
else
bash ${installCmd}
fi
fi
# }
# installPkgAndCheckFile
...@@ -381,8 +381,7 @@ function install_header() {
${install_main_dir}/include ||
${csudo}cp -f ${source_dir}/include/client/taos.h ${source_dir}/include/common/taosdef.h ${source_dir}/include/util/taoserror.h ${source_dir}/include/libs/function/taosudf.h \
${install_main_2_dir}/include &&
${csudo}chmod 644 ${install_main_dir}/include/* ||:
${csudo}chmod 644 ${install_main_dir}/include/* || ${csudo}chmod 644 ${install_main_2_dir}/include/*
${csudo}chmod 644 ${install_main_2_dir}/include/*
fi
}
......
...@@ -1706,8 +1706,8 @@ static char* formatTimestamp(char* buf, int64_t val, int precision) {
}
void blockDebugShowDataBlock(SSDataBlock* pBlock, const char* flag) {
SArray* dataBlocks = taosArrayInit(1, sizeof(SSDataBlock));
SArray* dataBlocks = taosArrayInit(1, sizeof(SSDataBlock*));
taosArrayPush(dataBlocks, pBlock);
taosArrayPush(dataBlocks, &pBlock);
blockDebugShowDataBlocks(dataBlocks, flag);
taosArrayDestroy(dataBlocks);
}
......
...@@ -476,7 +476,7 @@ static int32_t getNextRowFromFSLast(void *iter, TSDBROW **ppRow) {
if (code) goto _err;
if (!state->aBlockL) {
state->aBlockL = taosArrayInit(0, sizeof(SBlockIdx));
state->aBlockL = taosArrayInit(0, sizeof(SBlockL));
} else {
taosArrayClear(state->aBlockL);
}
......
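Both hunks above fix the element size handed to taosArrayInit: the debug helper stores a pointer to the data block, so the array must be created with sizeof(SSDataBlock*) and receive the address of the pointer, and the FS-last iterator collects SBlockL entries rather than SBlockIdx. A minimal sketch of why the element size matters in a memcpy-based generic array follows (a plain-C stand-in, not the actual SArray implementation):

```c
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

/* A minimal memcpy-based dynamic array, sketching how SArray-style
 * containers behave; illustrative only, not the TDengine API. */
typedef struct {
  void  *data;
  size_t elemSize;
  size_t size;
  size_t capacity;
} Array;

static Array *arrayInit(size_t capacity, size_t elemSize) {
  Array *a = malloc(sizeof(Array));
  a->data = malloc(capacity * elemSize);
  a->elemSize = elemSize;
  a->size = 0;
  a->capacity = capacity;
  return a;
}

/* Copies exactly elemSize bytes from pData: for an array of pointers,
 * pData must be the address of the pointer, not the pointee. */
static void arrayPush(Array *a, const void *pData) {
  if (a->size == a->capacity) {
    a->capacity *= 2;
    a->data = realloc(a->data, a->capacity * a->elemSize);
  }
  memcpy((char *)a->data + a->size * a->elemSize, pData, a->elemSize);
  a->size++;
}

typedef struct { int rows; } Block;

int main(void) {
  Block blk = {.rows = 42};
  Block *pBlk = &blk;

  /* Wrong: arrayInit(1, sizeof(Block)) plus a push of pBlk would copy
   * the struct's first bytes as if they were a pointer. Right: */
  Array *blocks = arrayInit(1, sizeof(Block *));
  arrayPush(blocks, &pBlk);  /* address of the pointer */

  Block *stored = *(Block **)blocks->data;
  printf("rows = %d\n", stored->rows);  /* prints 42 */

  free(blocks->data);
  free(blocks);
  return 0;
}
```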
...@@ -33,11 +33,16 @@ typedef struct SPullWindowInfo {
uint64_t groupId;
} SPullWindowInfo;
typedef struct SOpenWindowInfo {
SResultRowPosition pos;
uint64_t groupId;
} SOpenWindowInfo;
static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator);
static int64_t* extractTsCol(SSDataBlock* pBlock, const SIntervalAggOperatorInfo* pInfo);
static SResultRowPosition addToOpenWindowList(SResultRowInfo* pResultRowInfo, const SResultRow* pResult);
static SResultRowPosition addToOpenWindowList(SResultRowInfo* pResultRowInfo, const SResultRow* pResult, uint64_t groupId);
static void doCloseWindow(SResultRowInfo* pResultRowInfo, const SIntervalAggOperatorInfo* pInfo, SResultRow* pResult);
///*
...@@ -598,14 +603,14 @@ static void doInterpUnclosedTimeWindow(SOperatorInfo* pOperatorInfo, int32_t num
int32_t startPos = 0;
int32_t numOfOutput = pSup->numOfExprs;
uint64_t groupId = pBlock->info.groupId;
SResultRow* pResult = NULL;
while (1) {
SListNode* pn = tdListGetHead(pResultRowInfo->openWindow);
SResultRowPosition* p1 = (SResultRowPosition*)pn->data;
SOpenWindowInfo* pOpenWin = (SOpenWindowInfo *)pn->data;
uint64_t groupId = pOpenWin->groupId;
SResultRowPosition* p1 = &pOpenWin->pos;
if (p->pageId == p1->pageId && p->offset == p1->offset) {
break;
}
...@@ -631,12 +636,15 @@ static void doInterpUnclosedTimeWindow(SOperatorInfo* pOperatorInfo, int32_t num
SGroupKeys* pTsKey = taosArrayGet(pInfo->pPrevValues, 0);
int64_t prevTs = *(int64_t*)pTsKey->pData;
doTimeWindowInterpolation(pInfo->pPrevValues, pBlock->pDataBlock, prevTs, -1, tsCols[startPos], startPos, w.ekey,
RESULT_ROW_END_INTERP, pSup);
if (groupId == pBlock->info.groupId) {
doTimeWindowInterpolation(pInfo->pPrevValues, pBlock->pDataBlock, prevTs, -1, tsCols[startPos], startPos, w.ekey,
RESULT_ROW_END_INTERP, pSup);
}
setResultRowInterpo(pResult, RESULT_ROW_END_INTERP);
setNotInterpoWindowKey(pSup->pCtx, numOfExprs, RESULT_ROW_START_INTERP);
updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &w, true);
doApplyFunctions(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, startPos, 0, pBlock->info.rows,
numOfExprs);
...@@ -965,7 +973,7 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul
// prev time window not interpolation yet.
if (pInfo->timeWindowInterpo) {
SResultRowPosition pos = addToOpenWindowList(pResultRowInfo, pResult);
SResultRowPosition pos = addToOpenWindowList(pResultRowInfo, pResult, tableGroupId);
doInterpUnclosedTimeWindow(pOperatorInfo, numOfOutput, pResultRowInfo, pBlock, scanFlag, tsCols, &pos);
// restore current time window
...@@ -1017,10 +1025,18 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul
ekey = ascScan ? nextWin.ekey : nextWin.skey;
forwardRows =
getNumOfRowsInTimeWindow(&pBlock->info, tsCols, startPos, ekey, binarySearchForKey, NULL, pInfo->inputOrder);
// window start(end) key interpolation
doWindowBorderInterpolation(pInfo, pBlock, pResult, &nextWin, startPos, forwardRows, pSup);
//TODO: add to open window? how to close the open windows after input blocks exhausted?
#if 0
if ((ascScan && ekey <= pBlock->info.window.ekey) ||
(!ascScan && ekey >= pBlock->info.window.skey)) {
// window start(end) key interpolation
doWindowBorderInterpolation(pInfo, pBlock, pResult, &nextWin, startPos, forwardRows, pSup);
} else if (pInfo->timeWindowInterpo) {
addToOpenWindowList(pResultRowInfo, pResult, tableGroupId);
}
#endif
updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &nextWin, true);
doApplyFunctions(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, startPos, forwardRows, pBlock->info.rows,
numOfOutput);
...@@ -1040,20 +1056,23 @@ void doCloseWindow(SResultRowInfo* pResultRowInfo, const SIntervalAggOperatorInf
}
}
SResultRowPosition addToOpenWindowList(SResultRowInfo* pResultRowInfo, const SResultRow* pResult) {
SResultRowPosition addToOpenWindowList(SResultRowInfo* pResultRowInfo, const SResultRow* pResult, uint64_t groupId) {
SResultRowPosition pos = (SResultRowPosition){.pageId = pResult->pageId, .offset = pResult->offset};
SOpenWindowInfo openWin = {0};
openWin.pos.pageId = pResult->pageId;
openWin.pos.offset = pResult->offset;
openWin.groupId = groupId;
SListNode* pn = tdListGetTail(pResultRowInfo->openWindow);
if (pn == NULL) {
tdListAppend(pResultRowInfo->openWindow, &pos);
return pos;
tdListAppend(pResultRowInfo->openWindow, &openWin);
return openWin.pos;
}
SResultRowPosition* px = (SResultRowPosition*)pn->data;
if (px->pageId != pos.pageId || px->offset != pos.offset) {
tdListAppend(pResultRowInfo->openWindow, &pos);
}
return pos;
SOpenWindowInfo * px = (SOpenWindowInfo *)pn->data;
if (px->pos.pageId != openWin.pos.pageId || px->pos.offset != openWin.pos.offset || px->groupId != openWin.groupId) {
tdListAppend(pResultRowInfo->openWindow, &openWin);
}
return openWin.pos;
}
int64_t* extractTsCol(SSDataBlock* pBlock, const SIntervalAggOperatorInfo* pInfo) {
...@@ -1887,7 +1906,7 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo*
pInfo->timeWindowInterpo = timeWindowinterpNeeded(pSup->pCtx, numOfCols, pInfo);
if (pInfo->timeWindowInterpo) {
pInfo->binfo.resultRowInfo.openWindow = tdListNew(sizeof(SResultRowPosition));
pInfo->binfo.resultRowInfo.openWindow = tdListNew(sizeof(SOpenWindowInfo));
if (pInfo->binfo.resultRowInfo.openWindow == NULL) {
goto _error;
}
...@@ -5160,7 +5179,7 @@ SOperatorInfo* createMergeAlignedIntervalOperatorInfo(SOperatorInfo* downstream,
iaInfo->timeWindowInterpo = timeWindowinterpNeeded(pSup->pCtx, num, iaInfo);
if (iaInfo->timeWindowInterpo) {
iaInfo->binfo.resultRowInfo.openWindow = tdListNew(sizeof(SResultRowPosition));
iaInfo->binfo.resultRowInfo.openWindow = tdListNew(sizeof(SOpenWindowInfo));
}
initResultRowInfo(&iaInfo->binfo.resultRowInfo);
...@@ -5295,7 +5314,7 @@ static void doMergeIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultRowInfo*
// prev time window not interpolation yet.
if (iaInfo->timeWindowInterpo) {
SResultRowPosition pos = addToOpenWindowList(pResultRowInfo, pResult);
SResultRowPosition pos = addToOpenWindowList(pResultRowInfo, pResult, tableGroupId);
doInterpUnclosedTimeWindow(pOperatorInfo, numOfOutput, pResultRowInfo, pBlock, scanFlag, tsCols, &pos);
// restore current time window
...@@ -5470,9 +5489,10 @@ SOperatorInfo* createMergeIntervalOperatorInfo(SOperatorInfo* downstream, SMerge
initBasicInfo(&pIntervalInfo->binfo, pResBlock);
initExecTimeWindowInfo(&pIntervalInfo->twAggSup.timeWindowData, &pIntervalInfo->win);
pIntervalInfo->timeWindowInterpo = timeWindowinterpNeeded(pExprSupp->pCtx, num, pIntervalInfo);
if (pIntervalInfo->timeWindowInterpo) {
pIntervalInfo->binfo.resultRowInfo.openWindow = tdListNew(sizeof(SResultRowPosition));
pIntervalInfo->binfo.resultRowInfo.openWindow = tdListNew(sizeof(SOpenWindowInfo));
if (pIntervalInfo->binfo.resultRowInfo.openWindow == NULL) {
goto _error;
}
......
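The thread running through this file: the open-window list used for time-window interpolation previously stored bare SResultRowPosition entries, so two windows with the same page/offset but different group ids could be conflated; SOpenWindowInfo adds the group id to the key, and the interpolation step now skips blocks from other groups. A self-contained sketch of the append-if-tail-differs logic, with simplified stand-in types rather than the engine's tdList API:

```c
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>

/* Simplified stand-ins for the engine types. */
typedef struct { int32_t pageId; int32_t offset; } Pos;
typedef struct { Pos pos; uint64_t groupId; } OpenWin;

typedef struct Node { OpenWin win; struct Node *next; } Node;
typedef struct { Node *head, *tail; } List;

/* Append only when the tail entry differs, mirroring how
 * addToOpenWindowList avoids duplicate consecutive entries. */
static Pos addToOpenList(List *l, Pos pos, uint64_t groupId) {
  OpenWin w = {.pos = pos, .groupId = groupId};
  if (l->tail == NULL ||
      l->tail->win.pos.pageId != w.pos.pageId ||
      l->tail->win.pos.offset != w.pos.offset ||
      l->tail->win.groupId != w.groupId) {
    Node *n = calloc(1, sizeof(Node));
    n->win = w;
    if (l->tail) l->tail->next = n; else l->head = n;
    l->tail = n;
  }
  return w.pos;
}

int main(void) {
  List l = {0};
  Pos p = {.pageId = 1, .offset = 64};
  addToOpenList(&l, p, 7);
  addToOpenList(&l, p, 7);   /* same window and group: not re-added */
  addToOpenList(&l, p, 8);   /* same position, new group: added */
  int n = 0;
  for (Node *it = l.head; it; it = it->next) n++;
  printf("open windows: %d\n", n);  /* prints 2 */
  for (Node *it = l.head; it;) { Node *next = it->next; free(it); it = next; }
  return 0;
}
```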
...@@ -303,7 +303,7 @@ static int32_t translateInOutStr(SFunctionNode* pFunc, char* pErrBuf, int32_t le
}
SExprNode* pPara1 = (SExprNode*)nodesListGetNode(pFunc->pParameterList, 0);
if (!IS_VAR_DATA_TYPE(pPara1->resType.type)) {
if (!IS_STR_DATA_TYPE(pPara1->resType.type)) {
return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName);
}
...@@ -317,7 +317,7 @@ static int32_t translateTrimStr(SFunctionNode* pFunc, char* pErrBuf, int32_t len
}
SExprNode* pPara1 = (SExprNode*)nodesListGetNode(pFunc->pParameterList, 0);
if (!IS_VAR_DATA_TYPE(pPara1->resType.type)) {
if (!IS_STR_DATA_TYPE(pPara1->resType.type)) {
return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName);
}
...@@ -546,7 +546,7 @@ static int32_t translateApercentile(SFunctionNode* pFunc, char* pErrBuf, int32_t
// param2
if (3 == numOfParams) {
uint8_t para3Type = ((SExprNode*)nodesListGetNode(pFunc->pParameterList, 2))->resType.type;
if (!IS_VAR_DATA_TYPE(para3Type)) {
if (!IS_STR_DATA_TYPE(para3Type)) {
return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName);
}
...@@ -593,7 +593,7 @@ static int32_t translateApercentileImpl(SFunctionNode* pFunc, char* pErrBuf, int
// param2
if (3 == numOfParams) {
uint8_t para3Type = ((SExprNode*)nodesListGetNode(pFunc->pParameterList, 2))->resType.type;
if (!IS_VAR_DATA_TYPE(para3Type)) {
if (!IS_STR_DATA_TYPE(para3Type)) {
return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName);
}
...@@ -1388,7 +1388,7 @@ static int32_t translateSample(SFunctionNode* pFunc, char* pErrBuf, int32_t len)
}
// set result type
if (IS_VAR_DATA_TYPE(colType)) {
if (IS_STR_DATA_TYPE(colType)) {
pFunc->node.resType = (SDataType){.bytes = pCol->resType.bytes, .type = colType};
} else {
pFunc->node.resType = (SDataType){.bytes = tDataTypes[colType].bytes, .type = colType};
...@@ -1431,7 +1431,7 @@ static int32_t translateTail(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
}
// set result type
if (IS_VAR_DATA_TYPE(colType)) {
if (IS_STR_DATA_TYPE(colType)) {
pFunc->node.resType = (SDataType){.bytes = pCol->resType.bytes, .type = colType};
} else {
pFunc->node.resType = (SDataType){.bytes = tDataTypes[colType].bytes, .type = colType};
...@@ -1514,7 +1514,7 @@ static int32_t translateInterp(SFunctionNode* pFunc, char* pErrBuf, int32_t len)
for (int32_t i = 1; i < 3; ++i) {
nodeType = nodeType(nodesListGetNode(pFunc->pParameterList, i));
paraType = ((SExprNode*)nodesListGetNode(pFunc->pParameterList, i))->resType.type;
if (!IS_VAR_DATA_TYPE(paraType) || QUERY_NODE_VALUE != nodeType) {
if (!IS_STR_DATA_TYPE(paraType) || QUERY_NODE_VALUE != nodeType) {
return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName);
}
...@@ -1682,7 +1682,7 @@ static int32_t translateLength(SFunctionNode* pFunc, char* pErrBuf, int32_t len)
return invaildFuncParaNumErrMsg(pErrBuf, len, pFunc->functionName);
}
if (!IS_VAR_DATA_TYPE(((SExprNode*)nodesListGetNode(pFunc->pParameterList, 0))->resType.type)) {
if (!IS_STR_DATA_TYPE(((SExprNode*)nodesListGetNode(pFunc->pParameterList, 0))->resType.type)) {
return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName);
}
...@@ -1714,7 +1714,7 @@ static int32_t translateConcatImpl(SFunctionNode* pFunc, char* pErrBuf, int32_t
for (int32_t i = 0; i < numOfParams; ++i) {
SNode* pPara = nodesListGetNode(pFunc->pParameterList, i);
uint8_t paraType = ((SExprNode*)pPara)->resType.type;
if (!IS_VAR_DATA_TYPE(paraType) && !IS_NULL_TYPE(paraType)) {
if (!IS_STR_DATA_TYPE(paraType) && !IS_NULL_TYPE(paraType)) {
return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName);
}
if (TSDB_DATA_TYPE_NCHAR == paraType) {
...@@ -1770,7 +1770,7 @@ static int32_t translateSubstr(SFunctionNode* pFunc, char* pErrBuf, int32_t len)
uint8_t para0Type = pPara0->resType.type;
uint8_t para1Type = pPara1->resType.type;
if (!IS_VAR_DATA_TYPE(para0Type) || !IS_INTEGER_TYPE(para1Type)) {
if (!IS_STR_DATA_TYPE(para0Type) || !IS_INTEGER_TYPE(para1Type)) {
return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName);
}
...@@ -1802,7 +1802,7 @@ static int32_t translateCast(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
uint8_t para2Type = pFunc->node.resType.type;
int32_t para2Bytes = pFunc->node.resType.bytes;
if (IS_VAR_DATA_TYPE(para2Type)) {
if (IS_STR_DATA_TYPE(para2Type)) {
para2Bytes -= VARSTR_HEADER_SIZE;
}
if (para2Bytes <= 0 || para2Bytes > 4096) {  // cast dst var type length limits to 4096 bytes
...@@ -1859,7 +1859,7 @@ static int32_t translateToUnixtimestamp(SFunctionNode* pFunc, char* pErrBuf, int
return invaildFuncParaNumErrMsg(pErrBuf, len, pFunc->functionName);
}
if (!IS_VAR_DATA_TYPE(((SExprNode*)nodesListGetNode(pFunc->pParameterList, 0))->resType.type)) {
if (!IS_STR_DATA_TYPE(((SExprNode*)nodesListGetNode(pFunc->pParameterList, 0))->resType.type)) {
return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName);
}
...@@ -1878,7 +1878,7 @@ static int32_t translateTimeTruncate(SFunctionNode* pFunc, char* pErrBuf, int32_
uint8_t para1Type = ((SExprNode*)nodesListGetNode(pFunc->pParameterList, 0))->resType.type;
uint8_t para2Type = ((SExprNode*)nodesListGetNode(pFunc->pParameterList, 1))->resType.type;
if ((!IS_VAR_DATA_TYPE(para1Type) && !IS_INTEGER_TYPE(para1Type) && TSDB_DATA_TYPE_TIMESTAMP != para1Type) ||
if ((!IS_STR_DATA_TYPE(para1Type) && !IS_INTEGER_TYPE(para1Type) && TSDB_DATA_TYPE_TIMESTAMP != para1Type) ||
!IS_INTEGER_TYPE(para2Type)) {
return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName);
}
...@@ -1911,7 +1911,7 @@ static int32_t translateTimeDiff(SFunctionNode* pFunc, char* pErrBuf, int32_t le
for (int32_t i = 0; i < 2; ++i) {
uint8_t paraType = ((SExprNode*)nodesListGetNode(pFunc->pParameterList, i))->resType.type;
if (!IS_VAR_DATA_TYPE(paraType) && !IS_INTEGER_TYPE(paraType) && TSDB_DATA_TYPE_TIMESTAMP != paraType) {
if (!IS_STR_DATA_TYPE(paraType) && !IS_INTEGER_TYPE(paraType) && TSDB_DATA_TYPE_TIMESTAMP != paraType) {
return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName);
}
}
......
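Every hunk in this file swaps IS_VAR_DATA_TYPE for IS_STR_DATA_TYPE. In TDengine 3.0 the variable-length family also covers JSON, so the narrower predicate rejects JSON where only VARCHAR/NCHAR arguments make sense. A hedged sketch of the distinction, with type codes and macro shapes assumed from the 3.0 headers (verify against tdef.h):

```c
#include <stdbool.h>
#include <stdio.h>

/* Assumed type codes, mirroring TSDB_DATA_TYPE_* in the 3.0 headers. */
enum { DT_VARCHAR = 8, DT_NCHAR = 10, DT_JSON = 15 };

/* IS_VAR_DATA_TYPE: any variable-length payload, JSON included. */
static bool isVarDataType(int t) { return t == DT_VARCHAR || t == DT_NCHAR || t == DT_JSON; }

/* IS_STR_DATA_TYPE: true strings only. */
static bool isStrDataType(int t) { return t == DT_VARCHAR || t == DT_NCHAR; }

int main(void) {
  /* A JSON column passes the old check but fails the new one,
   * which is exactly what functions like LENGTH or SUBSTR want. */
  printf("JSON: var=%d str=%d\n", isVarDataType(DT_JSON), isStrDataType(DT_JSON));
  return 0;
}
```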
...@@ -84,6 +84,7 @@ typedef struct SUdf {
TUdfAggStartFunc aggStartFunc;
TUdfAggProcessFunc aggProcFunc;
TUdfAggFinishFunc aggFinishFunc;
TUdfAggMergeFunc aggMergeFunc;
TUdfInitFunc initFunc;
TUdfDestroyFunc destroyFunc;
...@@ -271,6 +272,15 @@ void udfdProcessCallRequest(SUvUdfWork *uvUdf, SUdfRequest *request) {
break;
}
case TSDB_UDF_CALL_AGG_MERGE: {
SUdfInterBuf outBuf = {.buf = taosMemoryMalloc(udf->bufSize), .bufLen = udf->bufSize, .numOfResult = 0};
code = udf->aggMergeFunc(&call->interBuf, &call->interBuf2, &outBuf);
freeUdfInterBuf(&call->interBuf);
freeUdfInterBuf(&call->interBuf2);
subRsp->resultBuf = outBuf;
break;
}
case TSDB_UDF_CALL_AGG_FIN: {
SUdfInterBuf outBuf = {.buf = taosMemoryMalloc(udf->bufSize), .bufLen = udf->bufSize, .numOfResult = 0};
code = udf->aggFinishFunc(&call->interBuf, &outBuf);
...@@ -309,6 +319,10 @@ void udfdProcessCallRequest(SUvUdfWork *uvUdf, SUdfRequest *request) {
freeUdfInterBuf(&subRsp->resultBuf);
break;
}
case TSDB_UDF_CALL_AGG_MERGE: {
freeUdfInterBuf(&subRsp->resultBuf);
break;
}
case TSDB_UDF_CALL_AGG_FIN: {
freeUdfInterBuf(&subRsp->resultBuf);
break;
...@@ -560,7 +574,11 @@ int32_t udfdLoadUdf(char *udfName, SUdf *udf) {
strncpy(finishFuncName, processFuncName, strlen(processFuncName));
strncat(finishFuncName, finishSuffix, strlen(finishSuffix));
uv_dlsym(&udf->lib, finishFuncName, (void **)(&udf->aggFinishFunc));
// TODO: merge
char mergeFuncName[TSDB_FUNC_NAME_LEN + 6] = {0};
char *mergeSuffix = "_merge";
strncpy(mergeFuncName, processFuncName, strlen(processFuncName));
strncat(mergeFuncName, mergeSuffix, strlen(mergeSuffix));
uv_dlsym(&udf->lib, mergeFuncName, (void **)(&udf->aggMergeFunc));
}
return 0;
}
......
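Loading the optional _merge entry point mirrors the existing _init/_finish lookups: build the symbol name from the UDF's process-function name plus a suffix, then resolve it from the shared object. (Note the small fix applied above: the merge lookup must compose into mergeFuncName, not reuse the finish buffer, or it would resolve a name like "<udf>_finish_merge".) A hedged sketch using plain dlfcn; udfd itself goes through libuv's uv_dlsym, and the plugin path and names below are hypothetical:

```c
#include <dlfcn.h>
#include <stdio.h>

/* Resolve "<name>_merge" style symbols from a UDF plugin.
 * Illustrative only: buffer sizes in udfd come from TSDB_FUNC_NAME_LEN. */
typedef int (*AggMergeFn)(void *in1, void *in2, void *out);

static AggMergeFn loadMergeFn(void *lib, const char *procName) {
  char sym[64 + 7] = {0};  /* name + "_merge" + NUL */
  snprintf(sym, sizeof(sym), "%s_merge", procName);
  /* dlsym returns NULL when the UDF does not provide a merge step;
   * callers must treat the function pointer as optional. */
  return (AggMergeFn)dlsym(lib, sym);
}

int main(void) {
  void *lib = dlopen("./libmyudf.so", RTLD_NOW);  /* hypothetical plugin */
  if (!lib) { fprintf(stderr, "%s\n", dlerror()); return 1; }
  AggMergeFn merge = loadMergeFn(lib, "myudf");
  printf("merge %s\n", merge ? "found" : "not provided");
  dlclose(lib);
  return 0;
}
```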
...@@ -45,6 +45,8 @@ typedef struct SScalarCtx {
#define SCL_IS_CONST_CALC(_ctx) (NULL == (_ctx)->pBlockList)
//#define SCL_IS_NULL_VALUE_NODE(_node) ((QUERY_NODE_VALUE == nodeType(_node)) && (TSDB_DATA_TYPE_NULL == ((SValueNode *)_node)->node.resType.type) && (((SValueNode *)_node)->placeholderNo <= 0))
#define SCL_IS_NULL_VALUE_NODE(_node) ((QUERY_NODE_VALUE == nodeType(_node)) && (TSDB_DATA_TYPE_NULL == ((SValueNode *)_node)->node.resType.type))
#define SCL_IS_COMPARISON_OPERATOR(_opType) ((_opType) >= OP_TYPE_GREATER_THAN && (_opType) < OP_TYPE_IS_NOT_UNKNOWN)
#define SCL_DOWNGRADE_DATETYPE(_type) ((_type) == TSDB_DATA_TYPE_BIGINT || TSDB_DATA_TYPE_DOUBLE == (_type) || (_type) == TSDB_DATA_TYPE_UBIGINT)
#define sclFatal(...) qFatal(__VA_ARGS__)
#define sclError(...) qError(__VA_ARGS__)
......
...@@ -9,6 +9,7 @@
#include "scalar.h"
#include "tudf.h"
#include "ttime.h"
#include "tcompare.h"
int32_t scalarGetOperatorParamNum(EOperatorType type) {
if (OP_TYPE_IS_NULL == type || OP_TYPE_IS_NOT_NULL == type || OP_TYPE_IS_TRUE == type || OP_TYPE_IS_NOT_TRUE == type
...@@ -219,6 +220,82 @@ void sclFreeParamList(SScalarParam *param, int32_t paramNum) {
taosMemoryFree(param);
}
void sclDowngradeValueType(SValueNode *valueNode) {
switch (valueNode->node.resType.type) {
case TSDB_DATA_TYPE_BIGINT: {
int8_t i8 = valueNode->datum.i;
if (i8 == valueNode->datum.i) {
valueNode->node.resType.type = TSDB_DATA_TYPE_TINYINT;
*(int8_t*)&valueNode->typeData = i8;
break;
}
int16_t i16 = valueNode->datum.i;
if (i16 == valueNode->datum.i) {
valueNode->node.resType.type = TSDB_DATA_TYPE_SMALLINT;
*(int16_t*)&valueNode->typeData = i16;
break;
}
int32_t i32 = valueNode->datum.i;
if (i32 == valueNode->datum.i) {
valueNode->node.resType.type = TSDB_DATA_TYPE_INT;
*(int32_t*)&valueNode->typeData = i32;
break;
}
break;
}
case TSDB_DATA_TYPE_UBIGINT:{
uint8_t u8 = valueNode->datum.i;
if (u8 == valueNode->datum.i) {
int8_t i8 = valueNode->datum.i;
if (i8 == valueNode->datum.i) {
valueNode->node.resType.type = TSDB_DATA_TYPE_TINYINT;
*(int8_t*)&valueNode->typeData = i8;
} else {
valueNode->node.resType.type = TSDB_DATA_TYPE_UTINYINT;
*(uint8_t*)&valueNode->typeData = u8;
}
break;
}
uint16_t u16 = valueNode->datum.i;
if (u16 == valueNode->datum.i) {
int16_t i16 = valueNode->datum.i;
if (i16 == valueNode->datum.i) {
valueNode->node.resType.type = TSDB_DATA_TYPE_SMALLINT;
*(int16_t*)&valueNode->typeData = i16;
} else {
valueNode->node.resType.type = TSDB_DATA_TYPE_USMALLINT;
*(uint16_t*)&valueNode->typeData = u16;
}
break;
}
uint32_t u32 = valueNode->datum.i;
if (u32 == valueNode->datum.i) {
int32_t i32 = valueNode->datum.i;
if (i32 == valueNode->datum.i) {
valueNode->node.resType.type = TSDB_DATA_TYPE_INT;
*(int32_t*)&valueNode->typeData = i32;
} else {
valueNode->node.resType.type = TSDB_DATA_TYPE_UINT;
*(uint32_t*)&valueNode->typeData = u32;
}
break;
}
break;
}
case TSDB_DATA_TYPE_DOUBLE: {
float f = valueNode->datum.d;
if (FLT_EQUAL(f, valueNode->datum.d)) {
valueNode->node.resType.type = TSDB_DATA_TYPE_FLOAT;
*(float*)&valueNode->typeData = f;
break;
}
break;
}
default:
break;
}
}
int32_t sclInitParam(SNode* node, SScalarParam *param, SScalarCtx *ctx, int32_t *rowNum) {
switch (nodeType(node)) {
case QUERY_NODE_LEFT_VALUE: {
...@@ -675,6 +752,10 @@ EDealRes sclRewriteNonConstOperator(SNode** pNode, SScalarCtx *ctx) {
return DEAL_RES_ERROR;
}
}
if (SCL_IS_COMPARISON_OPERATOR(node->opType) && SCL_DOWNGRADE_DATETYPE(valueNode->node.resType.type)) {
sclDowngradeValueType(valueNode);
}
}
if (node->pRight && (QUERY_NODE_VALUE == nodeType(node->pRight))) {
...@@ -692,6 +773,10 @@ EDealRes sclRewriteNonConstOperator(SNode** pNode, SScalarCtx *ctx) {
return DEAL_RES_ERROR;
}
}
if (SCL_IS_COMPARISON_OPERATOR(node->opType) && SCL_DOWNGRADE_DATETYPE(valueNode->node.resType.type)) {
sclDowngradeValueType(valueNode);
}
}
if (node->pRight && (QUERY_NODE_NODE_LIST == nodeType(node->pRight))) {
......
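sclDowngradeValueType narrows a constant operand of a comparison to the smallest type that still represents it exactly, so a filter like `tinyint_col > 5` can be evaluated with a same-width comparator instead of promoting every row to bigint or double. The round-trip test it relies on is simply cast-and-compare, sketched below (plain C; the engine version also covers the unsigned and double-to-float cases):

```c
#include <stdint.h>
#include <stdio.h>

/* Returns the narrowest signed width (8/16/32/64 bits) that stores v
 * exactly, using the same round-trip test as sclDowngradeValueType. */
static int narrowestSignedWidth(int64_t v) {
  if ((int8_t)v == v) return 8;     /* fits TSDB_DATA_TYPE_TINYINT  */
  if ((int16_t)v == v) return 16;   /* fits TSDB_DATA_TYPE_SMALLINT */
  if ((int32_t)v == v) return 32;   /* fits TSDB_DATA_TYPE_INT      */
  return 64;                        /* stays TSDB_DATA_TYPE_BIGINT  */
}

int main(void) {
  printf("%d\n", narrowestSignedWidth(5));          /* 8  */
  printf("%d\n", narrowestSignedWidth(40000));      /* 32 */
  printf("%d\n", narrowestSignedWidth(1LL << 40));  /* 64 */
  return 0;
}
```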
...@@ -133,6 +133,7 @@ int32_t taosMulMkDir(const char *dirname) {
code = mkdir(temp, 0755);
#endif
if (code < 0 && errno != EEXIST) {
terrno = TAOS_SYSTEM_ERROR(errno);
return code;
}
*pos = TD_DIRSEP[0];
...@@ -146,6 +147,7 @@ int32_t taosMulMkDir(const char *dirname) {
code = mkdir(temp, 0755);
#endif
if (code < 0 && errno != EEXIST) {
terrno = TAOS_SYSTEM_ERROR(errno);
return code;
}
}
......
...@@ -313,6 +313,7 @@ TdFilePtr taosOpenFile(const char *path, int32_t tdFileOptions) {
assert(!(tdFileOptions & TD_FILE_EXCL));
fp = fopen(path, mode);
if (fp == NULL) {
terrno = TAOS_SYSTEM_ERROR(errno);
return NULL;
}
} else {
...@@ -335,6 +336,7 @@ TdFilePtr taosOpenFile(const char *path, int32_t tdFileOptions) {
fd = open(path, access, S_IRWXU | S_IRWXG | S_IRWXO);
#endif
if (fd == -1) {
terrno = TAOS_SYSTEM_ERROR(errno);
return NULL;
}
}
......
...@@ -595,6 +595,7 @@ int32_t taosGetDiskSize(char *dataDir, SDiskSize *diskSize) {
#else
struct statvfs info;
if (statvfs(dataDir, &info)) {
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
} else {
diskSize->total = info.f_blocks * info.f_frsize;
......
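The osDir/osFile/osSysinfo hunks above share one pattern: record the failing errno in terrno right where the system call fails, so callers see the real cause instead of a bare failure code. A minimal sketch of the pattern (standard C; g_lastError and SYSTEM_ERROR are hypothetical stand-ins for terrno and TAOS_SYSTEM_ERROR):

```c
#include <errno.h>
#include <stdio.h>
#include <string.h>
#include <sys/stat.h>
#include <sys/types.h>

/* Hypothetical stand-ins for terrno / TAOS_SYSTEM_ERROR(errno). */
static int g_lastError = 0;
#define SYSTEM_ERROR(e) (0x40000000 | (e))

static int makeDir(const char *path) {
  if (mkdir(path, 0755) < 0 && errno != EEXIST) {
    g_lastError = SYSTEM_ERROR(errno);  /* capture the cause at the failure site */
    return -1;
  }
  return 0;
}

int main(void) {
  if (makeDir("/proc/not-allowed") < 0) {
    /* strip the tag bit to recover the original errno for display */
    fprintf(stderr, "mkdir failed: %s\n", strerror(g_lastError & 0xffff));
  }
  return 0;
}
```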
...@@ -429,7 +429,7 @@ static inline int32_t taosBuildLogHead(char *buffer, const char *flags) {
}
static inline void taosPrintLogImp(ELogLevel level, int32_t dflag, const char *buffer, int32_t len) {
if ((dflag & DEBUG_FILE) && tsLogObj.logHandle && tsLogObj.logHandle->pFile != NULL) {
if ((dflag & DEBUG_FILE) && tsLogObj.logHandle && tsLogObj.logHandle->pFile != NULL && osLogSpaceAvailable()) {
taosUpdateLogNums(level);
if (tsAsyncLog) {
taosPushLogBuffer(tsLogObj.logHandle, buffer, len);
...@@ -451,7 +451,6 @@ static inline void taosPrintLogImp(ELogLevel level, int32_t dflag, const char *b
}
void taosPrintLog(const char *flags, ELogLevel level, int32_t dflag, const char *format, ...) {
if (!osLogSpaceAvailable()) return;
if (!(dflag & DEBUG_FILE) && !(dflag & DEBUG_SCREEN)) return;
char buffer[LOG_MAX_LINE_BUFFER_SIZE];
......
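Net effect of the logging change above: the free-space check now gates only the file sink inside taosPrintLogImp, so when the log disk fills up, messages destined for the screen (DEBUG_SCREEN) are still emitted instead of being dropped at the top of taosPrintLog.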
#!/bin/bash
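# GREEN/RED/NC are used in the summary below but are not defined in this
# file; assumed ANSI color definitions so the echoes render as intended.
GREEN='\033[1;32m'
RED='\033[1;31m'
NC='\033[0m'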
pgrep taosd || taosd >> /dev/null 2>&1 &
pgrep taosadapter || taosadapter >> /dev/null 2>&1 &
cd ../../docs/examples/java
mvn clean test > jdbc-out.log 2>&1
tail -n 20 jdbc-out.log
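# the surefire summary line has the form "Tests run: N, Failures: N, Errors: N, ..."; pull out the counts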
cases=`grep 'Tests run' jdbc-out.log | awk 'END{print $3}'`
totalJDBCCases=`echo ${cases/%,}`
failed=`grep 'Tests run' jdbc-out.log | awk 'END{print $5}'`
JDBCFailed=`echo ${failed/%,}`
error=`grep 'Tests run' jdbc-out.log | awk 'END{print $7}'`
JDBCError=`echo ${error/%,}`
totalJDBCFailed=`expr $JDBCFailed + $JDBCError`
totalJDBCSuccess=`expr $totalJDBCCases - $totalJDBCFailed`
if [ "$totalJDBCSuccess" -gt "0" ]; then
echo -e "\n${GREEN} ### Total $totalJDBCSuccess JDBC case(s) succeed! ### ${NC}"
fi
if [ "$totalJDBCFailed" -ne "0" ]; then
echo -e "\n${RED} ### Total $totalJDBCFailed JDBC case(s) failed! ### ${NC}"
exit 8
fi
\ No newline at end of file
...@@ -76,7 +76,7 @@ sql insert into ct4 values ( '2022-05-21 01:01:01.000', NULL, NULL, NULL, NULL,
print ================ start query ======================
print ================ SQL used to cause taosd or taos shell crash
print ================ SQL used to cause taosd or TDengine CLI crash
sql_error select sum(c1) ,count(c1) from ct4 group by c1 having sum(c10) between 0 and 1 ;
#system sh/exec.sh -n dnode1 -s stop -x SIGINT