From 4c693d5d57ddf7cd0926f19885b4e0d607919f23 Mon Sep 17 00:00:00 2001 From: dingbo Date: Sat, 25 Jun 2022 18:26:25 +0800 Subject: [PATCH] docs: high volume example --- .../com/taos/example/FastWriteExample.java | 161 ++++++++++++++++++ .../03-insert-data/05-high-volume.md | 10 +- 2 files changed, 166 insertions(+), 5 deletions(-) create mode 100644 docs/examples/java/src/main/java/com/taos/example/FastWriteExample.java diff --git a/docs/examples/java/src/main/java/com/taos/example/FastWriteExample.java b/docs/examples/java/src/main/java/com/taos/example/FastWriteExample.java new file mode 100644 index 0000000000..d026fd6ef9 --- /dev/null +++ b/docs/examples/java/src/main/java/com/taos/example/FastWriteExample.java @@ -0,0 +1,161 @@ +package com.taos.example; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingDeque; + + +class ReadTask { + private final int taskId; + private Thread readThread; + private List writeTasks; + + public ReadTask(int readTaskId, List writeTasks) { + this.taskId = readTaskId; + this.writeTasks = writeTasks; + this.readThread = new Thread(this::doReadTask); + } + + /** + * Simulate getting data from datasource. + */ + private Iterator getSourceDataIterator() { + long now = System.currentTimeMillis(); + int tableCount = 1000; + int numberRows = 10 ^ 7; + int tsStep = 100; // 100 milliseconds + String tbNamePrefix = "tb" + taskId + "-"; + + return new Iterator() { + private long ts = now - numberRows * tsStep; + private int tbId = 0; + + @Override + public boolean hasNext() { + return ts < now; + } + + @Override + public String next() { + if (tbId < tableCount) { + tbId += 1; + } else { + ts += tsStep; + tbId = 0; + } + StringBuilder sb = new StringBuilder(tbNamePrefix + tbId + ","); // tbName + sb.append(ts).append(','); // ts + sb.append(1.0).append(','); // current + sb.append(110).append(','); // voltage + sb.append(0.32).append(','); // phase + sb.append(3).append(','); // groupID + sb.append("Los Angeles"); // location + + return sb.toString(); + } + }; + } + + /** + * Read lines from datasource. And assign each line to a writing task according the table name. + */ + private void doReadTask() { + int numberWriteTask = writeTasks.size(); + Iterator it = getSourceDataIterator(); + + try { + while (it.hasNext()) { + String line = it.next(); + String tbName = line.substring(0, line.indexOf(',')); + int writeTaskId = tbName.hashCode() % numberWriteTask; + writeTasks.get(writeTaskId).put(line); + } + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + + /** + * start reading thread + */ + public void start() { + this.readThread.start(); + } +} + +class WriteTask { + final static int maxBachSize = 500; + final static int taskQueueCapacity = 1000; + + private Thread writeThread = new Thread(this::doWriteTask); + + private BlockingQueue queue = new LinkedBlockingDeque<>(taskQueueCapacity); + + public void put(String line) throws InterruptedException { + queue.put(line); + } + + private static Connection getConnection() throws SQLException { + String jdbcUrl = "jdbc:TAOS-RS://localhost:6041?user=root&password=taosdata"; + return DriverManager.getConnection(jdbcUrl); + } + + public void doWriteTask() { + try { + Connection conn = getConnection(); + Statement stmt = conn.createStatement(); + while (true) { + String line = queue.poll(); + if (line != null) { + + } else { + Thread.sleep(1); + } + } + } catch (Exception e) { + // handle exception + } + + } + + /** + * start writing thread + */ + public void start() { + writeThread.start(); + } +} + +public class FastWriteExample { + + final static int readTaskCount = 1; + final static int writeTaskCount = 4; + final static List readTasks = new ArrayList<>(); + final static List writeTasks = new ArrayList<>(); + + public static void main(String[] args) throws InterruptedException { + // Create write tasks + for (int i = 0; i < writeTaskCount; ++i) { + WriteTask task = new WriteTask(); + task.start(); + writeTasks.add(new WriteTask()); + } + + // Create read tasks + for (int i = 0; i < readTaskCount; ++i) { + ReadTask task = new ReadTask(i, writeTasks); + task.start(); + readTasks.add(task); + } + + while (true) { + Thread.sleep(1000); + } + } +} diff --git a/docs/zh/07-develop/03-insert-data/05-high-volume.md b/docs/zh/07-develop/03-insert-data/05-high-volume.md index e48c06e2d8..356878f618 100644 --- a/docs/zh/07-develop/03-insert-data/05-high-volume.md +++ b/docs/zh/07-develop/03-insert-data/05-high-volume.md @@ -6,7 +6,7 @@ title: 高效写入 本节介绍如何高效地向 TDengine 写入数据。高效写入数据要考虑几个因素:数据在不同表(或子表)之间的分布,即要写入数据的相邻性;单次写入的数据量;并发连接数。一般来说,每批次只向同一张表(或子表)写入数据比向多张表(或子表)写入数据要更高效;每批次写入的数据量越大越高效(但超过一定阈值其优势会消失);同时写入数据的并发连接数越多写入越高效(但超过一定阈值反而会下降,取决于服务端处理能力)。 -为了更高效地向 TDengine 写入数据,客户端程序要充分且恰当地利用以上几个因素。在单次写入中尽量只向同一张表(或子表)写入数据,每批次写入的数据量经过测试和调优设定为一个最适合当前系统处理能力的数值,并发写入的连接数同样经过测试和调优后设定为一个最适合当前系统处理能力的数值,以实现在当前系统中的最佳写入速度。同时,TDegnine 还提供了独特的参数绑定写入,这也是一个有助于实现高效写入的方法。 +为了更高效地向 TDengine 写入数据,客户端程序要充分且恰当地利用以上几个因素。在单次写入中尽量只向同一张表(或子表)写入数据,每批次写入的数据量经过测试和调优设定为一个最适合当前系统处理能力的数值,并发写入的连接数同样经过测试和调优后设定为一个最适合当前系统处理能力的数值,以实现在当前系统中的最佳写入速度。同时,TDengine 还提供了独特的参数绑定写入,这也是一个有助于实现高效写入的方法。 ## 高效写入方案 @@ -21,14 +21,14 @@ title: 高效写入 在 Java 示例程序中采用拼接 SQL 的写入方式。 -```java text=Java 示例程序 -##include{} +```java title="Java 示例程序" +{{#include docs/examples/java/src/main/java/com/taos/example/FastWriteExample.java}} ``` ## Python 示例程序 在 Python 示例程序中采用参数绑定的写入方式。 -```python test=Python 示例程序 -##include{} +```python title="Python 示例程序" + ``` -- GitLab