diff --git a/docs/examples/java/src/main/java/com/taos/example/highvolume/FastWriteExample.java b/docs/examples/java/src/main/java/com/taos/example/highvolume/FastWriteExample.java index 7e96397b04442c1d39ccdc32cc2e242ca6b15445..718d875893ce64c8857ab904b01694570db9246a 100644 --- a/docs/examples/java/src/main/java/com/taos/example/highvolume/FastWriteExample.java +++ b/docs/examples/java/src/main/java/com/taos/example/highvolume/FastWriteExample.java @@ -58,7 +58,7 @@ class DataBaseMonitor { public class FastWriteExample { final static Logger logger = LoggerFactory.getLogger(FastWriteExample.class); - final static int taskQueueCapacity = Integer.MAX_VALUE / 100; + final static int taskQueueCapacity = 10000000; final static List> taskQueues = new ArrayList<>(); final static List readTasks = new ArrayList<>(); final static List writeTasks = new ArrayList<>(); @@ -73,8 +73,12 @@ public class FastWriteExample { public static void main(String[] args) throws InterruptedException, SQLException { int readTaskCount = args.length > 0 ? Integer.parseInt(args[0]) : 1; - int writeTaskCount = args.length > 1 ? Integer.parseInt(args[1]) : 1; - logger.info("readTaskCount={}, writeTaskCount={}", readTaskCount, writeTaskCount); + int writeTaskCount = args.length > 1 ? Integer.parseInt(args[1]) : 3; + int tableCount = args.length > 2 ? Integer.parseInt(args[2]) : 1000; + int maxBatchSize = args.length > 3 ? Integer.parseInt(args[3]) : 3000; + + logger.info("readTaskCount={}, writeTaskCount={} tableCount={} maxBatchSize={}", + readTaskCount, writeTaskCount, tableCount, maxBatchSize); databaseMonitor.init().prepareDatabase(); @@ -82,15 +86,16 @@ public class FastWriteExample { for (int i = 0; i < writeTaskCount; ++i) { BlockingQueue queue = new ArrayBlockingQueue<>(taskQueueCapacity); taskQueues.add(queue); - WriteTask task = new WriteTask(queue); + WriteTask task = new WriteTask(queue, maxBatchSize); Thread t = new Thread(task); t.setName("WriteThread-" + i); t.start(); } // create reading tasks and start reading threads + int tableCountPerTask = tableCount / readTaskCount; for (int i = 0; i < readTaskCount; ++i) { - ReadTask task = new ReadTask(i, taskQueues); + ReadTask task = new ReadTask(i, taskQueues, tableCountPerTask); Thread t = new Thread(task); t.setName("ReadThread-" + i); t.start(); @@ -102,7 +107,7 @@ public class FastWriteExample { while (true) { Thread.sleep(10000); long count = databaseMonitor.count(); - logger.info("total_count={} speed={}", count, (count - lastCount) / 10); + logger.info("count={} speed={}", count, (count - lastCount) / 10); lastCount = count; } } diff --git a/docs/examples/java/src/main/java/com/taos/example/highvolume/ReadTask.java b/docs/examples/java/src/main/java/com/taos/example/highvolume/ReadTask.java index 0e94f876fc48b50a31b3d89c3e0f5377136e8cd2..94cde899f49b3d517e320c06d355b13fca95c2f5 100644 --- a/docs/examples/java/src/main/java/com/taos/example/highvolume/ReadTask.java +++ b/docs/examples/java/src/main/java/com/taos/example/highvolume/ReadTask.java @@ -12,7 +12,7 @@ import java.util.concurrent.BlockingQueue; */ class MockDataSource implements Iterator { private String tbNamePrefix; - private int tableCount = 1000; + private int tableCount; private long maxRowsPerTable = 1000000000L; // 100 milliseconds between two neighbouring rows. @@ -26,8 +26,9 @@ class MockDataSource implements Iterator { int[] voltage = {119, 116, 111, 113, 118}; float[] phase = {0.32f, 0.34f, 0.33f, 0.329f, 0.141f}; - public MockDataSource(String tbNamePrefix) { + public MockDataSource(String tbNamePrefix, int tableCount) { this.tbNamePrefix = tbNamePrefix; + this.tableCount = tableCount; } @Override @@ -62,12 +63,14 @@ class ReadTask implements Runnable { private final int taskId; private final List> taskQueues; private final int queueCount; + private final int tableCount; private boolean active = true; - public ReadTask(int readTaskId, List> queues) { + public ReadTask(int readTaskId, List> queues, int tableCount) { this.taskId = readTaskId; this.taskQueues = queues; this.queueCount = queues.size(); + this.tableCount = tableCount; } /** @@ -87,7 +90,7 @@ class ReadTask implements Runnable { @Override public void run() { logger.info("started"); - Iterator it = new MockDataSource("tb" + this.taskId); + Iterator it = new MockDataSource("tb" + this.taskId, tableCount); try { while (it.hasNext() && active) { String line = it.next(); diff --git a/docs/examples/java/src/main/java/com/taos/example/highvolume/SQLWriter.java b/docs/examples/java/src/main/java/com/taos/example/highvolume/SQLWriter.java index 83705bbe5f84d2b10060f6102fbb92c572c2dc06..21b4fc88917a08c4cd4d413f2ccde2e97cc98893 100644 --- a/docs/examples/java/src/main/java/com/taos/example/highvolume/SQLWriter.java +++ b/docs/examples/java/src/main/java/com/taos/example/highvolume/SQLWriter.java @@ -17,7 +17,7 @@ import java.util.Map; *

* The main interfaces are two methods: *

    - *
  1. {@link SQLWriter#processLine}, which receive raw data from WriteTask and group raw data by table names.
  2. + *
  3. {@link SQLWriter#processLine}, which receive raw lines from WriteTask and group them by table names.
  4. *
  5. {@link SQLWriter#flush}, which assemble INSERT statement and execute it.
  6. *
*

diff --git a/docs/examples/java/src/main/java/com/taos/example/highvolume/WriteTask.java b/docs/examples/java/src/main/java/com/taos/example/highvolume/WriteTask.java index 1118bb7e9e882c6d3524522b95e1a0f120f3db33..bbe9cb07704519bf43c6187844c296e227315505 100644 --- a/docs/examples/java/src/main/java/com/taos/example/highvolume/WriteTask.java +++ b/docs/examples/java/src/main/java/com/taos/example/highvolume/WriteTask.java @@ -8,7 +8,7 @@ import java.util.concurrent.BlockingQueue; // ANCHOR: WriteTask class WriteTask implements Runnable { private final static Logger logger = LoggerFactory.getLogger(WriteTask.class); - private final static int maxBatchSize = 3000; + private final int maxBatchSize; // the queue from which this writing task get raw data. private final BlockingQueue queue; @@ -16,11 +16,11 @@ class WriteTask implements Runnable { // A flag indicate whether to continue. private boolean active = true; - public WriteTask(BlockingQueue taskQueue) { + public WriteTask(BlockingQueue taskQueue, int maxBatchSize) { this.queue = taskQueue; + this.maxBatchSize = maxBatchSize; } - @Override public void run() { logger.info("started"); @@ -38,7 +38,7 @@ class WriteTask implements Runnable { writer.flush(); } else { // sleep a while to avoid high CPU usage if no more data in the queue and no buffered records, . - Thread.sleep(500); + Thread.sleep(100); } } if (writer.hasBufferedValues()) { diff --git a/docs/examples/java/src/main/resources/highvolume.drawio b/docs/examples/java/src/main/resources/highvolume.drawio new file mode 100644 index 0000000000000000000000000000000000000000..410216061813d307b9e8cc289fe58df05c01e390 --- /dev/null +++ b/docs/examples/java/src/main/resources/highvolume.drawio @@ -0,0 +1,72 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + \ No newline at end of file diff --git a/docs/zh/07-develop/03-insert-data/05-high-volume.md b/docs/zh/07-develop/03-insert-data/05-high-volume.md index cb13f1c582aa704f58dba2f8a6a0bec92a6379de..aafe9aec3a6a92de4fdc4cd199e4ea81f9fde5b5 100644 --- a/docs/zh/07-develop/03-insert-data/05-high-volume.md +++ b/docs/zh/07-develop/03-insert-data/05-high-volume.md @@ -4,18 +4,20 @@ title: 高效写入 ## 高效写入原理 -本节介绍如何高效地向 TDengine 写入数据。高效写入数据要考虑几个因素:数据在不同表(或子表)之间的分布,即要写入数据的相邻性;单次写入的数据量;并发连接数。一般来说,每批次只向同一张表(或子表)写入数据比向多张表(或子表)写入数据要更高效;每批次写入的数据量越大越高效(但超过一定阈值其优势会消失);同时写入数据的并发连接数越多写入越高效(但超过一定阈值反而会下降,取决于服务端处理能力)。 +本节介绍如何高效地向 TDengine 写入数据。高效写入数据要考虑几个因素:数据在不同表(或子表)之间的分布,即要写入数据的相邻性;单次写入的数据量;并发连接数。一般来说,每批次只向同一张表(或子表)写入数据比向多张表(或子表)写入数据要更高效;每批次写入的数据量越大越高效(但超过一定阈值其优势会消失;同时写入数据的并发连接数越多写入越高效(但超过一定阈值反而会下降,取决于服务端处理能力)。 为了更高效地向 TDengine 写入数据,客户端程序要充分且恰当地利用以上几个因素。在单次写入中尽量只向同一张表(或子表)写入数据,每批次写入的数据量经过测试和调优设定为一个最适合当前系统处理能力的数值,并发写入的连接数同样经过测试和调优后设定为一个最适合当前系统处理能力的数值,以实现在当前系统中的最佳写入速度。同时,TDengine 还提供了独特的参数绑定写入,这也是一个有助于实现高效写入的方法。 ## 高效写入方案 下面的示例程序展示了如何高效写入数据: + - TDengine 客户端程序从消息队列或者其它数据源不断读入数据,在示例程序中采用生成模拟数据的方式来模拟读取数据源 - 单个连接向 TDengine 写入的速度无法与读数据的速度相匹配,因此客户端程序启动多个线程,每个线程都建立了与 TDengine 的连接,每个线程都有一个独占的固定大小的消息队列 - 客户端程序将接收到的数据根据所属的表名(或子表名)HASH 到不同的线程,即写入该线程所对应的消息队列,以此确保属于某个表(或子表)的数据一定会被一个固定的线程处理 - 各个子线程在将所关联的消息队列中的数据读空后或者读取数据量达到一个预定的阈值后将该批数据写入 TDengine,并继续处理后面接收到的数据 +![TDengine 高效写入线程模型](highvolume.webp) ## Java 示例程序 @@ -23,18 +25,36 @@ title: 高效写入 ### 主程序 -主程序负责创建队列,并启动读线程和写线程。 +主程序负责: + +1. 创建消息队列 +2. 启动写线程 +3. 启动读线程 +4. 每隔 10 秒统计一次写入速度 + +主程序默认暴露了 4 个参数,每次启动程序都可调节,用于测试和调优: + +1. 读线程个数。默认为 1。 +2. 写线程个数。默认为 3。 +3. 模拟生成的总表数。默认为 1000。将会平分给各个读线程。 +4. 每批最大数据量。默认为 3000。 ```java title="主程序" {{#include docs/examples/java/src/main/java/com/taos/example/highvolume/FastWriteExample.java:main}} ``` +队列容量(taskQueueCapacity)也是与性能有关的参数,可通过修改程序调节。一般来讲,队列容量越大,入队被阻塞的概率越小,队列的吞吐量越大,但是内存占用也会越大。 + ### 读任务的实现 +读任务负责从数据源读数据。每个读任务都关联了一个模拟数据源。每个模拟数据源可生成一点数量表的数据。不同的模拟数据源生成不同表的数据。 + +读任务采用阻塞的方式写消息队列。也就是说,一旦队列满了,写操作就会阻塞。 +

读任务的实现 -```java +```java {{#include docs/examples/java/src/main/java/com/taos/example/highvolume/ReadTask.java:ReadTask}} ``` @@ -45,21 +65,45 @@ title: 高效写入
写任务的实现 -```java +```java {{#include docs/examples/java/src/main/java/com/taos/example/highvolume/WriteTask.java:WriteTask}} ``` +
### SQLWriter 类的实现 -SQLWriter 类封装了拼 SQL 和写数据的逻辑。注意,所有的表都没有提前创建,而是写入出错的时候,再以超级表为模板建表,然后重现执行 INSERT 语句。 +SQLWriter 类封装了拼 SQL 和写数据的逻辑。注意,所有的表都没有提前创建,而是写入出错的时候,再以超级表为模板批量建表,然后重新执行 INSERT 语句。 + +
+SQLWriter 类的实现 ```java {{#include docs/examples/java/src/main/java/com/taos/example/highvolume/SQLWriter.java:SQLWriter}} ``` + +
+ ### 执行示例程序 -可用在本地集成开发环境里直接运行示例程序,注意要提前配置环境变量 TDENGINE_JDBC_URL。若要在远程服务器上执行示例程序,可按照下面步骤操作: +执行程序前需配置环境变量 `TDENGINE_JDBC_URL`。如果 TDengine Server 部署在本机,且用户名、密码和端口都是默认值,那么可配置: + +``` +TDENGINE_JDBC_URL="jdbc:TAOS://localhost:6030?user=root&password=taosdata" +``` + +若要在本地集成开发环境执行示例程序,只需: + +1. clone TDengine 仓库 + ``` + git clone git@github.com:taosdata/TDengine.git --depth 1 + ``` +2. 用集成开发环境打开 `docs/examples/java` 目录。 +3. 在开发环境中配置环境变量 `TDENGINE_JDBC_URL`。如果已配置了全局的环境变量 `TDENGINE_JDBC_URL` 可跳过这一步。 +4. 运行类 `com.taos.example.highvolume.FastWriteExample`。 + +若要在服务器上执行示例程序,可按照下面的步骤操作: + 1. 打包示例代码。在目录 TDengine/docs/examples/java 下执行: ``` mvn package @@ -68,23 +112,59 @@ SQLWriter 类封装了拼 SQL 和写数据的逻辑。注意,所有的表都 ``` mkdir -p examples/java ``` -3. 复制依赖到服务器指定目录,比如: +3. 复制依赖到服务器指定目录: + - 复制依赖包,只用复制一次 + ``` + scp -r .\target\lib @:~/examples/java + ``` + + - 复制本程序的 jar 包,每次更新代码都需要复制 + ``` + scp -r .\target\javaexample-1.0.jar @:~/examples/java + ``` +4. 配置环境变量。 + 编辑 `~/.bash_profile` 或 `~/.bashrc` 添加如下内容例如: ``` - scp -r .\target\lib user@host:~/examples/java - scp -r .\target\javaexample-1.0.jar user@host:~/examples/java + export TDENGINE_JDBC_URL="jdbc:TAOS://localhost:6030?user=root&password=taosdata" ``` -4. 配置环境变量,例如: + 以上使用的是本地部署 TDengine Server 时默认的 JDBC URL。你需要根据自己的实际情况更改。 + +5. 用 java 命令启动示例程序,命令模板: ``` - export TDENGINE_JDBC_URL="jdbc:TAOS://localhost:6030?user=root&password=taosdata" + java -classpath lib/*:javaexample-1.0.jar com.taos.example.highvolume.FastWriteExample ``` -5. 用 java 命令启动示例程序 +6. 结束测试程序。测试程序不会自动结束,在获取到当前配置下稳定的写入速度后,按 CTRL + C 结束程序。 + 下面是一次实际运行的截图: + ``` - java -classpath lib/*:javaexample-1.0.jar com.taos.example.highvolume.FastWriteExample + [bding@vm95 java]$ java -classpath lib/*:javaexample-1.0.jar com.taos.example.highvolume.FastWriteExample 1 9 1000 2000 + 17:01:01.131 [main] INFO c.t.e.highvolume.FastWriteExample - readTaskCount=1, writeTaskCount=9 tableCount=1000 maxBatchSize=2000 + 17:01:01.286 [WriteThread-0] INFO c.taos.example.highvolume.WriteTask - started + 17:01:01.354 [WriteThread-1] INFO c.taos.example.highvolume.WriteTask - started + 17:01:01.360 [WriteThread-2] INFO c.taos.example.highvolume.WriteTask - started + 17:01:01.366 [WriteThread-3] INFO c.taos.example.highvolume.WriteTask - started + 17:01:01.433 [WriteThread-4] INFO c.taos.example.highvolume.WriteTask - started + 17:01:01.438 [WriteThread-5] INFO c.taos.example.highvolume.WriteTask - started + 17:01:01.443 [WriteThread-6] INFO c.taos.example.highvolume.WriteTask - started + 17:01:01.448 [WriteThread-7] INFO c.taos.example.highvolume.WriteTask - started + 17:01:01.454 [WriteThread-8] INFO c.taos.example.highvolume.WriteTask - started + 17:01:01.454 [ReadThread-0] INFO com.taos.example.highvolume.ReadTask - started + 17:01:11.615 [main] INFO c.t.e.highvolume.FastWriteExample - count=18766442 speed=1876644 + 17:01:21.775 [main] INFO c.t.e.highvolume.FastWriteExample - count=38947464 speed=2018102 + 17:01:32.428 [main] INFO c.t.e.highvolume.FastWriteExample - count=58649571 speed=1970210 + 17:01:42.577 [main] INFO c.t.e.highvolume.FastWriteExample - count=79264890 speed=2061531 + 17:01:53.265 [main] INFO c.t.e.highvolume.FastWriteExample - count=99097476 speed=1983258 + 17:02:04.209 [main] INFO c.t.e.highvolume.FastWriteExample - count=119546779 speed=2044930 + 17:02:14.935 [main] INFO c.t.e.highvolume.FastWriteExample - count=141078914 speed=2153213 + 17:02:25.617 [main] INFO c.t.e.highvolume.FastWriteExample - count=162183457 speed=2110454 + 17:02:36.718 [main] INFO c.t.e.highvolume.FastWriteExample - count=182735614 speed=2055215 + 17:02:46.988 [main] INFO c.t.e.highvolume.FastWriteExample - count=202895614 speed=2016000 ``` ## Python 示例程序 在 Python 示例程序中采用参数绑定的写入方式。(开发中) + diff --git a/docs/zh/07-develop/03-insert-data/highvolume.webp b/docs/zh/07-develop/03-insert-data/highvolume.webp new file mode 100644 index 0000000000000000000000000000000000000000..9f8c9a9cc161e382728750bdfcc93ee086d62aea Binary files /dev/null and b/docs/zh/07-develop/03-insert-data/highvolume.webp differ