diff --git a/docs/examples/java/src/main/java/com/taos/example/FastWriteExample.java b/docs/examples/java/src/main/java/com/taos/example/FastWriteExample.java deleted file mode 100644 index d026fd6ef9eec75f21f5911de577021a4d2ee95a..0000000000000000000000000000000000000000 --- a/docs/examples/java/src/main/java/com/taos/example/FastWriteExample.java +++ /dev/null @@ -1,161 +0,0 @@ -package com.taos.example; - -import java.sql.Connection; -import java.sql.DriverManager; -import java.sql.SQLException; -import java.sql.Statement; -import java.util.ArrayList; -import java.util.Iterator; -import java.util.List; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.LinkedBlockingDeque; - - -class ReadTask { - private final int taskId; - private Thread readThread; - private List writeTasks; - - public ReadTask(int readTaskId, List writeTasks) { - this.taskId = readTaskId; - this.writeTasks = writeTasks; - this.readThread = new Thread(this::doReadTask); - } - - /** - * Simulate getting data from datasource. - */ - private Iterator getSourceDataIterator() { - long now = System.currentTimeMillis(); - int tableCount = 1000; - int numberRows = 10 ^ 7; - int tsStep = 100; // 100 milliseconds - String tbNamePrefix = "tb" + taskId + "-"; - - return new Iterator() { - private long ts = now - numberRows * tsStep; - private int tbId = 0; - - @Override - public boolean hasNext() { - return ts < now; - } - - @Override - public String next() { - if (tbId < tableCount) { - tbId += 1; - } else { - ts += tsStep; - tbId = 0; - } - StringBuilder sb = new StringBuilder(tbNamePrefix + tbId + ","); // tbName - sb.append(ts).append(','); // ts - sb.append(1.0).append(','); // current - sb.append(110).append(','); // voltage - sb.append(0.32).append(','); // phase - sb.append(3).append(','); // groupID - sb.append("Los Angeles"); // location - - return sb.toString(); - } - }; - } - - /** - * Read lines from datasource. And assign each line to a writing task according the table name. - */ - private void doReadTask() { - int numberWriteTask = writeTasks.size(); - Iterator it = getSourceDataIterator(); - - try { - while (it.hasNext()) { - String line = it.next(); - String tbName = line.substring(0, line.indexOf(',')); - int writeTaskId = tbName.hashCode() % numberWriteTask; - writeTasks.get(writeTaskId).put(line); - } - } catch (InterruptedException e) { - e.printStackTrace(); - } - } - - /** - * start reading thread - */ - public void start() { - this.readThread.start(); - } -} - -class WriteTask { - final static int maxBachSize = 500; - final static int taskQueueCapacity = 1000; - - private Thread writeThread = new Thread(this::doWriteTask); - - private BlockingQueue queue = new LinkedBlockingDeque<>(taskQueueCapacity); - - public void put(String line) throws InterruptedException { - queue.put(line); - } - - private static Connection getConnection() throws SQLException { - String jdbcUrl = "jdbc:TAOS-RS://localhost:6041?user=root&password=taosdata"; - return DriverManager.getConnection(jdbcUrl); - } - - public void doWriteTask() { - try { - Connection conn = getConnection(); - Statement stmt = conn.createStatement(); - while (true) { - String line = queue.poll(); - if (line != null) { - - } else { - Thread.sleep(1); - } - } - } catch (Exception e) { - // handle exception - } - - } - - /** - * start writing thread - */ - public void start() { - writeThread.start(); - } -} - -public class FastWriteExample { - - final static int readTaskCount = 1; - final static int writeTaskCount = 4; - final static List readTasks = new ArrayList<>(); - final static List writeTasks = new ArrayList<>(); - - public static void main(String[] args) throws InterruptedException { - // Create write tasks - for (int i = 0; i < writeTaskCount; ++i) { - WriteTask task = new WriteTask(); - task.start(); - writeTasks.add(new WriteTask()); - } - - // Create read tasks - for (int i = 0; i < readTaskCount; ++i) { - ReadTask task = new ReadTask(i, writeTasks); - task.start(); - readTasks.add(task); - } - - while (true) { - Thread.sleep(1000); - } - } -} diff --git a/docs/examples/java/src/main/java/com/taos/example/highvolume/FastWriteExample.java b/docs/examples/java/src/main/java/com/taos/example/highvolume/FastWriteExample.java new file mode 100644 index 0000000000000000000000000000000000000000..3f642d8851c0dfc969c66174def78c11b3bb65ec --- /dev/null +++ b/docs/examples/java/src/main/java/com/taos/example/highvolume/FastWriteExample.java @@ -0,0 +1,37 @@ +package com.taos.example.highvolume; + +import java.util.ArrayList; +import java.util.List; + +// ANCHOR: main +public class FastWriteExample { + + final static int readTaskCount = 1; + final static int writeTaskCount = 4; + final static List readTasks = new ArrayList<>(); + final static List writeTasks = new ArrayList<>(); + + public static void main(String[] args) throws InterruptedException { + // Create writing tasks. + // Every writing task contains a work thread and a task queue. + for (int i = 0; i < writeTaskCount; ++i) { + WriteTask task = new WriteTask(); + task.start(); + writeTasks.add(new WriteTask()); + } + + // Create reading tasks. + // Every reading task contains a work thread and a reference to each writing task. + for (int i = 0; i < readTaskCount; ++i) { + ReadTask task = new ReadTask(i, writeTasks); + task.start(); + readTasks.add(task); + } + + while (true) { + Thread.sleep(1000); + } + } +} + +// ANCHOR_END: main \ No newline at end of file diff --git a/docs/examples/java/src/main/java/com/taos/example/highvolume/ReadTask.java b/docs/examples/java/src/main/java/com/taos/example/highvolume/ReadTask.java new file mode 100644 index 0000000000000000000000000000000000000000..2cef5302a2f4ea937c565c7ef0f8dd551aa8e402 --- /dev/null +++ b/docs/examples/java/src/main/java/com/taos/example/highvolume/ReadTask.java @@ -0,0 +1,85 @@ +package com.taos.example.highvolume; + +import java.util.Iterator; +import java.util.List; + + +class FakeDataSource implements Iterator { + private long now = System.currentTimeMillis(); + private int tableCount = 1000; + private int numberRows = 10 ^ 7; + private int tsStep = 100; // 100 milliseconds + private String tbNamePrefix; + private long ts = now - numberRows * tsStep; + private int tbId = 0; + + public FakeDataSource(String tbNamePrefix) { + this.tbNamePrefix = tbNamePrefix; + + } + + @Override + public boolean hasNext() { + return ts < now; + } + + @Override + public String next() { + if (tbId < tableCount) { + tbId += 1; + } else { + ts += tsStep; + tbId = 0; + } + StringBuilder sb = new StringBuilder(tbNamePrefix + tbId + ","); // tbName + sb.append(ts).append(','); // ts + sb.append(1.0).append(','); // current + sb.append(110).append(','); // voltage + sb.append(0.32).append(','); // phase + sb.append(3).append(','); // groupID + sb.append("Los Angeles"); // location + + return sb.toString(); + } +} + +// ANCHOR: ReadTask +class ReadTask { + private final int taskId; + private Thread readThread; + private List writeTasks; + + public ReadTask(int readTaskId, List writeTasks) { + this.taskId = readTaskId; + this.writeTasks = writeTasks; + this.readThread = new Thread(this::doReadTask); + } + + /** + * Read lines from datasource. + * And assign each line to a writing task according to the table name. + */ + private void doReadTask() { + int numberWriteTask = writeTasks.size(); + Iterator it = new FakeDataSource("t" + this.taskId + "tb"); + try { + while (it.hasNext()) { + String line = it.next(); + String tbName = line.substring(0, line.indexOf(',')); + int writeTaskId = tbName.hashCode() % numberWriteTask; + writeTasks.get(writeTaskId).put(line); + } + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + + /** + * Start reading thread. + */ + public void start() { + this.readThread.start(); + } +} + +// ANCHOR_END: ReadTask \ No newline at end of file diff --git a/docs/examples/java/src/main/java/com/taos/example/highvolume/WriteTask.java b/docs/examples/java/src/main/java/com/taos/example/highvolume/WriteTask.java new file mode 100644 index 0000000000000000000000000000000000000000..7fe09eec02658c627303ab7a1e56805014b49654 --- /dev/null +++ b/docs/examples/java/src/main/java/com/taos/example/highvolume/WriteTask.java @@ -0,0 +1,83 @@ +package com.taos.example.highvolume; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingDeque; + +// ANCHOR: WriteTask +class WriteTask { + // How many records will trigger writing before queue + final static int maxBatchSize = 500; + // + final static int taskQueueCapacity = 1000; + + private Thread writeThread = new Thread(this::doWriteTask); + + private BlockingQueue queue = new LinkedBlockingDeque<>(taskQueueCapacity); + + /** + * Public interface for adding task to task queue. + * It will be invoked in read thread. + */ + public void put(String line) throws InterruptedException { + queue.put(line); + } + + /** + * Start writing thread. + */ + public void start() { + writeThread.start(); + } + + private static Connection getConnection() throws SQLException { + String jdbcUrl = "jdbc:TAOS-RS://localhost:6041?user=root&password=taosdata"; + return DriverManager.getConnection(jdbcUrl); + } + + private void doWriteTask() { + int count = 0; + try { + Connection conn = getConnection(); + Statement stmt = conn.createStatement(); + Map tbValues = new HashMap<>(); + while (true) { + String line = queue.poll(); + if (line != null) { + processLine(tbValues, line); + count += 1; + if (count == maxBatchSize) { + // trigger writing when count of buffered records reached maxBachSize + flushValues(stmt, tbValues); + count = 0; + } + } else if (count == 0) { + // if queue is empty and no buffered records, sleep a while to avoid high CPU usage. + Thread.sleep(500); + } else { + // if queue is empty and there are buffered records then flush immediately + flushValues(stmt, tbValues); + count = 0; + } + } + } catch (Exception e) { + // handle exception + } + + } + + private void processLine(Map tbValues, String line) { + + } + + private void flushValues(Statement stmt, Map tbValues) { + + } + +} +// ANCHOR_END: WriteTask \ No newline at end of file diff --git a/docs/zh/07-develop/03-insert-data/05-high-volume.md b/docs/zh/07-develop/03-insert-data/05-high-volume.md index 356878f61814c79335c1d189280f4928d5c85ecc..53f193b664796db7a0e7cbd94c289ac371686f39 100644 --- a/docs/zh/07-develop/03-insert-data/05-high-volume.md +++ b/docs/zh/07-develop/03-insert-data/05-high-volume.md @@ -21,10 +21,29 @@ title: 高效写入 在 Java 示例程序中采用拼接 SQL 的写入方式。 -```java title="Java 示例程序" -{{#include docs/examples/java/src/main/java/com/taos/example/FastWriteExample.java}} +```java title="主程序" +{{#include docs/examples/java/src/main/java/com/taos/example/highvolume/FastWriteExample.java:main}} ``` +
+写任务的实现 + +```java +{{#include docs/examples/java/src/main/java/com/taos/example/highvolume/WriteTask.java:WriteTask}} +``` +
+ +
+读任务的实现 + +```java +{{#include docs/examples/java/src/main/java/com/taos/example/highvolume/ReadTask.java:ReadTask}} +``` + +
+ + + ## Python 示例程序 在 Python 示例程序中采用参数绑定的写入方式。