diff --git a/docs/examples/.gitignore b/docs/examples/.gitignore index b50ab6c63b6a8968ba92af7d04edb446ec3d2776..17fc1eca5df942829c72e4adb24e97e47b436455 100644 --- a/docs/examples/.gitignore +++ b/docs/examples/.gitignore @@ -1,4 +1,6 @@ .vscode *.lock +.env +*.~ .idea -.env \ No newline at end of file + diff --git a/docs/examples/java/pom.xml b/docs/examples/java/pom.xml index a48ba398da92f401235819d067aa2ba6f8b173ea..77c6a3ad60135a023ee1e72c2220e904c1f6313f 100644 --- a/docs/examples/java/pom.xml +++ b/docs/examples/java/pom.xml @@ -24,6 +24,16 @@ 2.0.38 + + org.slf4j + slf4j-api + 1.7.36 + + + ch.qos.logback + logback-classic + 1.2.11 + junit junit @@ -31,5 +41,36 @@ test + + + + org.apache.maven.plugins + maven-surefire-plugin + 2.5 + + true + + + + org.apache.maven.plugins + maven-dependency-plugin + + + copy-dependencies + prepare-package + + copy-dependencies + + + ${project.build.directory}/lib + false + false + true + + + + + + diff --git a/docs/examples/java/src/main/java/com/taos/example/TestTableNotExits.java b/docs/examples/java/src/main/java/com/taos/example/TestTableNotExits.java new file mode 100644 index 0000000000000000000000000000000000000000..89fa8eaed5f7fa90bb56e21c7427a9f12fb8fa4e --- /dev/null +++ b/docs/examples/java/src/main/java/com/taos/example/TestTableNotExits.java @@ -0,0 +1,26 @@ +package com.taos.example; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.SQLException; +import java.sql.Statement; + +public class TestTableNotExits { + private static Connection getConnection() throws SQLException { + String jdbcUrl = "jdbc:TAOS://localhost:6030?user=root&password=taosdata"; + return DriverManager.getConnection(jdbcUrl); + } + public static void main(String[] args) throws SQLException { + try(Connection conn = getConnection()) { + try(Statement stmt = conn.createStatement()) { + try { + stmt.executeUpdate("insert into test.t1 values(1, 2) test.t2 values(3, 4)"); + } catch (SQLException e) { + System.out.println(e.getErrorCode()); + System.out.println(Integer.toHexString(e.getErrorCode())); + System.out.println(e); + } + } + } + } +} diff --git a/docs/examples/java/src/main/java/com/taos/example/highvolume/FastWriteExample.java b/docs/examples/java/src/main/java/com/taos/example/highvolume/FastWriteExample.java index 3f642d8851c0dfc969c66174def78c11b3bb65ec..7e96397b04442c1d39ccdc32cc2e242ca6b15445 100644 --- a/docs/examples/java/src/main/java/com/taos/example/highvolume/FastWriteExample.java +++ b/docs/examples/java/src/main/java/com/taos/example/highvolume/FastWriteExample.java @@ -1,37 +1,110 @@ package com.taos.example.highvolume; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.sql.*; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingDeque; + +/** + * Prepare target database. + * Count total records in database periodically so that we can estimate the writing speed. + */ +class DataBaseMonitor { + private Connection conn; + private Statement stmt; + + public DataBaseMonitor init() throws SQLException { + if (conn == null) { + String jdbcURL = System.getenv("TDENGINE_JDBC_URL"); + conn = DriverManager.getConnection(jdbcURL); + stmt = conn.createStatement(); + } + return this; + } + + public void close() { + try { + stmt.close(); + } catch (SQLException e) { + } + try { + conn.close(); + } catch (SQLException e) { + } + } + + public void prepareDatabase() throws SQLException { + stmt.execute("DROP DATABASE IF EXISTS test"); + stmt.execute("CREATE DATABASE test"); + stmt.execute("CREATE STABLE test.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (location BINARY(64), groupId INT)"); + } + + public Long count() throws SQLException { + if (!stmt.isClosed()) { + ResultSet result = stmt.executeQuery("SELECT count(*) from test.meters"); + result.next(); + return result.getLong(1); + } + return null; + } +} // ANCHOR: main public class FastWriteExample { + final static Logger logger = LoggerFactory.getLogger(FastWriteExample.class); - final static int readTaskCount = 1; - final static int writeTaskCount = 4; + final static int taskQueueCapacity = Integer.MAX_VALUE / 100; + final static List> taskQueues = new ArrayList<>(); final static List readTasks = new ArrayList<>(); final static List writeTasks = new ArrayList<>(); + final static DataBaseMonitor databaseMonitor = new DataBaseMonitor(); + + public static void stopAll() { + logger.info("shutting down"); + readTasks.forEach(task -> task.stop()); + writeTasks.forEach(task -> task.stop()); + databaseMonitor.close(); + } + + public static void main(String[] args) throws InterruptedException, SQLException { + int readTaskCount = args.length > 0 ? Integer.parseInt(args[0]) : 1; + int writeTaskCount = args.length > 1 ? Integer.parseInt(args[1]) : 1; + logger.info("readTaskCount={}, writeTaskCount={}", readTaskCount, writeTaskCount); + + databaseMonitor.init().prepareDatabase(); - public static void main(String[] args) throws InterruptedException { - // Create writing tasks. - // Every writing task contains a work thread and a task queue. + // Create task queues, whiting tasks and start writing threads. for (int i = 0; i < writeTaskCount; ++i) { - WriteTask task = new WriteTask(); - task.start(); - writeTasks.add(new WriteTask()); + BlockingQueue queue = new ArrayBlockingQueue<>(taskQueueCapacity); + taskQueues.add(queue); + WriteTask task = new WriteTask(queue); + Thread t = new Thread(task); + t.setName("WriteThread-" + i); + t.start(); } - // Create reading tasks. - // Every reading task contains a work thread and a reference to each writing task. + // create reading tasks and start reading threads for (int i = 0; i < readTaskCount; ++i) { - ReadTask task = new ReadTask(i, writeTasks); - task.start(); - readTasks.add(task); + ReadTask task = new ReadTask(i, taskQueues); + Thread t = new Thread(task); + t.setName("ReadThread-" + i); + t.start(); } + Runtime.getRuntime().addShutdownHook(new Thread(FastWriteExample::stopAll)); + + long lastCount = 0; while (true) { - Thread.sleep(1000); + Thread.sleep(10000); + long count = databaseMonitor.count(); + logger.info("total_count={} speed={}", count, (count - lastCount) / 10); + lastCount = count; } } } - // ANCHOR_END: main \ No newline at end of file diff --git a/docs/examples/java/src/main/java/com/taos/example/highvolume/ReadTask.java b/docs/examples/java/src/main/java/com/taos/example/highvolume/ReadTask.java index c43dffb51a626352d7ab8dd410e01c8426f882c4..0e94f876fc48b50a31b3d89c3e0f5377136e8cd2 100644 --- a/docs/examples/java/src/main/java/com/taos/example/highvolume/ReadTask.java +++ b/docs/examples/java/src/main/java/com/taos/example/highvolume/ReadTask.java @@ -1,84 +1,107 @@ package com.taos.example.highvolume; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.util.Iterator; import java.util.List; +import java.util.concurrent.BlockingQueue; - -class FakeDataSource implements Iterator { - private long now = System.currentTimeMillis(); - private int tableCount = 1000; - private int numberRows = 10 ^ 7; - private int tsStep = 100; // 100 milliseconds +/** + * Generate test data + */ +class MockDataSource implements Iterator { private String tbNamePrefix; - private long ts = now - numberRows * tsStep; - private int tbId = 0; + private int tableCount = 1000; + private long maxRowsPerTable = 1000000000L; - public FakeDataSource(String tbNamePrefix) { - this.tbNamePrefix = tbNamePrefix; + // 100 milliseconds between two neighbouring rows. + long startMs = System.currentTimeMillis() - maxRowsPerTable * 100; + private int currentRow = 0; + private int currentTbId = -1; + + // mock values + String[] location = {"LosAngeles", "SanDiego", "Hollywood", "Compton", "San Francisco"}; + float[] current = {8.8f, 10.7f, 9.9f, 8.9f, 9.4f}; + int[] voltage = {119, 116, 111, 113, 118}; + float[] phase = {0.32f, 0.34f, 0.33f, 0.329f, 0.141f}; + public MockDataSource(String tbNamePrefix) { + this.tbNamePrefix = tbNamePrefix; } @Override public boolean hasNext() { - return ts < now; + currentTbId += 1; + if (currentTbId == tableCount) { + currentTbId = 0; + currentRow += 1; + } + return currentRow < maxRowsPerTable; } @Override public String next() { - if (tbId < tableCount) { - tbId += 1; - } else { - ts += tsStep; - tbId = 0; - } - StringBuilder sb = new StringBuilder(tbNamePrefix + tbId + ","); // tbName + long ts = startMs + 100 * currentRow; + int groupId = currentTbId % 5 == 0 ? currentTbId / 5 : currentTbId / 5 + 1; + StringBuilder sb = new StringBuilder(tbNamePrefix + "_" + currentTbId + ","); // tbName sb.append(ts).append(','); // ts - sb.append(1.0).append(','); // current - sb.append(110).append(','); // voltage - sb.append(0.32).append(','); // phase - sb.append(3).append(','); // groupID - sb.append("Los Angeles"); // location + sb.append(current[currentRow % 5]).append(','); // current + sb.append(voltage[currentRow % 5]).append(','); // voltage + sb.append(phase[currentRow % 5]).append(','); // phase + sb.append(location[currentRow % 5]).append(','); // location + sb.append(groupId); // groupID return sb.toString(); } } // ANCHOR: ReadTask -class ReadTask { +class ReadTask implements Runnable { + private final static Logger logger = LoggerFactory.getLogger(ReadTask.class); private final int taskId; - private Thread readThread; - private List writeTasks; + private final List> taskQueues; + private final int queueCount; + private boolean active = true; - public ReadTask(int readTaskId, List writeTasks) { + public ReadTask(int readTaskId, List> queues) { this.taskId = readTaskId; - this.writeTasks = writeTasks; - this.readThread = new Thread(this::doReadTask); + this.taskQueues = queues; + this.queueCount = queues.size(); } /** - * Read lines from datasource. - * And assign each line to a writing task according to the table name. + * Assign data received to different queues. + * Here we use the suffix number in table name. + * You are expected to define your own rule in practice. + * + * @param line record received + * @return which queue to use */ - private void doReadTask() { - int numberWriteTask = writeTasks.size(); - Iterator it = new FakeDataSource("t" + this.taskId + "tb"); + public int getQueueId(String line) { + String tbName = line.substring(0, line.indexOf(',')); // For example: tb1_101 + String suffixNumber = tbName.split("_")[1]; + return Integer.parseInt(suffixNumber) % this.queueCount; + } + + @Override + public void run() { + logger.info("started"); + Iterator it = new MockDataSource("tb" + this.taskId); try { - while (it.hasNext()) { + while (it.hasNext() && active) { String line = it.next(); - String tbName = line.substring(0, line.indexOf(',')); - int writeTaskId = Math.abs(tbName.hashCode()) % numberWriteTask; - writeTasks.get(writeTaskId).put(line); + int queueId = getQueueId(line); + taskQueues.get(queueId).put(line); } - } catch (InterruptedException e) { - e.printStackTrace(); + } catch (Exception e) { + logger.error("Read Task Error", e); } } - /** - * Start reading thread. - */ - public void start() { - this.readThread.start(); + public void stop() { + logger.info("stop"); + this.active = false; } } diff --git a/docs/examples/java/src/main/java/com/taos/example/highvolume/SQLWriter.java b/docs/examples/java/src/main/java/com/taos/example/highvolume/SQLWriter.java new file mode 100644 index 0000000000000000000000000000000000000000..83705bbe5f84d2b10060f6102fbb92c572c2dc06 --- /dev/null +++ b/docs/examples/java/src/main/java/com/taos/example/highvolume/SQLWriter.java @@ -0,0 +1,179 @@ +package com.taos.example.highvolume; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.HashMap; +import java.util.Map; + +// ANCHOR: SQLWriter + +/** + * A helper class encapsulate the logic of writing using SQL. + *

+ * The main interfaces are two methods: + *

    + *
  1. {@link SQLWriter#processLine}, which receive raw data from WriteTask and group raw data by table names.
  2. + *
  3. {@link SQLWriter#flush}, which assemble INSERT statement and execute it.
  4. + *
+ *

+ * There is a technical skill worth mentioning: we create table as needed when "table does not exist" error occur instead of creating table automatically using syntax "INSET INTO tb USING stb". + * This ensure that checking table existence is a one-time-only operation. + *

+ * + *

+ */ +public class SQLWriter { + final static Logger logger = LoggerFactory.getLogger(SQLWriter.class); + + private Connection conn; + private Statement stmt; + + /** + * current number of buffered records + */ + private int bufferedCount = 0; + /** + * Maximum number of buffered records. + * Flush action will be triggered if bufferedCount reached this value, + */ + private int maxBatchSize; + + /** + * Map from table name to column values. For example: + * "tb001" -> "(1648432611249,2.1,114,0.09) (1648432611250,2.2,135,0.2)" + */ + private Map tbValues = new HashMap<>(); + + /** + * Map from table name to tag values in the same order as creating stable. + * Used for creating table. + */ + private Map tbTags = new HashMap<>(); + + public SQLWriter(int maxBatchSize) { + this.maxBatchSize = maxBatchSize; + } + + + /** + * Get Database Connection + * + * @return Connection + * @throws SQLException + */ + private static Connection getConnection() throws SQLException { + String jdbcURL = System.getenv("TDENGINE_JDBC_URL"); + return DriverManager.getConnection(jdbcURL); + } + + /** + * Create Connection and Statement + * + * @throws SQLException + */ + public void init() throws SQLException { + conn = getConnection(); + stmt = conn.createStatement(); + stmt.execute("use test"); + } + + /** + * Convert raw data to SQL fragments, group them by table name and cache them in a HashMap. + * Trigger writing when number of buffered records reached maxBachSize. + * + * @param line raw data get from task queue in format: tbName,ts,current,voltage,phase,location,groupId + */ + public void processLine(String line) throws SQLException { + bufferedCount += 1; + int firstComma = line.indexOf(','); + String tbName = line.substring(0, firstComma); + int lastComma = line.lastIndexOf(','); + int secondLastComma = line.lastIndexOf(',', lastComma - 1); + String values = "(" + line.substring(firstComma + 1, secondLastComma) + ") "; + if (tbValues.containsKey(tbName)) { + tbValues.put(tbName, tbValues.get(tbName) + values); + } else { + tbValues.put(tbName, values); + } + if (!tbTags.containsKey(tbName)) { + String location = line.substring(secondLastComma + 1, lastComma); + String groupId = line.substring(lastComma + 1); + String tagValues = "('" + location + "'," + groupId + ')'; + tbTags.put(tbName, tagValues); + } + if (bufferedCount == maxBatchSize) { + flush(); + } + } + + + /** + * Assemble INSERT statement using buffered SQL fragments in Map {@link SQLWriter#tbValues} and execute it. + * In case of "Table does not exit" exception, create all tables in the sql and retry the sql. + */ + public void flush() throws SQLException { + StringBuilder sb = new StringBuilder("INSERT INTO "); + for (Map.Entry entry : tbValues.entrySet()) { + String tableName = entry.getKey(); + String values = entry.getValue(); + sb.append(tableName).append(" values ").append(values).append(" "); + } + String sql = sb.toString(); + try { + stmt.executeUpdate(sql); + } catch (SQLException e) { + // convert to error code defined in taoserror.h + int errorCode = e.getErrorCode() & 0xffff; + if (errorCode == 0x362 || errorCode == 0x218) { + // Table does not exist + createTables(); + stmt.executeUpdate(sql); + } else { + throw e; + } + } + tbValues.clear(); + bufferedCount = 0; + } + + /** + * Create tables in batch using syntax: + *

+ * CREATE TABLE [IF NOT EXISTS] tb_name1 USING stb_name TAGS (tag_value1, ...) [IF NOT EXISTS] tb_name2 USING stb_name TAGS (tag_value2, ...) ...; + *

+ */ + private void createTables() throws SQLException { + StringBuilder sb = new StringBuilder("CREATE TABLE "); + for (String tbName : tbValues.keySet()) { + String tagValues = tbTags.get(tbName); + sb.append("IF NOT EXISTS ").append(tbName).append(" USING meters TAGS ").append(tagValues).append(" "); + } + String sql = sb.toString(); + stmt.executeUpdate(sql); + } + + public boolean hasBufferedValues() { + return bufferedCount > 0; + } + + public int getBufferedCount() { + return bufferedCount; + } + + public void close() { + try { + stmt.close(); + } catch (SQLException e) { + } + try { + conn.close(); + } catch (SQLException e) { + } + } +} +// ANCHOR_END: SQLWriter diff --git a/docs/examples/java/src/main/java/com/taos/example/highvolume/WriteTask.java b/docs/examples/java/src/main/java/com/taos/example/highvolume/WriteTask.java index 6a5083c69908155a4de401e5d132b60cafc0c98b..1118bb7e9e882c6d3524522b95e1a0f120f3db33 100644 --- a/docs/examples/java/src/main/java/com/taos/example/highvolume/WriteTask.java +++ b/docs/examples/java/src/main/java/com/taos/example/highvolume/WriteTask.java @@ -1,82 +1,60 @@ package com.taos.example.highvolume; -import java.sql.Connection; -import java.sql.DriverManager; -import java.sql.SQLException; -import java.sql.Statement; -import java.util.HashMap; -import java.util.Map; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.util.concurrent.BlockingQueue; -import java.util.concurrent.LinkedBlockingDeque; // ANCHOR: WriteTask -class WriteTask { - final static int maxBatchSize = 500; - // - final static int taskQueueCapacity = 1000; - - private Thread writeThread = new Thread(this::doWriteTask); +class WriteTask implements Runnable { + private final static Logger logger = LoggerFactory.getLogger(WriteTask.class); + private final static int maxBatchSize = 3000; - private BlockingQueue queue = new LinkedBlockingDeque<>(taskQueueCapacity); + // the queue from which this writing task get raw data. + private final BlockingQueue queue; - /** - * Public interface for adding task to task queue. - * It will be invoked in read thread. - */ - public void put(String line) throws InterruptedException { - queue.put(line); - } + // A flag indicate whether to continue. + private boolean active = true; - /** - * Start writing thread. - */ - public void start() { - writeThread.start(); + public WriteTask(BlockingQueue taskQueue) { + this.queue = taskQueue; } - private static Connection getConnection() throws SQLException { - String jdbcUrl = "jdbc:TAOS://localhost:6030?user=root&password=taosdata"; - return DriverManager.getConnection(jdbcUrl); - } - private void doWriteTask() { - int count = 0; + @Override + public void run() { + logger.info("started"); + String line = null; // data getting from the queue just now. + SQLWriter writer = new SQLWriter(maxBatchSize); try { - Connection conn = getConnection(); - Statement stmt = conn.createStatement(); - Map tbValues = new HashMap<>(); - while (true) { - String line = queue.poll(); + writer.init(); + while (active) { + line = queue.poll(); if (line != null) { - processLine(tbValues, line); - count += 1; - if (count == maxBatchSize) { - // trigger writing when count of buffered records reached maxBachSize - flushValues(stmt, tbValues); - count = 0; - } - } else if (count == 0) { - // if queue is empty and no buffered records, sleep a while to avoid high CPU usage. - Thread.sleep(500); + // parse raw data and buffer the data. + writer.processLine(line); + } else if (writer.hasBufferedValues()) { + // write data immediately if no more data in the queue + writer.flush(); } else { - // if queue is empty and there are buffered records then flush immediately - flushValues(stmt, tbValues); - count = 0; + // sleep a while to avoid high CPU usage if no more data in the queue and no buffered records, . + Thread.sleep(500); } } + if (writer.hasBufferedValues()) { + writer.flush(); + } } catch (Exception e) { - // handle exception + String msg = String.format("line=%s, bufferedCount=%s", line, writer.getBufferedCount()); + logger.error(msg, e); + } finally { + writer.close(); } - - } - - private void processLine(Map tbValues, String line) { - } - private void flushValues(Statement stmt, Map tbValues) { - + public void stop() { + logger.info("stop"); + this.active = false; } - } // ANCHOR_END: WriteTask \ No newline at end of file diff --git a/docs/examples/java/src/main/resources/logback.xml b/docs/examples/java/src/main/resources/logback.xml new file mode 100644 index 0000000000000000000000000000000000000000..898887fe6ab5b86b4bb069032fe7a9e9c2b9148b --- /dev/null +++ b/docs/examples/java/src/main/resources/logback.xml @@ -0,0 +1,14 @@ + + + + + + %d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n + + + + + + + \ No newline at end of file diff --git a/docs/zh/07-develop/03-insert-data/05-high-volume.md b/docs/zh/07-develop/03-insert-data/05-high-volume.md index 53f193b664796db7a0e7cbd94c289ac371686f39..cb13f1c582aa704f58dba2f8a6a0bec92a6379de 100644 --- a/docs/zh/07-develop/03-insert-data/05-high-volume.md +++ b/docs/zh/07-develop/03-insert-data/05-high-volume.md @@ -21,33 +21,70 @@ title: 高效写入 在 Java 示例程序中采用拼接 SQL 的写入方式。 +### 主程序 + +主程序负责创建队列,并启动读线程和写线程。 + ```java title="主程序" {{#include docs/examples/java/src/main/java/com/taos/example/highvolume/FastWriteExample.java:main}} ``` +### 读任务的实现 +
-写任务的实现 +读任务的实现 ```java -{{#include docs/examples/java/src/main/java/com/taos/example/highvolume/WriteTask.java:WriteTask}} +{{#include docs/examples/java/src/main/java/com/taos/example/highvolume/ReadTask.java:ReadTask}} ``` +
+### 写任务的实现 +
-读任务的实现 +写任务的实现 ```java -{{#include docs/examples/java/src/main/java/com/taos/example/highvolume/ReadTask.java:ReadTask}} +{{#include docs/examples/java/src/main/java/com/taos/example/highvolume/WriteTask.java:WriteTask}} ``` -
+### SQLWriter 类的实现 +SQLWriter 类封装了拼 SQL 和写数据的逻辑。注意,所有的表都没有提前创建,而是写入出错的时候,再以超级表为模板建表,然后重现执行 INSERT 语句。 -## Python 示例程序 +```java +{{#include docs/examples/java/src/main/java/com/taos/example/highvolume/SQLWriter.java:SQLWriter}} +``` +### 执行示例程序 + +可用在本地集成开发环境里直接运行示例程序,注意要提前配置环境变量 TDENGINE_JDBC_URL。若要在远程服务器上执行示例程序,可按照下面步骤操作: +1. 打包示例代码。在目录 TDengine/docs/examples/java 下执行: + ``` + mvn package + ``` +2. 远程服务器上创建 examples 目录: + ``` + mkdir -p examples/java + ``` +3. 复制依赖到服务器指定目录,比如: + ``` + scp -r .\target\lib user@host:~/examples/java + scp -r .\target\javaexample-1.0.jar user@host:~/examples/java + ``` +4. 配置环境变量,例如: + ``` + export TDENGINE_JDBC_URL="jdbc:TAOS://localhost:6030?user=root&password=taosdata" + ``` +5. 用 java 命令启动示例程序 + ``` + java -classpath lib/*:javaexample-1.0.jar com.taos.example.highvolume.FastWriteExample + ``` -在 Python 示例程序中采用参数绑定的写入方式。 +## Python 示例程序 -```python title="Python 示例程序" +在 Python 示例程序中采用参数绑定的写入方式。(开发中) +