diff --git a/docs/examples/java/pom.xml b/docs/examples/java/pom.xml index bead800bdd29dfd51434aa113eb11afa28201934..77c6a3ad60135a023ee1e72c2220e904c1f6313f 100644 --- a/docs/examples/java/pom.xml +++ b/docs/examples/java/pom.xml @@ -41,5 +41,36 @@ test + + + + org.apache.maven.plugins + maven-surefire-plugin + 2.5 + + true + + + + org.apache.maven.plugins + maven-dependency-plugin + + + copy-dependencies + prepare-package + + copy-dependencies + + + ${project.build.directory}/lib + false + false + true + + + + + + diff --git a/docs/examples/java/src/main/java/com/taos/example/TestTableNotExits.java b/docs/examples/java/src/main/java/com/taos/example/TestTableNotExits.java new file mode 100644 index 0000000000000000000000000000000000000000..89fa8eaed5f7fa90bb56e21c7427a9f12fb8fa4e --- /dev/null +++ b/docs/examples/java/src/main/java/com/taos/example/TestTableNotExits.java @@ -0,0 +1,26 @@ +package com.taos.example; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.SQLException; +import java.sql.Statement; + +public class TestTableNotExits { + private static Connection getConnection() throws SQLException { + String jdbcUrl = "jdbc:TAOS://localhost:6030?user=root&password=taosdata"; + return DriverManager.getConnection(jdbcUrl); + } + public static void main(String[] args) throws SQLException { + try(Connection conn = getConnection()) { + try(Statement stmt = conn.createStatement()) { + try { + stmt.executeUpdate("insert into test.t1 values(1, 2) test.t2 values(3, 4)"); + } catch (SQLException e) { + System.out.println(e.getErrorCode()); + System.out.println(Integer.toHexString(e.getErrorCode())); + System.out.println(e); + } + } + } + } +} diff --git a/docs/examples/java/src/main/java/com/taos/example/highvolume/FastWriteExample.java b/docs/examples/java/src/main/java/com/taos/example/highvolume/FastWriteExample.java index 195f362595678c5ed7fd8bc1e373dfeb4071b2d1..4a454cbb3f4b49bfb8a4a991c74584edd473c7a1 100644 --- a/docs/examples/java/src/main/java/com/taos/example/highvolume/FastWriteExample.java +++ b/docs/examples/java/src/main/java/com/taos/example/highvolume/FastWriteExample.java @@ -3,29 +3,80 @@ package com.taos.example.highvolume; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.sql.*; import java.util.ArrayList; import java.util.List; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; +/** + * Prepare target database. + * Count total records in database periodically so that we can estimate the writing speed. + */ +class DataBaseMonitor { + private Connection conn; + private Statement stmt; + + public DataBaseMonitor init() throws SQLException { + if (conn == null) { + String jdbcURL = System.getenv("TDENGINE_JDBC_URL"); + conn = DriverManager.getConnection(jdbcURL); + stmt = conn.createStatement(); + } + return this; + } + + public void close() { + try { + stmt.close(); + } catch (SQLException e) { + } + try { + conn.close(); + } catch (SQLException e) { + } + } + + public void prepareDatabase() throws SQLException { + stmt.execute("DROP DATABASE IF EXISTS test"); + stmt.execute("CREATE DATABASE test"); + stmt.execute("CREATE STABLE test.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (location BINARY(64), groupId INT)"); + } + + public Long count() throws SQLException { + if (!stmt.isClosed()) { + ResultSet result = stmt.executeQuery("SELECT count(*) from test.meters"); + result.next(); + return result.getLong(1); + } + return null; + } +} + // ANCHOR: main public class FastWriteExample { final static Logger logger = LoggerFactory.getLogger(FastWriteExample.class); - final static int readTaskCount = 1; - final static int writeTaskCount = 1; final static int taskQueueCapacity = 1000; final static List> taskQueues = new ArrayList<>(); final static List readTasks = new ArrayList<>(); final static List writeTasks = new ArrayList<>(); + final static DataBaseMonitor databaseMonitor = new DataBaseMonitor(); public static void stopAll() { - logger.info("stopAll"); + logger.info("shutting down"); readTasks.forEach(task -> task.stop()); writeTasks.forEach(task -> task.stop()); + databaseMonitor.close(); } - public static void main(String[] args) throws InterruptedException { + public static void main(String[] args) throws InterruptedException, SQLException { + int readTaskCount = args.length > 0 ? Integer.parseInt(args[0]) : 1; + int writeTaskCount = args.length > 1 ? Integer.parseInt(args[1]) : 1; + logger.info("readTaskCount={}, writeTaskCount={}", readTaskCount, writeTaskCount); + + databaseMonitor.init().prepareDatabase(); + // Create task queues, whiting tasks and start writing threads. for (int i = 0; i < writeTaskCount; ++i) { BlockingQueue queue = new ArrayBlockingQueue<>(taskQueueCapacity); @@ -45,7 +96,14 @@ public class FastWriteExample { } Runtime.getRuntime().addShutdownHook(new Thread(FastWriteExample::stopAll)); + + long lastCount = 0; + while (true) { + Thread.sleep(10000); + long count = databaseMonitor.count(); + logger.info("total_count={} speed={}", count, (count - lastCount) / 10); + lastCount = count; + } } } - // ANCHOR_END: main \ No newline at end of file diff --git a/docs/examples/java/src/main/java/com/taos/example/highvolume/ReadTask.java b/docs/examples/java/src/main/java/com/taos/example/highvolume/ReadTask.java index 2679e9afa354ab0cd696acc6788c30fd90b3f9a5..fe3f78aed9f17e784039d8d22f60fbfd9ba34800 100644 --- a/docs/examples/java/src/main/java/com/taos/example/highvolume/ReadTask.java +++ b/docs/examples/java/src/main/java/com/taos/example/highvolume/ReadTask.java @@ -12,8 +12,8 @@ import java.util.concurrent.BlockingQueue; */ class MockDataSource implements Iterator { private String tbNamePrefix; - private int tableCount = 10; - private int totalRowsPerTable = 10; + private int tableCount = 100; + private int totalRowsPerTable = 1000000; // 100 milliseconds between two neighbouring rows. long startMs = System.currentTimeMillis() - totalRowsPerTable * 100; @@ -28,7 +28,6 @@ class MockDataSource implements Iterator { public MockDataSource(String tbNamePrefix) { this.tbNamePrefix = tbNamePrefix; - } @Override @@ -50,8 +49,8 @@ class MockDataSource implements Iterator { sb.append(current[currentRow % 5]).append(','); // current sb.append(voltage[currentRow % 5]).append(','); // voltage sb.append(phase[currentRow % 5]).append(','); // phase - sb.append(groupId).append(','); // groupID - sb.append(location[currentRow % 5]); // location + sb.append(location[currentRow % 5]).append(','); // location + sb.append(groupId); // groupID return sb.toString(); } diff --git a/docs/examples/java/src/main/java/com/taos/example/highvolume/SQLWriter.java b/docs/examples/java/src/main/java/com/taos/example/highvolume/SQLWriter.java new file mode 100644 index 0000000000000000000000000000000000000000..83705bbe5f84d2b10060f6102fbb92c572c2dc06 --- /dev/null +++ b/docs/examples/java/src/main/java/com/taos/example/highvolume/SQLWriter.java @@ -0,0 +1,179 @@ +package com.taos.example.highvolume; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.HashMap; +import java.util.Map; + +// ANCHOR: SQLWriter + +/** + * A helper class encapsulate the logic of writing using SQL. + *

+ * The main interfaces are two methods: + *

    + *
  1. {@link SQLWriter#processLine}, which receive raw data from WriteTask and group raw data by table names.
  2. + *
  3. {@link SQLWriter#flush}, which assemble INSERT statement and execute it.
  4. + *
+ *

+ * There is a technical skill worth mentioning: we create table as needed when "table does not exist" error occur instead of creating table automatically using syntax "INSET INTO tb USING stb". + * This ensure that checking table existence is a one-time-only operation. + *

+ * + *

+ */ +public class SQLWriter { + final static Logger logger = LoggerFactory.getLogger(SQLWriter.class); + + private Connection conn; + private Statement stmt; + + /** + * current number of buffered records + */ + private int bufferedCount = 0; + /** + * Maximum number of buffered records. + * Flush action will be triggered if bufferedCount reached this value, + */ + private int maxBatchSize; + + /** + * Map from table name to column values. For example: + * "tb001" -> "(1648432611249,2.1,114,0.09) (1648432611250,2.2,135,0.2)" + */ + private Map tbValues = new HashMap<>(); + + /** + * Map from table name to tag values in the same order as creating stable. + * Used for creating table. + */ + private Map tbTags = new HashMap<>(); + + public SQLWriter(int maxBatchSize) { + this.maxBatchSize = maxBatchSize; + } + + + /** + * Get Database Connection + * + * @return Connection + * @throws SQLException + */ + private static Connection getConnection() throws SQLException { + String jdbcURL = System.getenv("TDENGINE_JDBC_URL"); + return DriverManager.getConnection(jdbcURL); + } + + /** + * Create Connection and Statement + * + * @throws SQLException + */ + public void init() throws SQLException { + conn = getConnection(); + stmt = conn.createStatement(); + stmt.execute("use test"); + } + + /** + * Convert raw data to SQL fragments, group them by table name and cache them in a HashMap. + * Trigger writing when number of buffered records reached maxBachSize. + * + * @param line raw data get from task queue in format: tbName,ts,current,voltage,phase,location,groupId + */ + public void processLine(String line) throws SQLException { + bufferedCount += 1; + int firstComma = line.indexOf(','); + String tbName = line.substring(0, firstComma); + int lastComma = line.lastIndexOf(','); + int secondLastComma = line.lastIndexOf(',', lastComma - 1); + String values = "(" + line.substring(firstComma + 1, secondLastComma) + ") "; + if (tbValues.containsKey(tbName)) { + tbValues.put(tbName, tbValues.get(tbName) + values); + } else { + tbValues.put(tbName, values); + } + if (!tbTags.containsKey(tbName)) { + String location = line.substring(secondLastComma + 1, lastComma); + String groupId = line.substring(lastComma + 1); + String tagValues = "('" + location + "'," + groupId + ')'; + tbTags.put(tbName, tagValues); + } + if (bufferedCount == maxBatchSize) { + flush(); + } + } + + + /** + * Assemble INSERT statement using buffered SQL fragments in Map {@link SQLWriter#tbValues} and execute it. + * In case of "Table does not exit" exception, create all tables in the sql and retry the sql. + */ + public void flush() throws SQLException { + StringBuilder sb = new StringBuilder("INSERT INTO "); + for (Map.Entry entry : tbValues.entrySet()) { + String tableName = entry.getKey(); + String values = entry.getValue(); + sb.append(tableName).append(" values ").append(values).append(" "); + } + String sql = sb.toString(); + try { + stmt.executeUpdate(sql); + } catch (SQLException e) { + // convert to error code defined in taoserror.h + int errorCode = e.getErrorCode() & 0xffff; + if (errorCode == 0x362 || errorCode == 0x218) { + // Table does not exist + createTables(); + stmt.executeUpdate(sql); + } else { + throw e; + } + } + tbValues.clear(); + bufferedCount = 0; + } + + /** + * Create tables in batch using syntax: + *

+ * CREATE TABLE [IF NOT EXISTS] tb_name1 USING stb_name TAGS (tag_value1, ...) [IF NOT EXISTS] tb_name2 USING stb_name TAGS (tag_value2, ...) ...; + *

+ */ + private void createTables() throws SQLException { + StringBuilder sb = new StringBuilder("CREATE TABLE "); + for (String tbName : tbValues.keySet()) { + String tagValues = tbTags.get(tbName); + sb.append("IF NOT EXISTS ").append(tbName).append(" USING meters TAGS ").append(tagValues).append(" "); + } + String sql = sb.toString(); + stmt.executeUpdate(sql); + } + + public boolean hasBufferedValues() { + return bufferedCount > 0; + } + + public int getBufferedCount() { + return bufferedCount; + } + + public void close() { + try { + stmt.close(); + } catch (SQLException e) { + } + try { + conn.close(); + } catch (SQLException e) { + } + } +} +// ANCHOR_END: SQLWriter diff --git a/docs/examples/java/src/main/java/com/taos/example/highvolume/WriteTask.java b/docs/examples/java/src/main/java/com/taos/example/highvolume/WriteTask.java index eaf88552f4a350f7a4dbe21b6145276dca0683c2..1118bb7e9e882c6d3524522b95e1a0f120f3db33 100644 --- a/docs/examples/java/src/main/java/com/taos/example/highvolume/WriteTask.java +++ b/docs/examples/java/src/main/java/com/taos/example/highvolume/WriteTask.java @@ -3,74 +3,55 @@ package com.taos.example.highvolume; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.sql.Connection; -import java.sql.DriverManager; -import java.sql.SQLException; -import java.sql.Statement; -import java.util.HashMap; -import java.util.Map; import java.util.concurrent.BlockingQueue; // ANCHOR: WriteTask class WriteTask implements Runnable { private final static Logger logger = LoggerFactory.getLogger(WriteTask.class); - private final static int maxBatchSize = 500; + private final static int maxBatchSize = 3000; + + // the queue from which this writing task get raw data. private final BlockingQueue queue; + + // A flag indicate whether to continue. private boolean active = true; public WriteTask(BlockingQueue taskQueue) { this.queue = taskQueue; } - private static Connection getConnection() throws SQLException, ClassNotFoundException { - String jdbcURL = System.getenv("TDENGINE_JDBC_URL"); - return DriverManager.getConnection(jdbcURL); - } @Override public void run() { logger.info("started"); - int bufferedCount = 0; - String line = null; - try (Connection conn = getConnection()) { - try (Statement stmt = conn.createStatement()) { - Map tbValues = new HashMap<>(); - while (active) { - line = queue.poll(); - if (line != null) { - processLine(tbValues, line); - bufferedCount += 1; - if (bufferedCount == maxBatchSize) { - // trigger writing when count of buffered records reached maxBachSize - flushValues(stmt, tbValues); - bufferedCount = 0; - } - } else if (bufferedCount == 0) { - // if queue is empty and no buffered records, sleep a while to avoid high CPU usage. - Thread.sleep(500); - } else { - // if queue is empty and there are buffered records then flush immediately - flushValues(stmt, tbValues); - bufferedCount = 0; - } - } - if (bufferedCount > 0) { - flushValues(stmt, tbValues); + String line = null; // data getting from the queue just now. + SQLWriter writer = new SQLWriter(maxBatchSize); + try { + writer.init(); + while (active) { + line = queue.poll(); + if (line != null) { + // parse raw data and buffer the data. + writer.processLine(line); + } else if (writer.hasBufferedValues()) { + // write data immediately if no more data in the queue + writer.flush(); + } else { + // sleep a while to avoid high CPU usage if no more data in the queue and no buffered records, . + Thread.sleep(500); } } + if (writer.hasBufferedValues()) { + writer.flush(); + } } catch (Exception e) { - String msg = String.format("line=%s, bufferedCount=%s", line, bufferedCount); + String msg = String.format("line=%s, bufferedCount=%s", line, writer.getBufferedCount()); logger.error(msg, e); + } finally { + writer.close(); } } - private void processLine(Map tbValues, String line) { - } - - private void flushValues(Statement stmt, Map tbValues) { - StringBuilder sb = new StringBuilder("INSERT INTO"); - } - public void stop() { logger.info("stop"); this.active = false; diff --git a/docs/zh/07-develop/03-insert-data/05-high-volume.md b/docs/zh/07-develop/03-insert-data/05-high-volume.md index 53f193b664796db7a0e7cbd94c289ac371686f39..a6d58c97eefc74f4405ad98dcbe44641c76eb4de 100644 --- a/docs/zh/07-develop/03-insert-data/05-high-volume.md +++ b/docs/zh/07-develop/03-insert-data/05-high-volume.md @@ -21,28 +21,58 @@ title: 高效写入 在 Java 示例程序中采用拼接 SQL 的写入方式。 +### 主程序 + ```java title="主程序" {{#include docs/examples/java/src/main/java/com/taos/example/highvolume/FastWriteExample.java:main}} ``` +### 读任务的实现
-写任务的实现 +读任务的实现 ```java -{{#include docs/examples/java/src/main/java/com/taos/example/highvolume/WriteTask.java:WriteTask}} +{{#include docs/examples/java/src/main/java/com/taos/example/highvolume/ReadTask.java:ReadTask}} ``` +
+### 写任务的实现 +
-读任务的实现 +写任务的实现 ```java -{{#include docs/examples/java/src/main/java/com/taos/example/highvolume/ReadTask.java:ReadTask}} +{{#include docs/examples/java/src/main/java/com/taos/example/highvolume/WriteTask.java:WriteTask}} ``` -
- +### SQLWriter 类的实现 + +### 执行示例程序 + +可用在本地集成开发环境里直接运行示例程序,注意要提前配置环境变量 TDENGINE_JDBC_URL。若要在远程服务器上执行示例程序,可按照下面步骤操作: +1. 打包示例代码。在目录 TDengine/docs/examples/java 下执行: + ``` + mvn package + ``` +2. 远程服务器上创建 examples 目录: + ``` + mkdir -p examples/java + ``` +3. 复制依赖到服务器指定目录,比如: + ``` + scp -r .\target\lib user@host:~/examples/java + scp -r .\target\javaexample-1.0.jar user@host:~/examples/java + ``` +4. 配置环境变量,例如: + ``` + export TDENGINE_JDBC_URL="jdbc:TAOS://localhost:6030?user=root&password=taosdata" + ``` +5. 用 java 命令启动示例程序 + ``` + java -classpath lib/*:javaexample-1.0.jar com.taos.example.highvolume.FastWriteExample + ``` ## Python 示例程序