diff --git a/docs/examples/java/src/main/java/com/taos/example/TestTableNotExits.java b/docs/examples/java/src/main/java/com/taos/example/TestTableNotExits.java
new file mode 100644
index 0000000000000000000000000000000000000000..89fa8eaed5f7fa90bb56e21c7427a9f12fb8fa4e
--- /dev/null
+++ b/docs/examples/java/src/main/java/com/taos/example/TestTableNotExits.java
@@ -0,0 +1,26 @@
+package com.taos.example;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.sql.Statement;
+
+public class TestTableNotExits {
+ private static Connection getConnection() throws SQLException {
+ String jdbcUrl = "jdbc:TAOS://localhost:6030?user=root&password=taosdata";
+ return DriverManager.getConnection(jdbcUrl);
+ }
+ public static void main(String[] args) throws SQLException {
+ try(Connection conn = getConnection()) {
+ try(Statement stmt = conn.createStatement()) {
+ try {
+ stmt.executeUpdate("insert into test.t1 values(1, 2) test.t2 values(3, 4)");
+ } catch (SQLException e) {
+ System.out.println(e.getErrorCode());
+ System.out.println(Integer.toHexString(e.getErrorCode()));
+ System.out.println(e);
+ }
+ }
+ }
+ }
+}
diff --git a/docs/examples/java/src/main/java/com/taos/example/highvolume/DataBaseMonitor.java b/docs/examples/java/src/main/java/com/taos/example/highvolume/DataBaseMonitor.java
new file mode 100644
index 0000000000000000000000000000000000000000..5c513ec28224061bc9dadfcc87e37dcbf45dc8d6
--- /dev/null
+++ b/docs/examples/java/src/main/java/com/taos/example/highvolume/DataBaseMonitor.java
@@ -0,0 +1,47 @@
+package com.taos.example.highvolume;
+
+import java.sql.*;
+
+/**
+ * Prepare target database.
+ * Count total records in database periodically so that we can estimate the writing speed.
+ */
+public class DataBaseMonitor {
+ private Connection conn;
+ private Statement stmt;
+
+ public DataBaseMonitor init() throws SQLException {
+ if (conn == null) {
+ String jdbcURL = System.getenv("TDENGINE_JDBC_URL");
+ conn = DriverManager.getConnection(jdbcURL);
+ stmt = conn.createStatement();
+ }
+ return this;
+ }
+
+ public void close() {
+ try {
+ stmt.close();
+ } catch (SQLException e) {
+ }
+ try {
+ conn.close();
+ } catch (SQLException e) {
+ }
+ }
+
+ public void prepareDatabase() throws SQLException {
+ stmt.execute("DROP DATABASE IF EXISTS test");
+ stmt.execute("CREATE DATABASE test");
+ stmt.execute("CREATE STABLE test.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (location BINARY(64), groupId INT)");
+ }
+
+ public Long count() throws SQLException {
+ if (!stmt.isClosed()) {
+ ResultSet result = stmt.executeQuery("SELECT count(*) from test.meters");
+ result.next();
+ return result.getLong(1);
+ }
+ return null;
+ }
+}
\ No newline at end of file
diff --git a/docs/examples/java/src/main/java/com/taos/example/highvolume/FastWriteExample.java b/docs/examples/java/src/main/java/com/taos/example/highvolume/FastWriteExample.java
index e8af1a68ea60d213ad700446dfb6b953be6f1d34..15672dddd95d942b79678b738ad18aeeb3e46441 100644
--- a/docs/examples/java/src/main/java/com/taos/example/highvolume/FastWriteExample.java
+++ b/docs/examples/java/src/main/java/com/taos/example/highvolume/FastWriteExample.java
@@ -9,51 +9,7 @@ import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
-/**
- * Prepare target database.
- * Count total records in database periodically so that we can estimate the writing speed.
- */
-class DataBaseMonitor {
- private Connection conn;
- private Statement stmt;
- public DataBaseMonitor init() throws SQLException {
- if (conn == null) {
- String jdbcURL = System.getenv("TDENGINE_JDBC_URL");
- conn = DriverManager.getConnection(jdbcURL);
- stmt = conn.createStatement();
- }
- return this;
- }
-
- public void close() {
- try {
- stmt.close();
- } catch (SQLException e) {
- }
- try {
- conn.close();
- } catch (SQLException e) {
- }
- }
-
- public void prepareDatabase() throws SQLException {
- stmt.execute("DROP DATABASE IF EXISTS test");
- stmt.execute("CREATE DATABASE test");
- stmt.execute("CREATE STABLE test.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (location BINARY(64), groupId INT)");
- }
-
- public Long count() throws SQLException {
- if (!stmt.isClosed()) {
- ResultSet result = stmt.executeQuery("SELECT count(*) from test.meters");
- result.next();
- return result.getLong(1);
- }
- return null;
- }
-}
-
-// ANCHOR: main
public class FastWriteExample {
final static Logger logger = LoggerFactory.getLogger(FastWriteExample.class);
@@ -110,5 +66,4 @@ public class FastWriteExample {
lastCount = count;
}
}
-}
-// ANCHOR_END: main
\ No newline at end of file
+}
\ No newline at end of file
diff --git a/docs/examples/java/src/main/java/com/taos/example/highvolume/MockDataSource.java b/docs/examples/java/src/main/java/com/taos/example/highvolume/MockDataSource.java
new file mode 100644
index 0000000000000000000000000000000000000000..6fe83f002ebcb9d82e026e9a32886fd22bfefbe9
--- /dev/null
+++ b/docs/examples/java/src/main/java/com/taos/example/highvolume/MockDataSource.java
@@ -0,0 +1,53 @@
+package com.taos.example.highvolume;
+
+import java.util.Iterator;
+
+/**
+ * Generate test data
+ */
+class MockDataSource implements Iterator {
+ private String tbNamePrefix;
+ private int tableCount;
+ private long maxRowsPerTable = 1000000000L;
+
+ // 100 milliseconds between two neighbouring rows.
+ long startMs = System.currentTimeMillis() - maxRowsPerTable * 100;
+ private int currentRow = 0;
+ private int currentTbId = -1;
+
+ // mock values
+ String[] location = {"LosAngeles", "SanDiego", "Hollywood", "Compton", "San Francisco"};
+ float[] current = {8.8f, 10.7f, 9.9f, 8.9f, 9.4f};
+ int[] voltage = {119, 116, 111, 113, 118};
+ float[] phase = {0.32f, 0.34f, 0.33f, 0.329f, 0.141f};
+
+ public MockDataSource(String tbNamePrefix, int tableCount) {
+ this.tbNamePrefix = tbNamePrefix;
+ this.tableCount = tableCount;
+ }
+
+ @Override
+ public boolean hasNext() {
+ currentTbId += 1;
+ if (currentTbId == tableCount) {
+ currentTbId = 0;
+ currentRow += 1;
+ }
+ return currentRow < maxRowsPerTable;
+ }
+
+ @Override
+ public String next() {
+ long ts = startMs + 100 * currentRow;
+ int groupId = currentTbId % 5 == 0 ? currentTbId / 5 : currentTbId / 5 + 1;
+ StringBuilder sb = new StringBuilder(tbNamePrefix + "_" + currentTbId + ","); // tbName
+ sb.append(ts).append(','); // ts
+ sb.append(current[currentRow % 5]).append(','); // current
+ sb.append(voltage[currentRow % 5]).append(','); // voltage
+ sb.append(phase[currentRow % 5]).append(','); // phase
+ sb.append(location[currentRow % 5]).append(','); // location
+ sb.append(groupId); // groupID
+
+ return sb.toString();
+ }
+}
\ No newline at end of file
diff --git a/docs/examples/java/src/main/java/com/taos/example/highvolume/ReadTask.java b/docs/examples/java/src/main/java/com/taos/example/highvolume/ReadTask.java
index 94cde899f49b3d517e320c06d355b13fca95c2f5..a6fcfed1d28281d46aff493ef9783972858ebe62 100644
--- a/docs/examples/java/src/main/java/com/taos/example/highvolume/ReadTask.java
+++ b/docs/examples/java/src/main/java/com/taos/example/highvolume/ReadTask.java
@@ -7,57 +7,6 @@ import java.util.Iterator;
import java.util.List;
import java.util.concurrent.BlockingQueue;
-/**
- * Generate test data
- */
-class MockDataSource implements Iterator {
- private String tbNamePrefix;
- private int tableCount;
- private long maxRowsPerTable = 1000000000L;
-
- // 100 milliseconds between two neighbouring rows.
- long startMs = System.currentTimeMillis() - maxRowsPerTable * 100;
- private int currentRow = 0;
- private int currentTbId = -1;
-
- // mock values
- String[] location = {"LosAngeles", "SanDiego", "Hollywood", "Compton", "San Francisco"};
- float[] current = {8.8f, 10.7f, 9.9f, 8.9f, 9.4f};
- int[] voltage = {119, 116, 111, 113, 118};
- float[] phase = {0.32f, 0.34f, 0.33f, 0.329f, 0.141f};
-
- public MockDataSource(String tbNamePrefix, int tableCount) {
- this.tbNamePrefix = tbNamePrefix;
- this.tableCount = tableCount;
- }
-
- @Override
- public boolean hasNext() {
- currentTbId += 1;
- if (currentTbId == tableCount) {
- currentTbId = 0;
- currentRow += 1;
- }
- return currentRow < maxRowsPerTable;
- }
-
- @Override
- public String next() {
- long ts = startMs + 100 * currentRow;
- int groupId = currentTbId % 5 == 0 ? currentTbId / 5 : currentTbId / 5 + 1;
- StringBuilder sb = new StringBuilder(tbNamePrefix + "_" + currentTbId + ","); // tbName
- sb.append(ts).append(','); // ts
- sb.append(current[currentRow % 5]).append(','); // current
- sb.append(voltage[currentRow % 5]).append(','); // voltage
- sb.append(phase[currentRow % 5]).append(','); // phase
- sb.append(location[currentRow % 5]).append(','); // location
- sb.append(groupId); // groupID
-
- return sb.toString();
- }
-}
-
-// ANCHOR: ReadTask
class ReadTask implements Runnable {
private final static Logger logger = LoggerFactory.getLogger(ReadTask.class);
private final int taskId;
@@ -106,6 +55,4 @@ class ReadTask implements Runnable {
logger.info("stop");
this.active = false;
}
-}
-
-// ANCHOR_END: ReadTask
\ No newline at end of file
+}
\ No newline at end of file
diff --git a/docs/examples/java/src/main/java/com/taos/example/highvolume/SQLWriter.java b/docs/examples/java/src/main/java/com/taos/example/highvolume/SQLWriter.java
index b13d6c363c7f3b79168f87f7db1604652382d332..c2989acdbe3d0f56d7451ac86051a55955ce14de 100644
--- a/docs/examples/java/src/main/java/com/taos/example/highvolume/SQLWriter.java
+++ b/docs/examples/java/src/main/java/com/taos/example/highvolume/SQLWriter.java
@@ -7,8 +7,6 @@ import java.sql.*;
import java.util.HashMap;
import java.util.Map;
-// ANCHOR: SQLWriter
-
/**
* A helper class encapsulate the logic of writing using SQL.
*
@@ -154,10 +152,14 @@ public class SQLWriter {
if (errorCode == 0x362 || errorCode == 0x218) {
// Table does not exist
createTables();
- stmt.executeUpdate(sql);
+ executeSQL(sql);
} else {
+ logger.error("Execute SQL: {}", sql);
throw e;
}
+ } catch (Throwable throwable) {
+ logger.error("Execute SQL: {}", sql);
+ throw throwable;
}
}
@@ -174,7 +176,12 @@ public class SQLWriter {
sb.append("IF NOT EXISTS ").append(tbName).append(" USING meters TAGS ").append(tagValues).append(" ");
}
String sql = sb.toString();
- stmt.executeUpdate(sql);
+ try {
+ stmt.executeUpdate(sql);
+ } catch (Throwable throwable) {
+ logger.error("Execute SQL: {}", sql);
+ throw throwable;
+ }
}
public boolean hasBufferedValues() {
@@ -195,5 +202,4 @@ public class SQLWriter {
} catch (SQLException e) {
}
}
-}
-// ANCHOR_END: SQLWriter
+}
\ No newline at end of file
diff --git a/docs/examples/java/src/main/java/com/taos/example/highvolume/StmtWriter.java b/docs/examples/java/src/main/java/com/taos/example/highvolume/StmtWriter.java
new file mode 100644
index 0000000000000000000000000000000000000000..8ade06625d708a112c85d5657aa00bcd0e605ff4
--- /dev/null
+++ b/docs/examples/java/src/main/java/com/taos/example/highvolume/StmtWriter.java
@@ -0,0 +1,4 @@
+package com.taos.example.highvolume;
+
+public class StmtWriter {
+}
diff --git a/docs/examples/java/src/main/java/com/taos/example/highvolume/WriteTask.java b/docs/examples/java/src/main/java/com/taos/example/highvolume/WriteTask.java
index bbe9cb07704519bf43c6187844c296e227315505..de9e5463d7dc59478f991e4783aacaae527b4c4b 100644
--- a/docs/examples/java/src/main/java/com/taos/example/highvolume/WriteTask.java
+++ b/docs/examples/java/src/main/java/com/taos/example/highvolume/WriteTask.java
@@ -5,7 +5,6 @@ import org.slf4j.LoggerFactory;
import java.util.concurrent.BlockingQueue;
-// ANCHOR: WriteTask
class WriteTask implements Runnable {
private final static Logger logger = LoggerFactory.getLogger(WriteTask.class);
private final int maxBatchSize;
@@ -56,5 +55,4 @@ class WriteTask implements Runnable {
logger.info("stop");
this.active = false;
}
-}
-// ANCHOR_END: WriteTask
\ No newline at end of file
+}
\ No newline at end of file
diff --git a/docs/examples/java/src/main/resources/highvolume.drawio b/docs/examples/java/src/main/resources/highvolume.drawio
new file mode 100644
index 0000000000000000000000000000000000000000..410216061813d307b9e8cc289fe58df05c01e390
--- /dev/null
+++ b/docs/examples/java/src/main/resources/highvolume.drawio
@@ -0,0 +1,72 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/docs/examples/java/src/main/resources/highvolume2.drawio b/docs/examples/java/src/main/resources/highvolume2.drawio
new file mode 100644
index 0000000000000000000000000000000000000000..8c9ae090071d93574e98305d3c8e458539a6b50d
--- /dev/null
+++ b/docs/examples/java/src/main/resources/highvolume2.drawio
@@ -0,0 +1,76 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/docs/examples/java/src/main/resources/logback.xml b/docs/examples/java/src/main/resources/logback.xml
new file mode 100644
index 0000000000000000000000000000000000000000..898887fe6ab5b86b4bb069032fe7a9e9c2b9148b
--- /dev/null
+++ b/docs/examples/java/src/main/resources/logback.xml
@@ -0,0 +1,14 @@
+
+
+
+
+
+ %d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/docs/examples/python/fast_write_example.py b/docs/examples/python/fast_write_example.py
new file mode 100644
index 0000000000000000000000000000000000000000..c9d606388fdecd85f1468f24cc497ecc5941f035
--- /dev/null
+++ b/docs/examples/python/fast_write_example.py
@@ -0,0 +1,180 @@
+# install dependencies:
+# recommend python >= 3.8
+# pip3 install faster-fifo
+#
+
+import logging
+import math
+import sys
+import time
+import os
+from multiprocessing import Process
+from faster_fifo import Queue
+from mockdatasource import MockDataSource
+from queue import Empty
+from typing import List
+
+logging.basicConfig(stream=sys.stdout, level=logging.DEBUG, format="%(asctime)s [%(name)s] - %(message)s")
+
+READ_TASK_COUNT = 1
+WRITE_TASK_COUNT = 1
+TABLE_COUNT = 1000
+QUEUE_SIZE = 1000000
+MAX_BATCH_SIZE = 3000
+
+read_processes = []
+write_processes = []
+
+
+def get_connection():
+ """
+ If variable TDENGINE_FIRST_EP is provided then it will be used. If not, firstEP in /etc/taos/taos.cfg will be used.
+ You can also override the default username and password by supply variable TDENGINE_USER and TDENGINE_PASSWORD
+ """
+ import taos
+ firstEP = os.environ.get("TDENGINE_FIRST_EP")
+ if firstEP:
+ host, port = firstEP.split(":")
+ else:
+ host, port = None, 0
+ user = os.environ.get("TDENGINE_USER", "root")
+ password = os.environ.get("TDENGINE_PASSWORD", "taosdata")
+ return taos.connect(host=host, port=int(port), user=user, password=password)
+
+
+# ANCHOR: read
+
+def run_read_task(task_id: int, task_queues: List[Queue]):
+ table_count_per_task = TABLE_COUNT // READ_TASK_COUNT
+ data_source = MockDataSource(f"tb{task_id}", table_count_per_task)
+ try:
+ for batch in data_source:
+ for table_id, rows in batch:
+ # hash data to different queue
+ i = table_id % len(task_queues)
+ # block putting forever when the queue is full
+ task_queues[i].put_many(rows, block=True, timeout=-1)
+ except KeyboardInterrupt:
+ pass
+
+
+# ANCHOR_END: read
+
+# ANCHOR: write
+def run_write_task(task_id: int, queue: Queue):
+ from sql_writer import SQLWriter
+ log = logging.getLogger(f"WriteTask-{task_id}")
+ writer = SQLWriter(get_connection)
+ lines = None
+ try:
+ while True:
+ try:
+ # get as many as possible
+ lines = queue.get_many(block=False, max_messages_to_get=MAX_BATCH_SIZE)
+ writer.process_lines(lines)
+ except Empty:
+ time.sleep(0.01)
+ except KeyboardInterrupt:
+ pass
+ except BaseException as e:
+ log.debug(f"lines={lines}")
+ raise e
+
+
+# ANCHOR_END: write
+
+def set_global_config():
+ argc = len(sys.argv)
+ if argc > 1:
+ global READ_TASK_COUNT
+ READ_TASK_COUNT = int(sys.argv[1])
+ if argc > 2:
+ global WRITE_TASK_COUNT
+ WRITE_TASK_COUNT = int(sys.argv[2])
+ if argc > 3:
+ global TABLE_COUNT
+ TABLE_COUNT = int(sys.argv[3])
+ if argc > 4:
+ global QUEUE_SIZE
+ QUEUE_SIZE = int(sys.argv[4])
+ if argc > 5:
+ global MAX_BATCH_SIZE
+ MAX_BATCH_SIZE = int(sys.argv[5])
+
+
+# ANCHOR: monitor
+def run_monitor_process():
+ log = logging.getLogger("DataBaseMonitor")
+ conn = get_connection()
+ conn.execute("DROP DATABASE IF EXISTS test")
+ conn.execute("CREATE DATABASE test")
+ conn.execute("CREATE STABLE test.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) "
+ "TAGS (location BINARY(64), groupId INT)")
+
+ def get_count():
+ res = conn.query("SELECT count(*) FROM test.meters")
+ rows = res.fetch_all()
+ return rows[0][0] if rows else 0
+
+ last_count = 0
+ while True:
+ time.sleep(10)
+ count = get_count()
+ log.info(f"count={count} speed={(count - last_count) / 10}")
+ last_count = count
+
+
+# ANCHOR_END: monitor
+# ANCHOR: main
+def main():
+ set_global_config()
+ logging.info(f"READ_TASK_COUNT={READ_TASK_COUNT}, WRITE_TASK_COUNT={WRITE_TASK_COUNT}, "
+ f"TABLE_COUNT={TABLE_COUNT}, QUEUE_SIZE={QUEUE_SIZE}, MAX_BATCH_SIZE={MAX_BATCH_SIZE}")
+
+ monitor_process = Process(target=run_monitor_process)
+ monitor_process.start()
+ time.sleep(3) # waiting for database ready.
+
+ task_queues: List[Queue] = []
+ # create task queues
+ for i in range(WRITE_TASK_COUNT):
+ queue = Queue(max_size_bytes=QUEUE_SIZE)
+ task_queues.append(queue)
+
+ # create write processes
+ for i in range(WRITE_TASK_COUNT):
+ p = Process(target=run_write_task, args=(i, task_queues[i]))
+ p.start()
+ logging.debug(f"WriteTask-{i} started with pid {p.pid}")
+ write_processes.append(p)
+
+ # create read processes
+ for i in range(READ_TASK_COUNT):
+ queues = assign_queues(i, task_queues)
+ p = Process(target=run_read_task, args=(i, queues))
+ p.start()
+ logging.debug(f"ReadTask-{i} started with pid {p.pid}")
+ read_processes.append(p)
+
+ try:
+ monitor_process.join()
+ except KeyboardInterrupt:
+ monitor_process.terminate()
+ [p.terminate() for p in read_processes]
+ [p.terminate() for p in write_processes]
+ [q.close() for q in task_queues]
+
+
+def assign_queues(read_task_id, task_queues):
+ """
+ Compute target queues for a specific read task.
+ """
+ ratio = WRITE_TASK_COUNT / READ_TASK_COUNT
+ from_index = math.floor(read_task_id * ratio)
+ end_index = math.ceil((read_task_id + 1) * ratio)
+ return task_queues[from_index:end_index]
+
+
+if __name__ == '__main__':
+ main()
+# ANCHOR_END: main
diff --git a/docs/examples/python/mockdatasource.py b/docs/examples/python/mockdatasource.py
new file mode 100644
index 0000000000000000000000000000000000000000..852860aec0adc8f9b043c9dcd5deb0bf00239201
--- /dev/null
+++ b/docs/examples/python/mockdatasource.py
@@ -0,0 +1,49 @@
+import time
+
+
+class MockDataSource:
+ samples = [
+ "8.8,119,0.32,LosAngeles,0",
+ "10.7,116,0.34,SanDiego,1",
+ "9.9,111,0.33,Hollywood,2",
+ "8.9,113,0.329,Compton,3",
+ "9.4,118,0.141,San Francisco,4"
+ ]
+
+ def __init__(self, tb_name_prefix, table_count):
+ self.table_name_prefix = tb_name_prefix + "_"
+ self.table_count = table_count
+ self.max_rows = 10000000
+ self.current_ts = round(time.time() * 1000) - self.max_rows * 100
+ # [(tableId, tableName, values),]
+ self.data = self._init_data()
+
+ def _init_data(self):
+ lines = self.samples * (self.table_count // 5 + 1)
+ data = []
+ for i in range(self.table_count):
+ table_name = self.table_name_prefix + str(i)
+ data.append((i, table_name, lines[i])) # tableId, row
+ return data
+
+ def __iter__(self):
+ self.row = 0
+ return self
+
+ def __next__(self):
+ """
+ next 1000 rows for each table.
+ return: {tableId:[row,...]}
+ """
+ # generate 1000 timestamps
+ ts = []
+ for _ in range(1000):
+ self.current_ts += 100
+ ts.append(str(self.current_ts))
+ # add timestamp to each row
+ # [(tableId, ["tableName,ts,current,voltage,phase,location,groupId"])]
+ result = []
+ for table_id, table_name, values in self.data:
+ rows = [table_name + ',' + t + ',' + values for t in ts]
+ result.append((table_id, rows))
+ return result
diff --git a/docs/examples/python/sql_writer.py b/docs/examples/python/sql_writer.py
index ad5653f0ece10a482036b3d41880984ef4b38e61..cb04f85c239af7c4801e2a5ef0483a88b21245ef 100644
--- a/docs/examples/python/sql_writer.py
+++ b/docs/examples/python/sql_writer.py
@@ -18,6 +18,7 @@ class SQLWriter:
name = r[0]
if name == "maxSQLLength":
return int(r[1])
+ return 1024 * 1024
def process_lines(self, lines: str):
"""
@@ -71,11 +72,19 @@ class SQLWriter:
if error_code == 0x362 or error_code == 0x218:
self.create_tables()
else:
+ self.log.error("Execute SQL: %s", sql)
raise e
+ except BaseException as baseException:
+ self.log.error("Execute SQL: %s", sql)
+ raise baseException
def create_tables(self):
sql = "CREATE TABLE "
for tb in self._tb_values.keys():
tag_values = self._tb_tags[tb]
sql += "IF NOT EXISTS " + tb + " USING meters TAGS " + tag_values + " "
- self._conn.execute(sql)
+ try:
+ self._conn.execute(sql)
+ except BaseException as e:
+ self.log.error("Execute SQL: %s", sql)
+ raise e
diff --git a/docs/examples/python/stmt_writer.py b/docs/examples/python/stmt_writer.py
new file mode 100644
index 0000000000000000000000000000000000000000..60846b5a6491491655905008b58e6411818720fb
--- /dev/null
+++ b/docs/examples/python/stmt_writer.py
@@ -0,0 +1,2 @@
+class StmtWriter:
+ pass
diff --git a/docs/zh/07-develop/03-insert-data/05-high-volume.md b/docs/zh/07-develop/03-insert-data/05-high-volume.md
index ed8b137358f61b1e111d20c9887e05181639dd7f..e43b225d77706fba34ac2fb37bbabbc209bce09f 100644
--- a/docs/zh/07-develop/03-insert-data/05-high-volume.md
+++ b/docs/zh/07-develop/03-insert-data/05-high-volume.md
@@ -1,36 +1,88 @@
----
-title: 高效写入
----
+import Tabs from "@theme/Tabs";
+import TabItem from "@theme/TabItem";
+
+# 高效写入
+
+本节介绍如何高效地向 TDengine 写入数据。
## 高效写入原理 {#principle}
-本节介绍如何高效地向 TDengine 写入数据。高效写入数据要考虑几个因素:数据在不同表(或子表)之间的分布,即要写入数据的相邻性;单次写入的数据量;并发连接数。一般来说,每批次只向同一张表(或子表)写入数据比向多张表(或子表)写入数据要更高效;每批次写入的数据量越大越高效(但超过一定阈值其优势会消失;同时写入数据的并发连接数越多写入越高效(但超过一定阈值反而会下降,取决于服务端处理能力)。
+### 客户端程序的角度 {#application-view}
+
+从客户端程序的角度来说,高效写入数据要考虑以下几个因素:
+
+1. 单次写入的数据量。一般来讲,每批次写入的数据量越大越高效(但超过一定阈值其优势会消失)。使用 SQL 写入 TDengine 时,尽量在一条 SQL 中拼接更多数据。目前,TDengine 支持的一条 SQL 的最大长度为 1,048,576(1M)个字符。可通过配置客户端参数 maxSQLLength(默认值为 65480)进行修改。
+2. 并发连接数。一般来讲,同时写入数据的并发连接数越多写入越高效(但超过一定阈值反而会下降,取决于服务端处理能力)。
+3. 数据在不同表(或子表)之间的分布,即要写入数据的相邻性。一般来说,每批次只向同一张表(或子表)写入数据比向多张表(或子表)写入数据要更高效;
+4. 写入方式。一般来讲:
+ - 参数绑定写入比 SQL 写入更高效。因参数绑定方式避免了 SQL 解析。(但增加了 C 接口的调用次数,对于连接器也有性能损耗)。
+ - SQL 写入不自动建表比自动建表更高效。因自动建表要频繁检查表是否存在
+ - SQL 写入比无模式写入更高效。因无模式写入会自动建表且支持动态更改表结构
+
+客户端程序要充分且恰当地利用以上几个因素。在单次写入中尽量只向同一张表(或子表)写入数据,每批次写入的数据量经过测试和调优设定为一个最适合当前系统处理能力的数值,并发写入的连接数同样经过测试和调优后设定为一个最适合当前系统处理能力的数值,以实现在当前系统中的最佳写入速度。
+
+### 数据源的角度 {#datasource-view}
+
+客户端程序通常需要从数据源读数据再写入 TDengine。从数据源角度来说,以下几种情况需要在读线程和写线程之间增加队列:
+
+1. 有多个数据源,单个数据源生成数据的速度远小于单线程写入的速度,但数据量整体比较大。此时队列的作用是把多个数据源的数据汇聚到一起,增加单次写入的数据量。
+2. 单个数据源生成数据的速度远大于单线程写入的速度。此时队列的作用是增加写入的并发度。
+3. 单张表的数据分散在多个数据源。此时队列的作用是将同一张表的数据提前汇聚到一起,提高写入时数据的相邻性。
+
+如果写应用的数据源是 Kafka, 写应用本身即 Kafka 的消费者,则可利用 Kafka 的特性实现高效写入。比如:
+
+1. 将同一张表的数据写到同一个 Topic 的同一个 Partition,增加数据的相邻性
+2. 通过订阅多个 Topic 实现数据汇聚
+3. 通过增加 Consumer 线程数增加写入的并发度
+4. 通过增加每次 fetch 的最大数据量来增加单次写入的最大数据量
+
+### 服务器配置的角度 {#setting-view}
-为了更高效地向 TDengine 写入数据,客户端程序要充分且恰当地利用以上几个因素。在单次写入中尽量只向同一张表(或子表)写入数据,每批次写入的数据量经过测试和调优设定为一个最适合当前系统处理能力的数值,并发写入的连接数同样经过测试和调优后设定为一个最适合当前系统处理能力的数值,以实现在当前系统中的最佳写入速度。同时,TDengine 还提供了独特的参数绑定写入,这也是一个有助于实现高效写入的方法。
+从服务器配置的角度来说,也有很多优化写入性能的方法。
-为了使写入最高效,除了客户端程序的设计,服务端的配置也很重要。如果无论怎么调节客户端程序,taosd 进程的 CPU 使用率都很低,那很可能需要增加 vgroup 的数量。比如:数据库总表数是 1000 且 minTablesPerVnode 设置的也是 1000,那么这个数据至多有一个 vgroup。此时如果将 minTablesPerVnode 和 tablelncStepPerVnode 都设置成 100, 则这个数据库有可能用到 10 个 vgroup。更多性能调优参数请参考[配置参考](../../reference/config)性能调优部分。
+如果无论怎么调节客户端程序,taosd 进程的 CPU 使用率都很低,那很可能需要增加 vgroup 的数量。比如:数据库总表数是 1000 且 minTablesPerVnode 设置的也是 1000,那么这个数据至多有一个 vgroup。此时如果将 minTablesPerVnode 和 tablelncStepPerVnode 都设置成 100, 则这个数据库可能用到 10 个 vgroup。
-## 高效写入方案 {#scenario}
+更多调优参数,请参考[性能优化](../../operation/optimize)和[配置参考](../../reference/config)部分。
-下面的示例程序展示了如何高效写入数据:
+## 高效写入示例 {#sample-code}
-- TDengine 客户端程序从消息队列或者其它数据源不断读入数据,在示例程序中采用生成模拟数据的方式来模拟读取数据源
+### 场景设计 {#scenario}
+
+下面的示例程序展示了如何高效写入数据,场景设计如下:
+
+- TDengine 客户端程序从其它数据源不断读入数据,在示例程序中采用生成模拟数据的方式来模拟读取数据源
- 单个连接向 TDengine 写入的速度无法与读数据的速度相匹配,因此客户端程序启动多个线程,每个线程都建立了与 TDengine 的连接,每个线程都有一个独占的固定大小的消息队列
- 客户端程序将接收到的数据根据所属的表名(或子表名)HASH 到不同的线程,即写入该线程所对应的消息队列,以此确保属于某个表(或子表)的数据一定会被一个固定的线程处理
- 各个子线程在将所关联的消息队列中的数据读空后或者读取数据量达到一个预定的阈值后将该批数据写入 TDengine,并继续处理后面接收到的数据
-![TDengine 高效写入线程模型](highvolume.webp)
+![TDengine 高效写入示例场景的线程模型](highvolume.webp)
+
+### 示例代码 {#code}
+
+这一部分是针对以上场景的示例代码。对于其它场景高效写入原理相同,不过代码需要适当修改。
+
+本示例代码假设源数据属于同一张超级表(meters)的不同子表。程序在开始写入数据之前已经在 test 库创建了这个超级表。对于子表,将根据收到的数据,由应用程序自动创建。如果实际场景是多个超级表,只需修改写任务自动建表的代码。
+
+
+
-:::note
-上图所示架构,每个写任务只负责写特定的表,体现了数据的相邻性原则。但是读任务所读的表,我们假设是随机的。这样一个队列有多个写入线程(或进程),队列内部可能产生锁的消耗。实际场景,如果能做到一个读任务对应一个写任务是最好的。
-:::
+**程序清单**
-## Java 示例程序 {#java-demo}
+| 类名 | 功能说明 |
+| ---------------- | --------------------------------------------------------------------------- |
+| FastWriteExample | 主程序 |
+| ReadTask | 从模拟源中读取数据,将表名经过 hash 后得到 Queue 的 index,写入对应的 Queue |
+| WriteTask | 从 Queue 中获取数据,组成一个 Batch,写入 TDengine |
+| MockDataSource | 模拟生成一定数量 meters 子表的数据 |
+| SQLWriter | WriteTask 依赖这个类完成 SQL 拼接、自动建表、 SQL 写入、SQL 长度检查 |
+| StmtWriter | 实现参数绑定方式批量写入(暂未完成) |
+| DataBaseMonitor | 统计写入速度,并每隔 10 秒把当前写入速度打印到控制台 |
-在 Java 示例程序中采用拼接 SQL 的写入方式。
-### 主程序 {#java-demo-main}
+以下是各类的完整代码和更详细的功能说明。
+
+FastWriteExample
主程序负责:
1. 创建消息队列
@@ -45,70 +97,80 @@ title: 高效写入
3. 模拟生成的总表数。默认为 1000。将会平分给各个读线程。
4. 每批最多写入记录数量。默认为 3000。
-
-主程序
+队列容量(taskQueueCapacity)也是与性能有关的参数,可通过修改程序调节。一般来讲,队列容量越大,入队被阻塞的概率越小,队列的吞吐量越大,但是内存占用也会越大。 示例程序默认值已经设置地足够大。
```java
-{{#include docs/examples/java/src/main/java/com/taos/example/highvolume/FastWriteExample.java:main}}
+{{#include docs/examples/java/src/main/java/com/taos/example/highvolume/FastWriteExample.java}}
```
-
-队列容量(taskQueueCapacity)也是与性能有关的参数,可通过修改程序调节。一般来讲,队列容量越大,入队被阻塞的概率越小,队列的吞吐量越大,但是内存占用也会越大。
-
-### 读任务的实现 {#java-demo-read}
+
+ReadTask
读任务负责从数据源读数据。每个读任务都关联了一个模拟数据源。每个模拟数据源可生成一点数量表的数据。不同的模拟数据源生成不同表的数据。
读任务采用阻塞的方式写消息队列。也就是说,一旦队列满了,写操作就会阻塞。
+```java
+{{#include docs/examples/java/src/main/java/com/taos/example/highvolume/ReadTask.java}}
+```
+
+
+
-读任务的实现
+WriteTask
```java
-{{#include docs/examples/java/src/main/java/com/taos/example/highvolume/ReadTask.java:ReadTask}}
+{{#include docs/examples/java/src/main/java/com/taos/example/highvolume/WriteTask.java}}
```
-### 写任务的实现 {#java-demo-write}
-
-写任务的实现
+
+MockDataSource
```java
-{{#include docs/examples/java/src/main/java/com/taos/example/highvolume/WriteTask.java:WriteTask}}
+{{#include docs/examples/java/src/main/java/com/taos/example/highvolume/MockDataSource.java}}
```
-### SQLWriter 类的实现 {#java-demo-sql-writer}
+
+
+SQLWriter
-SQLWriter 类封装了拼 SQL 和写数据的逻辑。注意,所有的表都没有提前创建,而是写入出错的时候,再以超级表为模板批量建表,然后重新执行 INSERT 语句。
+SQLWriter 类封装了拼 SQL 和写数据的逻辑。注意,所有的表都没有提前创建,而是在 catch 到表不存在异常的时候,再以超级表为模板批量建表,然后重新执行 INSERT 语句。对于其它异常,这里简单地记录当时执行的 SQL 语句到日志中,你也可以记录更多线索到日志,已便排查错误和故障恢复。
+
+```java
+{{#include docs/examples/java/src/main/java/com/taos/example/highvolume/SQLWriter.java}}
+```
+
+
-SQLWriter 类的实现
+
+DataBaseMonitor
```java
-{{#include docs/examples/java/src/main/java/com/taos/example/highvolume/SQLWriter.java:SQLWriter}}
+{{#include docs/examples/java/src/main/java/com/taos/example/highvolume/DataBaseMonitor.java}}
```
-### 执行示例程序 {#run-java-demo}
+**执行步骤**
执行 Java 示例程序
-
执行程序前需配置环境变量 `TDENGINE_JDBC_URL`。如果 TDengine Server 部署在本机,且用户名、密码和端口都是默认值,那么可配置:
```
TDENGINE_JDBC_URL="jdbc:TAOS://localhost:6030?user=root&password=taosdata"
```
-#### 本地集成开发环境执行示例程序 {#java-demo-local-run}
+**本地集成开发环境执行示例程序**
1. clone TDengine 仓库
```
@@ -118,7 +180,7 @@ TDENGINE_JDBC_URL="jdbc:TAOS://localhost:6030?user=root&password=taosdata"
3. 在开发环境中配置环境变量 `TDENGINE_JDBC_URL`。如果已配置了全局的环境变量 `TDENGINE_JDBC_URL` 可跳过这一步。
4. 运行类 `com.taos.example.highvolume.FastWriteExample`。
-#### 远程服务器上执行示例程序 {#java-demo-remote-run}
+**远程服务器上执行示例程序**
若要在服务器上执行示例程序,可按照下面的步骤操作:
@@ -132,61 +194,93 @@ TDENGINE_JDBC_URL="jdbc:TAOS://localhost:6030?user=root&password=taosdata"
```
3. 复制依赖到服务器指定目录:
- 复制依赖包,只用复制一次
- ```
- scp -r .\target\lib @:~/examples/java
- ```
-
+ ```
+ scp -r .\target\lib @:~/examples/java
+ ```
- 复制本程序的 jar 包,每次更新代码都需要复制
- ```
- scp -r .\target\javaexample-1.0.jar @:~/examples/java
- ```
+ ```
+ scp -r .\target\javaexample-1.0.jar @:~/examples/java
+ ```
4. 配置环境变量。
编辑 `~/.bash_profile` 或 `~/.bashrc` 添加如下内容例如:
+
```
export TDENGINE_JDBC_URL="jdbc:TAOS://localhost:6030?user=root&password=taosdata"
```
+
以上使用的是本地部署 TDengine Server 时默认的 JDBC URL。你需要根据自己的实际情况更改。
5. 用 java 命令启动示例程序,命令模板:
-
+
```
java -classpath lib/*:javaexample-1.0.jar com.taos.example.highvolume.FastWriteExample
```
6. 结束测试程序。测试程序不会自动结束,在获取到当前配置下稳定的写入速度后,按 CTRL + C 结束程序。
- 下面是一次实际运行的截图:
+ 下面是一次实际运行的日志输出,机器配置 16核 + 64G + 固态硬盘。
```
- [testuser@vm95 java]$ java -classpath lib/*:javaexample-1.0.jar com.taos.example.highvolume.FastWriteExample 1 9 1000 2000
- 17:01:01.131 [main] INFO c.t.e.highvolume.FastWriteExample - readTaskCount=1, writeTaskCount=9 tableCount=1000 maxBatchSize=2000
- 17:01:01.286 [WriteThread-0] INFO c.taos.example.highvolume.WriteTask - started
- 17:01:01.354 [WriteThread-1] INFO c.taos.example.highvolume.WriteTask - started
- 17:01:01.360 [WriteThread-2] INFO c.taos.example.highvolume.WriteTask - started
- 17:01:01.366 [WriteThread-3] INFO c.taos.example.highvolume.WriteTask - started
- 17:01:01.433 [WriteThread-4] INFO c.taos.example.highvolume.WriteTask - started
- 17:01:01.438 [WriteThread-5] INFO c.taos.example.highvolume.WriteTask - started
- 17:01:01.443 [WriteThread-6] INFO c.taos.example.highvolume.WriteTask - started
- 17:01:01.448 [WriteThread-7] INFO c.taos.example.highvolume.WriteTask - started
- 17:01:01.454 [WriteThread-8] INFO c.taos.example.highvolume.WriteTask - started
- 17:01:01.454 [ReadThread-0] INFO com.taos.example.highvolume.ReadTask - started
- 17:01:11.615 [main] INFO c.t.e.highvolume.FastWriteExample - count=18766442 speed=1876644
- 17:01:21.775 [main] INFO c.t.e.highvolume.FastWriteExample - count=38947464 speed=2018102
- 17:01:32.428 [main] INFO c.t.e.highvolume.FastWriteExample - count=58649571 speed=1970210
- 17:01:42.577 [main] INFO c.t.e.highvolume.FastWriteExample - count=79264890 speed=2061531
- 17:01:53.265 [main] INFO c.t.e.highvolume.FastWriteExample - count=99097476 speed=1983258
- 17:02:04.209 [main] INFO c.t.e.highvolume.FastWriteExample - count=119546779 speed=2044930
- 17:02:14.935 [main] INFO c.t.e.highvolume.FastWriteExample - count=141078914 speed=2153213
- 17:02:25.617 [main] INFO c.t.e.highvolume.FastWriteExample - count=162183457 speed=2110454
- 17:02:36.718 [main] INFO c.t.e.highvolume.FastWriteExample - count=182735614 speed=2055215
- 17:02:46.988 [main] INFO c.t.e.highvolume.FastWriteExample - count=202895614 speed=2016000
+ root@vm85$ java -classpath lib/*:javaexample-1.0.jar com.taos.example.highvolume.FastWriteExample 2 12
+ 18:56:35.896 [main] INFO c.t.e.highvolume.FastWriteExample - readTaskCount=2, writeTaskCount=12 tableCount=1000 maxBatchSize=3000
+ 18:56:36.011 [WriteThread-0] INFO c.taos.example.highvolume.WriteTask - started
+ 18:56:36.015 [WriteThread-0] INFO c.taos.example.highvolume.SQLWriter - maxSQLLength=1048576
+ 18:56:36.021 [WriteThread-1] INFO c.taos.example.highvolume.WriteTask - started
+ 18:56:36.022 [WriteThread-1] INFO c.taos.example.highvolume.SQLWriter - maxSQLLength=1048576
+ 18:56:36.031 [WriteThread-2] INFO c.taos.example.highvolume.WriteTask - started
+ 18:56:36.032 [WriteThread-2] INFO c.taos.example.highvolume.SQLWriter - maxSQLLength=1048576
+ 18:56:36.041 [WriteThread-3] INFO c.taos.example.highvolume.WriteTask - started
+ 18:56:36.042 [WriteThread-3] INFO c.taos.example.highvolume.SQLWriter - maxSQLLength=1048576
+ 18:56:36.093 [WriteThread-4] INFO c.taos.example.highvolume.WriteTask - started
+ 18:56:36.094 [WriteThread-4] INFO c.taos.example.highvolume.SQLWriter - maxSQLLength=1048576
+ 18:56:36.099 [WriteThread-5] INFO c.taos.example.highvolume.WriteTask - started
+ 18:56:36.100 [WriteThread-5] INFO c.taos.example.highvolume.SQLWriter - maxSQLLength=1048576
+ 18:56:36.100 [WriteThread-6] INFO c.taos.example.highvolume.WriteTask - started
+ 18:56:36.101 [WriteThread-6] INFO c.taos.example.highvolume.SQLWriter - maxSQLLength=1048576
+ 18:56:36.103 [WriteThread-7] INFO c.taos.example.highvolume.WriteTask - started
+ 18:56:36.104 [WriteThread-7] INFO c.taos.example.highvolume.SQLWriter - maxSQLLength=1048576
+ 18:56:36.105 [WriteThread-8] INFO c.taos.example.highvolume.WriteTask - started
+ 18:56:36.107 [WriteThread-8] INFO c.taos.example.highvolume.SQLWriter - maxSQLLength=1048576
+ 18:56:36.108 [WriteThread-9] INFO c.taos.example.highvolume.WriteTask - started
+ 18:56:36.109 [WriteThread-9] INFO c.taos.example.highvolume.SQLWriter - maxSQLLength=1048576
+ 18:56:36.156 [WriteThread-10] INFO c.taos.example.highvolume.WriteTask - started
+ 18:56:36.157 [WriteThread-11] INFO c.taos.example.highvolume.WriteTask - started
+ 18:56:36.158 [WriteThread-10] INFO c.taos.example.highvolume.SQLWriter - maxSQLLength=1048576
+ 18:56:36.158 [ReadThread-0] INFO com.taos.example.highvolume.ReadTask - started
+ 18:56:36.158 [ReadThread-1] INFO com.taos.example.highvolume.ReadTask - started
+ 18:56:36.158 [WriteThread-11] INFO c.taos.example.highvolume.SQLWriter - maxSQLLength=1048576
+ 18:56:46.369 [main] INFO c.t.e.highvolume.FastWriteExample - count=18554448 speed=1855444
+ 18:56:56.946 [main] INFO c.t.e.highvolume.FastWriteExample - count=39059660 speed=2050521
+ 18:57:07.322 [main] INFO c.t.e.highvolume.FastWriteExample - count=59403604 speed=2034394
+ 18:57:18.032 [main] INFO c.t.e.highvolume.FastWriteExample - count=80262938 speed=2085933
+ 18:57:28.432 [main] INFO c.t.e.highvolume.FastWriteExample - count=101139906 speed=2087696
+ 18:57:38.921 [main] INFO c.t.e.highvolume.FastWriteExample - count=121807202 speed=2066729
+ 18:57:49.375 [main] INFO c.t.e.highvolume.FastWriteExample - count=142952417 speed=2114521
+ 18:58:00.689 [main] INFO c.t.e.highvolume.FastWriteExample - count=163650306 speed=2069788
+ 18:58:11.646 [main] INFO c.t.e.highvolume.FastWriteExample - count=185019808 speed=2136950
```
-## Python 示例程序 {#python-demo}
+
+
-该 Python 示例程序中采用了多进程的架构,并使用了跨进程的队列通信。写任务采用拼装 SQL 的方式写入。
-### main 函数 {#python-demo-main}
+**程序清单**
+
+Python 示例程序中采用了多进程的架构,并使用了跨进程的消息队列。
+
+| 函数或类 | 功能说明 |
+| ------------------------ | -------------------------------------------------------------------- |
+| main 函数 | 程序入口, 创建各个子进程和消息队列 |
+| run_monitor_process 函数 | 创建数据库,超级表,统计写入速度并定时打印到控制台 |
+| run_read_task 函数 | 读进程主要逻辑,负责从其它数据系统读数据,并分发数据到为之分配的队列 |
+| MockDataSource 类 | 模拟数据源, 实现迭代器接口,每次批量返回每张表的接下来 1000 条数据 |
+| run_write_task 函数 | 写进程主要逻辑。每次从队列中取出尽量多的数据,并批量写入 |
+| SQLWriter类 | SQL 写入和自动建表 |
+| StmtWriter 类 | 实现参数绑定方式批量写入(暂未完成) |
+
+
+
+main 函数
main 函数负责创建消息队列和启动子进程,子进程有 3 类:
@@ -202,76 +296,62 @@ main 函数可以接收 5 个启动参数,依次是:
4. 队列大小(单位字节),默认为 1000000
5. 每批最多写入记录数量, 默认为 3000
-
-
-main 函数
-
```python
-{{#include docs/examples/python/highvolume_faster_queue.py:main}}
+{{#include docs/examples/python/fast_write_example.py:main}}
```
-### 监控进程
+
+run_monitor_process
监控进程负责初始化数据库,并监控当前的写入速度。
-
-Monitor Process
-
```python
-{{#include docs/examples/python/highvolume_faster_queue.py:monitor}}
+{{#include docs/examples/python/fast_write_example.py:monitor}}
```
-### 读进程 {#python-read-process}
-
-#### 读进程主要逻辑 {#python-run-read-task}
-
-读进程,负责从其它数据系统读数据,并分发数据到各个写进程。
-
run_read_task 函数
+读进程,负责从其它数据系统读数据,并分发数据到为之分配的队列。
+
```python
-{{#include docs/examples/python/highvolume_faster_queue.py:read}}
+{{#include docs/examples/python/fast_write_example.py:read}}
```
-#### 模拟数据源 {#python-mock-data-source}
-
-以下是模拟数据源的实现,我们假设数据源生成的每一条数据都带有目标表名信息。实际中你可能需要一定的规则确定目标表名。
-
+
MockDataSource
+以下是模拟数据源的实现,我们假设数据源生成的每一条数据都带有目标表名信息。实际中你可能需要一定的规则确定目标表名。
+
```python
-{{#include docs/examples/python/highvolume_faster_queue.py:MockDataSource}}
+{{#include docs/examples/python/mockdatasource.py}}
```
-### 写进程 {#python-write-process}
-
-写进程每次从队列中取出尽量多的数据,并批量写入。
-
run_write_task 函数
+写进程每次从队列中取出尽量多的数据,并批量写入。
+
```python
-{{#include docs/examples/python/highvolume_faster_queue.py:write}}
+{{#include docs/examples/python/fast_write_example.py:write}}
```
-
-
-### SQLWriter 类的实现 {#python-sql-writer}
-SQLWriter 类封装了拼 SQL 和写数据的逻辑。所有的表都没有提前创建,而是写入出错的时候,再以超级表为模板批量建表,然后重新执行 INSERT 语句。这个类也对 SQL 是否超过最大长度限制做了检查,如果接近 SQL 最大长度限制(maxSQLLength),将会立即执行 SQL。为了减少 SQL 执行次数,建议将 maxSQLLength 适当调大。
+
+SQLWriter 类封装了拼 SQL 和写数据的逻辑。所有的表都没有提前创建,而是在发生表不存在错误的时候,再以超级表为模板批量建表,然后重新执行 INSERT 语句。对于其它错误会记录当时执行的 SQL, 以便排查错误和故障恢复。这个类也对 SQL 是否超过最大长度限制做了检查,如果接近 SQL 最大长度限制(maxSQLLength),将会立即执行 SQL。为了减少 SQL 此时,建议将 maxSQLLength 适当调大。
+
SQLWriter
```python
@@ -280,62 +360,73 @@ SQLWriter 类封装了拼 SQL 和写数据的逻辑。所有的表都没有提
-### 执行示例程序 {#run-python-demo}
+**执行步骤**
执行 Python 示例程序
1. 前提条件
+
- 已安装 TDengine 客户端驱动
- 已安装 Python3, 推荐版本 >= 3.8
- 已安装 taospy
2. 安装 faster-fifo 代替 python 内置的 multiprocessing.Queue
+
```
pip3 install faster-fifo
```
-3. 点击上面的“查看源码”链接复制 `highvolume_faster_queue.py` 和 `sql_writer.py` 两个文件。
+3. 点击上面的“查看源码”链接复制 `fast_write_example.py` 、 `sql_writer.py` 和 `mockdatasource.py` 三个文件。
4. 执行示例程序
-
+
```
- python3 highvolume_faster_queue.py
+ python3 fast_write_example.py
```
-下面是一次实际运行的输出:
+ 下面是一次实际运行的输出, 机器配置 16核 + 64G + 固态硬盘。
-```
-[testuser@vm95 python]$ python3.6 highvolume_faster_queue.py 9 9 1000 5000000 3000
-2022-07-13 10:05:50,504 [root] - READ_TASK_COUNT=9, WRITE_TASK_COUNT=9, TABLE_COUNT=1000, QUEUE_SIZE=5000000, MAX_BATCH_SIZE=3000
-2022-07-13 10:05:53,542 [root] - WriteTask-0 started with pid 5475
-2022-07-13 10:05:53,542 [root] - WriteTask-1 started with pid 5476
-2022-07-13 10:05:53,543 [root] - WriteTask-2 started with pid 5477
-2022-07-13 10:05:53,543 [root] - WriteTask-3 started with pid 5478
-2022-07-13 10:05:53,544 [root] - WriteTask-4 started with pid 5479
-2022-07-13 10:05:53,544 [root] - WriteTask-5 started with pid 5480
-2022-07-13 10:05:53,545 [root] - WriteTask-6 started with pid 5481
-2022-07-13 10:05:53,546 [root] - WriteTask-7 started with pid 5482
-2022-07-13 10:05:53,546 [root] - WriteTask-8 started with pid 5483
-2022-07-13 10:05:53,547 [root] - ReadTask-0 started with pid 5484
-2022-07-13 10:05:53,548 [root] - ReadTask-1 started with pid 5485
-2022-07-13 10:05:53,549 [root] - ReadTask-2 started with pid 5486
-2022-07-13 10:05:53,550 [root] - ReadTask-3 started with pid 5487
-2022-07-13 10:05:53,551 [root] - ReadTask-4 started with pid 5488
-2022-07-13 10:05:53,552 [root] - ReadTask-5 started with pid 5489
-2022-07-13 10:05:53,552 [root] - ReadTask-6 started with pid 5490
-2022-07-13 10:05:53,553 [root] - ReadTask-7 started with pid 5491
-2022-07-13 10:05:53,554 [root] - ReadTask-8 started with pid 5492
-2022-07-13 10:06:00,842 [DataBaseMonitor] - count=6612939 speed=661293.9
-2022-07-13 10:06:11,151 [DataBaseMonitor] - count=14765739 speed=815280.0
-2022-07-13 10:06:21,677 [DataBaseMonitor] - count=23282163 speed=851642.4
-2022-07-13 10:06:31,985 [DataBaseMonitor] - count=31673139 speed=839097.6
-2022-07-13 10:06:42,343 [DataBaseMonitor] - count=39819439 speed=814630.0
-2022-07-13 10:06:52,830 [DataBaseMonitor] - count=48146339 speed=832690.0
-2022-07-13 10:07:03,396 [DataBaseMonitor] - count=56385039 speed=823870.0
-2022-07-13 10:07:14,341 [DataBaseMonitor] - count=64848739 speed=846370.0
-2022-07-13 10:07:24,877 [DataBaseMonitor] - count=73654566 speed=880582.7
-```
+ ```
+ root@vm85$ python3 fast_write_example.py 8 8
+ 2022-07-14 19:13:45,869 [root] - READ_TASK_COUNT=8, WRITE_TASK_COUNT=8, TABLE_COUNT=1000, QUEUE_SIZE=1000000, MAX_BATCH_SIZE=3000
+ 2022-07-14 19:13:48,882 [root] - WriteTask-0 started with pid 718347
+ 2022-07-14 19:13:48,883 [root] - WriteTask-1 started with pid 718348
+ 2022-07-14 19:13:48,884 [root] - WriteTask-2 started with pid 718349
+ 2022-07-14 19:13:48,884 [root] - WriteTask-3 started with pid 718350
+ 2022-07-14 19:13:48,885 [root] - WriteTask-4 started with pid 718351
+ 2022-07-14 19:13:48,885 [root] - WriteTask-5 started with pid 718352
+ 2022-07-14 19:13:48,886 [root] - WriteTask-6 started with pid 718353
+ 2022-07-14 19:13:48,886 [root] - WriteTask-7 started with pid 718354
+ 2022-07-14 19:13:48,887 [root] - ReadTask-0 started with pid 718355
+ 2022-07-14 19:13:48,888 [root] - ReadTask-1 started with pid 718356
+ 2022-07-14 19:13:48,889 [root] - ReadTask-2 started with pid 718357
+ 2022-07-14 19:13:48,889 [root] - ReadTask-3 started with pid 718358
+ 2022-07-14 19:13:48,890 [root] - ReadTask-4 started with pid 718359
+ 2022-07-14 19:13:48,891 [root] - ReadTask-5 started with pid 718361
+ 2022-07-14 19:13:48,892 [root] - ReadTask-6 started with pid 718364
+ 2022-07-14 19:13:48,893 [root] - ReadTask-7 started with pid 718365
+ 2022-07-14 19:13:56,042 [DataBaseMonitor] - count=6676310 speed=667631.0
+ 2022-07-14 19:14:06,196 [DataBaseMonitor] - count=20004310 speed=1332800.0
+ 2022-07-14 19:14:16,366 [DataBaseMonitor] - count=32290310 speed=1228600.0
+ 2022-07-14 19:14:26,527 [DataBaseMonitor] - count=44438310 speed=1214800.0
+ 2022-07-14 19:14:36,673 [DataBaseMonitor] - count=56608310 speed=1217000.0
+ 2022-07-14 19:14:46,834 [DataBaseMonitor] - count=68757310 speed=1214900.0
+ 2022-07-14 19:14:57,280 [DataBaseMonitor] - count=80992310 speed=1223500.0
+ 2022-07-14 19:15:07,689 [DataBaseMonitor] - count=93805310 speed=1281300.0
+ 2022-07-14 19:15:18,020 [DataBaseMonitor] - count=106111310 speed=1230600.0
+ 2022-07-14 19:15:28,356 [DataBaseMonitor] - count=118394310 speed=1228300.0
+ 2022-07-14 19:15:38,690 [DataBaseMonitor] - count=130742310 speed=1234800.0
+ 2022-07-14 19:15:49,000 [DataBaseMonitor] - count=143051310 speed=1230900.0
+ 2022-07-14 19:15:59,323 [DataBaseMonitor] - count=155276310 speed=1222500.0
+ 2022-07-14 19:16:09,649 [DataBaseMonitor] - count=167603310 speed=1232700.0
+ 2022-07-14 19:16:19,995 [DataBaseMonitor] - count=179976310 speed=1237300.0
+ ```
+
+
+
+
+
diff --git a/docs/zh/07-develop/03-insert-data/highvolume.webp b/docs/zh/07-develop/03-insert-data/highvolume.webp
index 9f8c9a9cc161e382728750bdfcc93ee086d62aea..46dfc74ae3b0043c591ff930c62251da49cae7ad 100644
Binary files a/docs/zh/07-develop/03-insert-data/highvolume.webp and b/docs/zh/07-develop/03-insert-data/highvolume.webp differ