提交 7fbf8cbf 编写于 作者: D dingbo

docs: add high-volume.md

上级 7ec8a441
...@@ -24,6 +24,16 @@ ...@@ -24,6 +24,16 @@
<version>2.0.38</version> <version>2.0.38</version>
</dependency> </dependency>
<!-- ANCHOR_END: dep--> <!-- ANCHOR_END: dep-->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.36</version>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>1.2.11</version>
</dependency>
<dependency> <dependency>
<groupId>junit</groupId> <groupId>junit</groupId>
<artifactId>junit</artifactId> <artifactId>junit</artifactId>
...@@ -31,5 +41,36 @@ ...@@ -31,5 +41,36 @@
<scope>test</scope> <scope>test</scope>
</dependency> </dependency>
</dependencies> </dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<version>2.5</version>
<configuration>
<skipTests>true</skipTests>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-dependency-plugin</artifactId>
<executions>
<execution>
<id>copy-dependencies</id>
<phase>prepare-package</phase>
<goals>
<goal>copy-dependencies</goal>
</goals>
<configuration>
<outputDirectory>${project.build.directory}/lib</outputDirectory>
<overWriteReleases>false</overWriteReleases>
<overWriteSnapshots>false</overWriteSnapshots>
<overWriteIfNewer>true</overWriteIfNewer>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project> </project>
package com.taos.example.highvolume;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.*;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
/**
* Prepare target database.
* Count total records in database periodically so that we can estimate the writing speed.
*/
class DataBaseMonitor {
private Connection conn;
private Statement stmt;
public DataBaseMonitor init() throws SQLException {
if (conn == null) {
String jdbcURL = System.getenv("TDENGINE_JDBC_URL");
conn = DriverManager.getConnection(jdbcURL);
stmt = conn.createStatement();
}
return this;
}
public void close() {
try {
stmt.close();
} catch (SQLException e) {
}
try {
conn.close();
} catch (SQLException e) {
}
}
public void prepareDatabase() throws SQLException {
stmt.execute("DROP DATABASE IF EXISTS test");
stmt.execute("CREATE DATABASE test");
stmt.execute("CREATE STABLE test.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (location BINARY(64), groupId INT)");
}
public Long count() throws SQLException {
if (!stmt.isClosed()) {
ResultSet result = stmt.executeQuery("SELECT count(*) from test.meters");
result.next();
return result.getLong(1);
}
return null;
}
}
// ANCHOR: main
public class FastWriteExample {
final static Logger logger = LoggerFactory.getLogger(FastWriteExample.class);
final static int taskQueueCapacity = 10000000;
final static List<BlockingQueue<String>> taskQueues = new ArrayList<>();
final static List<ReadTask> readTasks = new ArrayList<>();
final static List<WriteTask> writeTasks = new ArrayList<>();
final static DataBaseMonitor databaseMonitor = new DataBaseMonitor();
public static void stopAll() {
logger.info("shutting down");
readTasks.forEach(task -> task.stop());
writeTasks.forEach(task -> task.stop());
databaseMonitor.close();
}
public static void main(String[] args) throws InterruptedException, SQLException {
int readTaskCount = args.length > 0 ? Integer.parseInt(args[0]) : 1;
int writeTaskCount = args.length > 1 ? Integer.parseInt(args[1]) : 3;
int tableCount = args.length > 2 ? Integer.parseInt(args[2]) : 1000;
int maxBatchSize = args.length > 3 ? Integer.parseInt(args[3]) : 3000;
logger.info("readTaskCount={}, writeTaskCount={} tableCount={} maxBatchSize={}",
readTaskCount, writeTaskCount, tableCount, maxBatchSize);
databaseMonitor.init().prepareDatabase();
// Create task queues, whiting tasks and start writing threads.
for (int i = 0; i < writeTaskCount; ++i) {
BlockingQueue<String> queue = new ArrayBlockingQueue<>(taskQueueCapacity);
taskQueues.add(queue);
WriteTask task = new WriteTask(queue, maxBatchSize);
Thread t = new Thread(task);
t.setName("WriteThread-" + i);
t.start();
}
// create reading tasks and start reading threads
int tableCountPerTask = tableCount / readTaskCount;
for (int i = 0; i < readTaskCount; ++i) {
ReadTask task = new ReadTask(i, taskQueues, tableCountPerTask);
Thread t = new Thread(task);
t.setName("ReadThread-" + i);
t.start();
}
Runtime.getRuntime().addShutdownHook(new Thread(FastWriteExample::stopAll));
long lastCount = 0;
while (true) {
Thread.sleep(10000);
long count = databaseMonitor.count();
logger.info("count={} speed={}", count, (count - lastCount) / 10);
lastCount = count;
}
}
}
// ANCHOR_END: main
\ No newline at end of file
package com.taos.example.highvolume;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.BlockingQueue;
/**
* Generate test data
*/
class MockDataSource implements Iterator {
private String tbNamePrefix;
private int tableCount;
private long maxRowsPerTable = 1000000000L;
// 100 milliseconds between two neighbouring rows.
long startMs = System.currentTimeMillis() - maxRowsPerTable * 100;
private int currentRow = 0;
private int currentTbId = -1;
// mock values
String[] location = {"LosAngeles", "SanDiego", "Hollywood", "Compton", "San Francisco"};
float[] current = {8.8f, 10.7f, 9.9f, 8.9f, 9.4f};
int[] voltage = {119, 116, 111, 113, 118};
float[] phase = {0.32f, 0.34f, 0.33f, 0.329f, 0.141f};
public MockDataSource(String tbNamePrefix, int tableCount) {
this.tbNamePrefix = tbNamePrefix;
this.tableCount = tableCount;
}
@Override
public boolean hasNext() {
currentTbId += 1;
if (currentTbId == tableCount) {
currentTbId = 0;
currentRow += 1;
}
return currentRow < maxRowsPerTable;
}
@Override
public String next() {
long ts = startMs + 100 * currentRow;
int groupId = currentTbId % 5 == 0 ? currentTbId / 5 : currentTbId / 5 + 1;
StringBuilder sb = new StringBuilder(tbNamePrefix + "_" + currentTbId + ","); // tbName
sb.append(ts).append(','); // ts
sb.append(current[currentRow % 5]).append(','); // current
sb.append(voltage[currentRow % 5]).append(','); // voltage
sb.append(phase[currentRow % 5]).append(','); // phase
sb.append(location[currentRow % 5]).append(','); // location
sb.append(groupId); // groupID
return sb.toString();
}
}
// ANCHOR: ReadTask
class ReadTask implements Runnable {
private final static Logger logger = LoggerFactory.getLogger(ReadTask.class);
private final int taskId;
private final List<BlockingQueue<String>> taskQueues;
private final int queueCount;
private final int tableCount;
private boolean active = true;
public ReadTask(int readTaskId, List<BlockingQueue<String>> queues, int tableCount) {
this.taskId = readTaskId;
this.taskQueues = queues;
this.queueCount = queues.size();
this.tableCount = tableCount;
}
/**
* Assign data received to different queues.
* Here we use the suffix number in table name.
* You are expected to define your own rule in practice.
*
* @param line record received
* @return which queue to use
*/
public int getQueueId(String line) {
String tbName = line.substring(0, line.indexOf(',')); // For example: tb1_101
String suffixNumber = tbName.split("_")[1];
return Integer.parseInt(suffixNumber) % this.queueCount;
}
@Override
public void run() {
logger.info("started");
Iterator<String> it = new MockDataSource("tb" + this.taskId, tableCount);
try {
while (it.hasNext() && active) {
String line = it.next();
int queueId = getQueueId(line);
taskQueues.get(queueId).put(line);
}
} catch (Exception e) {
logger.error("Read Task Error", e);
}
}
public void stop() {
logger.info("stop");
this.active = false;
}
}
// ANCHOR_END: ReadTask
\ No newline at end of file
package com.taos.example.highvolume;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.*;
import java.util.HashMap;
import java.util.Map;
// ANCHOR: SQLWriter
/**
* A helper class encapsulate the logic of writing using SQL.
* <p>
* The main interfaces are two methods:
* <ol>
* <li>{@link SQLWriter#processLine}, which receive raw lines from WriteTask and group them by table names.</li>
* <li>{@link SQLWriter#flush}, which assemble INSERT statement and execute it.</li>
* </ol>
* <p>
* There is a technical skill worth mentioning: we create table as needed when "table does not exist" error occur instead of creating table automatically using syntax "INSET INTO tb USING stb".
* This ensure that checking table existence is a one-time-only operation.
* </p>
*
* </p>
*/
public class SQLWriter {
final static Logger logger = LoggerFactory.getLogger(SQLWriter.class);
private Connection conn;
private Statement stmt;
/**
* current number of buffered records
*/
private int bufferedCount = 0;
/**
* Maximum number of buffered records.
* Flush action will be triggered if bufferedCount reached this value,
*/
private int maxBatchSize;
/**
* Maximum SQL length.
*/
private int maxSQLLength;
/**
* Map from table name to column values. For example:
* "tb001" -> "(1648432611249,2.1,114,0.09) (1648432611250,2.2,135,0.2)"
*/
private Map<String, String> tbValues = new HashMap<>();
/**
* Map from table name to tag values in the same order as creating stable.
* Used for creating table.
*/
private Map<String, String> tbTags = new HashMap<>();
public SQLWriter(int maxBatchSize) {
this.maxBatchSize = maxBatchSize;
}
/**
* Get Database Connection
*
* @return Connection
* @throws SQLException
*/
private static Connection getConnection() throws SQLException {
String jdbcURL = System.getenv("TDENGINE_JDBC_URL");
return DriverManager.getConnection(jdbcURL);
}
/**
* Create Connection and Statement
*
* @throws SQLException
*/
public void init() throws SQLException {
conn = getConnection();
stmt = conn.createStatement();
stmt.execute("use test");
ResultSet rs = stmt.executeQuery("show variables");
while (rs.next()) {
String configName = rs.getString(1);
if ("maxSQLLength".equals(configName)) {
maxSQLLength = Integer.parseInt(rs.getString(2));
logger.info("maxSQLLength={}", maxSQLLength);
}
}
}
/**
* Convert raw data to SQL fragments, group them by table name and cache them in a HashMap.
* Trigger writing when number of buffered records reached maxBachSize.
*
* @param line raw data get from task queue in format: tbName,ts,current,voltage,phase,location,groupId
*/
public void processLine(String line) throws SQLException {
bufferedCount += 1;
int firstComma = line.indexOf(',');
String tbName = line.substring(0, firstComma);
int lastComma = line.lastIndexOf(',');
int secondLastComma = line.lastIndexOf(',', lastComma - 1);
String value = "(" + line.substring(firstComma + 1, secondLastComma) + ") ";
if (tbValues.containsKey(tbName)) {
tbValues.put(tbName, tbValues.get(tbName) + value);
} else {
tbValues.put(tbName, value);
}
if (!tbTags.containsKey(tbName)) {
String location = line.substring(secondLastComma + 1, lastComma);
String groupId = line.substring(lastComma + 1);
String tagValues = "('" + location + "'," + groupId + ')';
tbTags.put(tbName, tagValues);
}
if (bufferedCount == maxBatchSize) {
flush();
}
}
/**
* Assemble INSERT statement using buffered SQL fragments in Map {@link SQLWriter#tbValues} and execute it.
* In case of "Table does not exit" exception, create all tables in the sql and retry the sql.
*/
public void flush() throws SQLException {
StringBuilder sb = new StringBuilder("INSERT INTO ");
for (Map.Entry<String, String> entry : tbValues.entrySet()) {
String tableName = entry.getKey();
String values = entry.getValue();
String q = tableName + " values " + values + " ";
if (sb.length() + q.length() > maxSQLLength) {
executeSQL(sb.toString());
logger.warn("increase maxSQLLength or decrease maxBatchSize to gain better performance");
sb = new StringBuilder("INSERT INTO ");
}
sb.append(q);
}
executeSQL(sb.toString());
tbValues.clear();
bufferedCount = 0;
}
private void executeSQL(String sql) throws SQLException {
try {
stmt.executeUpdate(sql);
} catch (SQLException e) {
// convert to error code defined in taoserror.h
int errorCode = e.getErrorCode() & 0xffff;
if (errorCode == 0x362 || errorCode == 0x218) {
// Table does not exist
createTables();
stmt.executeUpdate(sql);
} else {
throw e;
}
}
}
/**
* Create tables in batch using syntax:
* <p>
* CREATE TABLE [IF NOT EXISTS] tb_name1 USING stb_name TAGS (tag_value1, ...) [IF NOT EXISTS] tb_name2 USING stb_name TAGS (tag_value2, ...) ...;
* </p>
*/
private void createTables() throws SQLException {
StringBuilder sb = new StringBuilder("CREATE TABLE ");
for (String tbName : tbValues.keySet()) {
String tagValues = tbTags.get(tbName);
sb.append("IF NOT EXISTS ").append(tbName).append(" USING meters TAGS ").append(tagValues).append(" ");
}
String sql = sb.toString();
stmt.executeUpdate(sql);
}
public boolean hasBufferedValues() {
return bufferedCount > 0;
}
public int getBufferedCount() {
return bufferedCount;
}
public void close() {
try {
stmt.close();
} catch (SQLException e) {
}
try {
conn.close();
} catch (SQLException e) {
}
}
}
// ANCHOR_END: SQLWriter
package com.taos.example.highvolume;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.BlockingQueue;
// ANCHOR: WriteTask
class WriteTask implements Runnable {
private final static Logger logger = LoggerFactory.getLogger(WriteTask.class);
private final int maxBatchSize;
// the queue from which this writing task get raw data.
private final BlockingQueue<String> queue;
// A flag indicate whether to continue.
private boolean active = true;
public WriteTask(BlockingQueue<String> taskQueue, int maxBatchSize) {
this.queue = taskQueue;
this.maxBatchSize = maxBatchSize;
}
@Override
public void run() {
logger.info("started");
String line = null; // data getting from the queue just now.
SQLWriter writer = new SQLWriter(maxBatchSize);
try {
writer.init();
while (active) {
line = queue.poll();
if (line != null) {
// parse raw data and buffer the data.
writer.processLine(line);
} else if (writer.hasBufferedValues()) {
// write data immediately if no more data in the queue
writer.flush();
} else {
// sleep a while to avoid high CPU usage if no more data in the queue and no buffered records, .
Thread.sleep(100);
}
}
if (writer.hasBufferedValues()) {
writer.flush();
}
} catch (Exception e) {
String msg = String.format("line=%s, bufferedCount=%s", line, writer.getBufferedCount());
logger.error(msg, e);
} finally {
writer.close();
}
}
public void stop() {
logger.info("stop");
this.active = false;
}
}
// ANCHOR_END: WriteTask
\ No newline at end of file
# install dependencies:
# recommend python >= 3.8
# pip3 install faster-fifo
#
import logging
import sys
import time
import os
from multiprocessing import Process
from faster_fifo import Queue
from queue import Empty
from typing import List
logging.basicConfig(stream=sys.stdout, level=logging.DEBUG, format="%(asctime)s [%(name)s] - %(message)s")
READ_TASK_COUNT = 1
WRITE_TASK_COUNT = 1
TABLE_COUNT = 1000
QUEUE_SIZE = 1000000
MAX_BATCH_SIZE = 3000
read_processes = []
write_processes = []
def get_connection():
"""
If variable TDENGINE_FIRST_EP is provided then it will be used. If not, firstEP in /etc/taos/taos.cfg will be used.
You can also override the default username and password by supply variable TDENGINE_USER and TDENGINE_PASSWORD
"""
import taos
firstEP = os.environ.get("TDENGINE_FIRST_EP")
if firstEP:
host, port = firstEP.split(":")
else:
host, port = None, 0
user = os.environ.get("TDENGINE_USER", "root")
password = os.environ.get("TDENGINE_PASSWORD", "taosdata")
return taos.connect(host=host, port=int(port), user=user, password=password)
# ANCHOR: MockDataSource
class MockDataSource:
location = ["LosAngeles", "SanDiego", "Hollywood", "Compton", "San Francisco"]
current = [8.8, 10.7, 9.9, 8.9, 9.4]
voltage = [119, 116, 111, 113, 118]
phase = [0.32, 0.34, 0.33, 0.329, 0.141]
max_rows_per_table = 10 ** 9
def __init__(self, tb_name_prefix, table_count):
self.table_name_prefix = tb_name_prefix
self.table_count = table_count
self.start_ms = round(time.time() * 1000) - self.max_rows_per_table * 100
def __iter__(self):
self.row = 0
self.table_id = -1
return self
def __next__(self):
"""
next 100 rows of current table
"""
self.table_id += 1
if self.table_id == self.table_count:
self.table_id = 0
if self.row >= self.max_rows_per_table:
raise StopIteration
rows = []
while len(rows) < 100:
self.row += 1
ts = self.start_ms + 100 * self.row
group_id = self.table_id % 5 if self.table_id % 5 == 0 else self.table_id % 5 + 1
tb_name = self.table_name_prefix + '_' + str(self.table_id)
ri = self.row % 5
rows.append(f"{tb_name},{ts},{self.current[ri]},{self.voltage[ri]},{self.phase[ri]},{self.location[ri]},{group_id}")
return self.table_id, rows
# ANCHOR_END: MockDataSource
# ANCHOR: read
def run_read_task(task_id: int, task_queues: List[Queue]):
table_count_per_task = TABLE_COUNT // READ_TASK_COUNT
data_source = MockDataSource(f"tb{task_id}", table_count_per_task)
try:
for table_id, rows in data_source:
# hash data to different queue
i = table_id % len(task_queues)
# block putting forever when the queue is full
task_queues[i].put_many(rows, block=True, timeout=-1)
except KeyboardInterrupt:
pass
# ANCHOR_END: read
# ANCHOR: write
def run_write_task(task_id: int, queue: Queue):
from sql_writer import SQLWriter
log = logging.getLogger(f"WriteTask-{task_id}")
writer = SQLWriter(get_connection)
lines = None
try:
while True:
try:
# get as many as possible
lines = queue.get_many(block=False, max_messages_to_get=MAX_BATCH_SIZE)
writer.process_lines(lines)
except Empty:
time.sleep(0.01)
except KeyboardInterrupt:
pass
except BaseException as e:
log.debug(f"lines={lines}")
raise e
# ANCHOR_END: write
def set_global_config():
argc = len(sys.argv)
if argc > 1:
global READ_TASK_COUNT
READ_TASK_COUNT = int(sys.argv[1])
if argc > 2:
global WRITE_TASK_COUNT
WRITE_TASK_COUNT = int(sys.argv[2])
if argc > 3:
global TABLE_COUNT
TABLE_COUNT = int(sys.argv[3])
if argc > 4:
global QUEUE_SIZE
QUEUE_SIZE = int(sys.argv[4])
if argc > 5:
global MAX_BATCH_SIZE
MAX_BATCH_SIZE = int(sys.argv[5])
# ANCHOR: monitor
def run_monitor_process():
import taos
log = logging.getLogger("DataBaseMonitor")
conn = get_connection()
conn.execute("DROP DATABASE IF EXISTS test")
conn.execute("CREATE DATABASE test")
conn.execute("CREATE STABLE test.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) "
"TAGS (location BINARY(64), groupId INT)")
def get_count():
res = conn.query("SELECT count(*) FROM test.meters")
rows = res.fetch_all()
return rows[0][0] if rows else 0
last_count = 0
while True:
time.sleep(10)
count = get_count()
log.info(f"count={count} speed={(count - last_count) / 10}")
last_count = count
# ANCHOR_END: monitor
# ANCHOR: main
def main():
set_global_config()
logging.info(f"READ_TASK_COUNT={READ_TASK_COUNT}, WRITE_TASK_COUNT={WRITE_TASK_COUNT}, "
f"TABLE_COUNT={TABLE_COUNT}, QUEUE_SIZE={QUEUE_SIZE}, MAX_BATCH_SIZE={MAX_BATCH_SIZE}")
monitor_process = Process(target=run_monitor_process)
monitor_process.start()
time.sleep(3) # waiting for database ready.
task_queues: List[Queue] = []
# create task queues
for i in range(WRITE_TASK_COUNT):
queue = Queue(max_size_bytes=QUEUE_SIZE)
task_queues.append(queue)
# create write processes
for i in range(WRITE_TASK_COUNT):
p = Process(target=run_write_task, args=(i, task_queues[i]))
p.start()
logging.debug(f"WriteTask-{i} started with pid {p.pid}")
write_processes.append(p)
# create read processes
for i in range(READ_TASK_COUNT):
p = Process(target=run_read_task, args=(i, task_queues))
p.start()
logging.debug(f"ReadTask-{i} started with pid {p.pid}")
read_processes.append(p)
try:
monitor_process.join()
except KeyboardInterrupt:
monitor_process.terminate()
[p.terminate() for p in read_processes]
[p.terminate() for p in write_processes]
[q.close() for q in task_queues]
if __name__ == '__main__':
main()
# ANCHOR_END: main
import logging
import taos
class SQLWriter:
log = logging.getLogger("SQLWriter")
def __init__(self, get_connection_func):
self._tb_values = {}
self._tb_tags = {}
self._conn = get_connection_func()
self._max_sql_length = self.get_max_sql_length()
self._conn.execute("USE test")
def get_max_sql_length(self):
rows = self._conn.query("SHOW variables").fetch_all()
for r in rows:
name = r[0]
if name == "maxSQLLength":
return int(r[1])
def process_lines(self, lines: str):
"""
:param lines: [[tbName,ts,current,voltage,phase,location,groupId]]
"""
for line in lines:
ps = line.split(",")
table_name = ps[0]
value = '(' + ",".join(ps[1:-2]) + ') '
if table_name in self._tb_values:
self._tb_values[table_name] += value
else:
self._tb_values[table_name] = value
if table_name not in self._tb_tags:
location = ps[-2]
group_id = ps[-1]
tag_value = f"('{location}',{group_id})"
self._tb_tags[table_name] = tag_value
self.flush()
def flush(self):
"""
Assemble INSERT statement and execute it.
When the sql length grows close to MAX_SQL_LENGTH, the sql will be executed immediately, and a new INSERT statement will be created.
In case of "Table does not exit" exception, tables in the sql will be created and the sql will be re-executed.
"""
sql = "INSERT INTO "
sql_len = len(sql)
buf = []
for tb_name, values in self._tb_values.items():
q = tb_name + " VALUES " + values
if sql_len + len(q) >= self._max_sql_length:
sql += " ".join(buf)
self.execute_sql(sql)
sql = "INSERT INTO "
sql_len = len(sql)
buf = []
buf.append(q)
sql_len += len(q)
sql += " ".join(buf)
self.execute_sql(sql)
self._tb_values.clear()
def execute_sql(self, sql):
try:
self._conn.execute(sql)
except taos.Error as e:
error_code = e.errno & 0xffff
# Table does not exit
if error_code == 0x362 or error_code == 0x218:
self.create_tables()
else:
raise e
def create_tables(self):
sql = "CREATE TABLE "
for tb in self._tb_values.keys():
tag_values = self._tb_tags[tb]
sql += "IF NOT EXISTS " + tb + " USING meters TAGS " + tag_values + " "
self._conn.execute(sql)
---
title: 高效写入
---
## 高效写入原理 {#principle}
本节介绍如何高效地向 TDengine 写入数据。高效写入数据要考虑几个因素:数据在不同表(或子表)之间的分布,即要写入数据的相邻性;单次写入的数据量;并发连接数。一般来说,每批次只向同一张表(或子表)写入数据比向多张表(或子表)写入数据要更高效;每批次写入的数据量越大越高效(但超过一定阈值其优势会消失;同时写入数据的并发连接数越多写入越高效(但超过一定阈值反而会下降,取决于服务端处理能力)。
为了更高效地向 TDengine 写入数据,客户端程序要充分且恰当地利用以上几个因素。在单次写入中尽量只向同一张表(或子表)写入数据,每批次写入的数据量经过测试和调优设定为一个最适合当前系统处理能力的数值,并发写入的连接数同样经过测试和调优后设定为一个最适合当前系统处理能力的数值,以实现在当前系统中的最佳写入速度。同时,TDengine 还提供了独特的参数绑定写入,这也是一个有助于实现高效写入的方法。
为了使写入最高效,除了客户端程序的设计,服务端的配置也很重要。如果无论怎么调节客户端程序,taosd 进程的 CPU 使用率都很低,那很可能需要增加 vgroup 的数量。比如:数据库总表数是 1000 且 minTablesPerVnode 设置的也是 1000,那么这个数据至多有一个 vgroup。此时如果将 minTablesPerVnode 和 tablelncStepPerVnode 都设置成 100, 则这个数据库有可能用到 10 个 vgroup。更多性能调优参数请参考[配置参考](../../reference/config)性能调优部分。
## 高效写入方案 {#scenario}
下面的示例程序展示了如何高效写入数据:
- TDengine 客户端程序从消息队列或者其它数据源不断读入数据,在示例程序中采用生成模拟数据的方式来模拟读取数据源
- 单个连接向 TDengine 写入的速度无法与读数据的速度相匹配,因此客户端程序启动多个线程,每个线程都建立了与 TDengine 的连接,每个线程都有一个独占的固定大小的消息队列
- 客户端程序将接收到的数据根据所属的表名(或子表名)HASH 到不同的线程,即写入该线程所对应的消息队列,以此确保属于某个表(或子表)的数据一定会被一个固定的线程处理
- 各个子线程在将所关联的消息队列中的数据读空后或者读取数据量达到一个预定的阈值后将该批数据写入 TDengine,并继续处理后面接收到的数据
![TDengine 高效写入线程模型](highvolume.webp)
:::note
上图所示架构,每个写任务只负责写特定的表,体现了数据的相邻性原则。但是读任务所读的表,我们假设是随机的。这样一个队列有多个写入线程(或进程),队列内部可能产生锁的消耗。实际场景,如果能做到一个读任务对应一个写任务是最好的。
:::
## Java 示例程序 {#java-demo}
在 Java 示例程序中采用拼接 SQL 的写入方式。
### 主程序 {#java-demo-main}
主程序负责:
1. 创建消息队列
2. 启动写线程
3. 启动读线程
4. 每隔 10 秒统计一次写入速度
主程序默认暴露了 4 个参数,每次启动程序都可调节,用于测试和调优:
1. 读线程个数。默认为 1。
2. 写线程个数。默认为 3。
3. 模拟生成的总表数。默认为 1000。将会平分给各个读线程。
4. 每批最多写入记录数量。默认为 3000。
<details>
<summary>主程序</summary>
```java
{{#include docs/examples/java/src/main/java/com/taos/example/highvolume/FastWriteExample.java:main}}
```
</details>
队列容量(taskQueueCapacity)也是与性能有关的参数,可通过修改程序调节。一般来讲,队列容量越大,入队被阻塞的概率越小,队列的吞吐量越大,但是内存占用也会越大。
### 读任务的实现 {#java-demo-read}
读任务负责从数据源读数据。每个读任务都关联了一个模拟数据源。每个模拟数据源可生成一点数量表的数据。不同的模拟数据源生成不同表的数据。
读任务采用阻塞的方式写消息队列。也就是说,一旦队列满了,写操作就会阻塞。
<details>
<summary>读任务的实现</summary>
```java
{{#include docs/examples/java/src/main/java/com/taos/example/highvolume/ReadTask.java:ReadTask}}
```
</details>
### 写任务的实现 {#java-demo-write}
<details>
<summary>写任务的实现</summary>
```java
{{#include docs/examples/java/src/main/java/com/taos/example/highvolume/WriteTask.java:WriteTask}}
```
</details>
### SQLWriter 类的实现 {#java-demo-sql-writer}
SQLWriter 类封装了拼 SQL 和写数据的逻辑。注意,所有的表都没有提前创建,而是写入出错的时候,再以超级表为模板批量建表,然后重新执行 INSERT 语句。
<details>
<summary>SQLWriter 类的实现</summary>
```java
{{#include docs/examples/java/src/main/java/com/taos/example/highvolume/SQLWriter.java:SQLWriter}}
```
</details>
### 执行示例程序 {#run-java-demo}
<details>
<summary>执行 Java 示例程序</summary>
执行程序前需配置环境变量 `TDENGINE_JDBC_URL`。如果 TDengine Server 部署在本机,且用户名、密码和端口都是默认值,那么可配置:
```
TDENGINE_JDBC_URL="jdbc:TAOS://localhost:6030?user=root&password=taosdata"
```
#### 本地集成开发环境执行示例程序 {#java-demo-local-run}
1. clone TDengine 仓库
```
git clone git@github.com:taosdata/TDengine.git --depth 1
```
2. 用集成开发环境打开 `docs/examples/java` 目录。
3. 在开发环境中配置环境变量 `TDENGINE_JDBC_URL`。如果已配置了全局的环境变量 `TDENGINE_JDBC_URL` 可跳过这一步。
4. 运行类 `com.taos.example.highvolume.FastWriteExample`
#### 远程服务器上执行示例程序 {#java-demo-remote-run}
若要在服务器上执行示例程序,可按照下面的步骤操作:
1. 打包示例代码。在目录 TDengine/docs/examples/java 下执行:
```
mvn package
```
2. 远程服务器上创建 examples 目录:
```
mkdir -p examples/java
```
3. 复制依赖到服务器指定目录:
- 复制依赖包,只用复制一次
```
scp -r .\target\lib <user>@<host>:~/examples/java
```
- 复制本程序的 jar 包,每次更新代码都需要复制
```
scp -r .\target\javaexample-1.0.jar <user>@<host>:~/examples/java
```
4. 配置环境变量。
编辑 `~/.bash_profile``~/.bashrc` 添加如下内容例如:
```
export TDENGINE_JDBC_URL="jdbc:TAOS://localhost:6030?user=root&password=taosdata"
```
以上使用的是本地部署 TDengine Server 时默认的 JDBC URL。你需要根据自己的实际情况更改。
5. 用 java 命令启动示例程序,命令模板:
```
java -classpath lib/*:javaexample-1.0.jar com.taos.example.highvolume.FastWriteExample <read_thread_count> <white_thread_count> <total_table_count> <max_batch_size>
```
6. 结束测试程序。测试程序不会自动结束,在获取到当前配置下稳定的写入速度后,按 <kbd>CTRL</kbd> + <kbd>C</kbd> 结束程序。
下面是一次实际运行的截图:
```
[testuser@vm95 java]$ java -classpath lib/*:javaexample-1.0.jar com.taos.example.highvolume.FastWriteExample 1 9 1000 2000
17:01:01.131 [main] INFO c.t.e.highvolume.FastWriteExample - readTaskCount=1, writeTaskCount=9 tableCount=1000 maxBatchSize=2000
17:01:01.286 [WriteThread-0] INFO c.taos.example.highvolume.WriteTask - started
17:01:01.354 [WriteThread-1] INFO c.taos.example.highvolume.WriteTask - started
17:01:01.360 [WriteThread-2] INFO c.taos.example.highvolume.WriteTask - started
17:01:01.366 [WriteThread-3] INFO c.taos.example.highvolume.WriteTask - started
17:01:01.433 [WriteThread-4] INFO c.taos.example.highvolume.WriteTask - started
17:01:01.438 [WriteThread-5] INFO c.taos.example.highvolume.WriteTask - started
17:01:01.443 [WriteThread-6] INFO c.taos.example.highvolume.WriteTask - started
17:01:01.448 [WriteThread-7] INFO c.taos.example.highvolume.WriteTask - started
17:01:01.454 [WriteThread-8] INFO c.taos.example.highvolume.WriteTask - started
17:01:01.454 [ReadThread-0] INFO com.taos.example.highvolume.ReadTask - started
17:01:11.615 [main] INFO c.t.e.highvolume.FastWriteExample - count=18766442 speed=1876644
17:01:21.775 [main] INFO c.t.e.highvolume.FastWriteExample - count=38947464 speed=2018102
17:01:32.428 [main] INFO c.t.e.highvolume.FastWriteExample - count=58649571 speed=1970210
17:01:42.577 [main] INFO c.t.e.highvolume.FastWriteExample - count=79264890 speed=2061531
17:01:53.265 [main] INFO c.t.e.highvolume.FastWriteExample - count=99097476 speed=1983258
17:02:04.209 [main] INFO c.t.e.highvolume.FastWriteExample - count=119546779 speed=2044930
17:02:14.935 [main] INFO c.t.e.highvolume.FastWriteExample - count=141078914 speed=2153213
17:02:25.617 [main] INFO c.t.e.highvolume.FastWriteExample - count=162183457 speed=2110454
17:02:36.718 [main] INFO c.t.e.highvolume.FastWriteExample - count=182735614 speed=2055215
17:02:46.988 [main] INFO c.t.e.highvolume.FastWriteExample - count=202895614 speed=2016000
```
</details>
## Python 示例程序 {#python-demo}
该 Python 示例程序中采用了多进程的架构,并使用了跨进程的队列通信。写任务采用拼装 SQL 的方式写入。
### main 函数 {#python-demo-main}
main 函数负责创建消息队列和启动子进程,子进程有 3 类:
1. 1 个监控进程,负责数据库初始化和统计写入速度
2. n 个读进程,负责从其它数据系统读数据
3. m 个写进程,负责写数据库
main 函数可以接收 5 个启动参数,依次是:
1. 读任务(进程)数, 默认为 1
2. 写任务(进程)数, 默认为 1
3. 模拟生成的总表数,默认为 1000
4. 队列大小(单位字节),默认为 1000000
5. 每批最多写入记录数量, 默认为 3000
<details>
<summary>main 函数</summary>
```python
{{#include docs/examples/python/highvolume_faster_queue.py:main}}
```
</details>
### 监控进程
监控进程负责初始化数据库,并监控当前的写入速度。
<details>
<summary>Monitor Process</summary>
```python
{{#include docs/examples/python/highvolume_faster_queue.py:monitor}}
```
</details>
### 读进程 {#python-read-process}
#### 读进程主要逻辑 {#python-run-read-task}
读进程,负责从其它数据系统读数据,并分发数据到各个写进程。
<details>
<summary>run_read_task 函数</summary>
```python
{{#include docs/examples/python/highvolume_faster_queue.py:read}}
```
</details>
#### 模拟数据源 {#python-mock-data-source}
以下是模拟数据源的实现,我们假设数据源生成的每一条数据都带有目标表名信息。实际中你可能需要一定的规则确定目标表名。
<details>
<summary>MockDataSource</summary>
```python
{{#include docs/examples/python/highvolume_faster_queue.py:MockDataSource}}
```
</details>
### 写进程 {#python-write-process}
写进程每次从队列中取出尽量多的数据,并批量写入。
<details>
<summary>run_write_task 函数</summary>
```python
{{#include docs/examples/python/highvolume_faster_queue.py:write}}
```
</details>
### SQLWriter 类的实现 {#python-sql-writer}
SQLWriter 类封装了拼 SQL 和写数据的逻辑。所有的表都没有提前创建,而是写入出错的时候,再以超级表为模板批量建表,然后重新执行 INSERT 语句。这个类也对 SQL 是否超过最大长度限制做了检查,如果接近 SQL 最大长度限制(maxSQLLength),将会立即执行 SQL。为了减少 SQL 此时,建议将 maxSQLLength 适当调大。
<details>
<summary>SQLWriter</summary>
```python
{{#include docs/examples/python/sql_writer.py}}
```
</details>
### 执行示例程序 {#run-python-demo}
<details>
<summary>执行 Python 示例程序</summary>
1. 前提条件
- 已安装 TDengine 客户端驱动
- 已安装 Python3, 推荐版本 >= 3.8
- 已安装 taospy
2. 安装 faster-fifo 代替 python 内置的 multiprocessing.Queue
```
pip3 install faster-fifo
```
3. 点击上面的“查看源码”链接复制 `highvolume_faster_queue.py``sql_writer.py` 两个文件。
4. 执行示例程序
```
python3 highvolume_faster_queue.py <READ_TASK_COUNT> <WRITE_TASK_COUNT> <TABLE_COUNT> <QUEUE_SIZE> <MAX_BATCH_SIZE>
```
下面是一次实际运行的输出:
```
[testuser@vm95 python]$ python3.6 highvolume_faster_queue.py 9 9 1000 5000000 3000
2022-07-13 10:05:50,504 [root] - READ_TASK_COUNT=9, WRITE_TASK_COUNT=9, TABLE_COUNT=1000, QUEUE_SIZE=5000000, MAX_BATCH_SIZE=3000
2022-07-13 10:05:53,542 [root] - WriteTask-0 started with pid 5475
2022-07-13 10:05:53,542 [root] - WriteTask-1 started with pid 5476
2022-07-13 10:05:53,543 [root] - WriteTask-2 started with pid 5477
2022-07-13 10:05:53,543 [root] - WriteTask-3 started with pid 5478
2022-07-13 10:05:53,544 [root] - WriteTask-4 started with pid 5479
2022-07-13 10:05:53,544 [root] - WriteTask-5 started with pid 5480
2022-07-13 10:05:53,545 [root] - WriteTask-6 started with pid 5481
2022-07-13 10:05:53,546 [root] - WriteTask-7 started with pid 5482
2022-07-13 10:05:53,546 [root] - WriteTask-8 started with pid 5483
2022-07-13 10:05:53,547 [root] - ReadTask-0 started with pid 5484
2022-07-13 10:05:53,548 [root] - ReadTask-1 started with pid 5485
2022-07-13 10:05:53,549 [root] - ReadTask-2 started with pid 5486
2022-07-13 10:05:53,550 [root] - ReadTask-3 started with pid 5487
2022-07-13 10:05:53,551 [root] - ReadTask-4 started with pid 5488
2022-07-13 10:05:53,552 [root] - ReadTask-5 started with pid 5489
2022-07-13 10:05:53,552 [root] - ReadTask-6 started with pid 5490
2022-07-13 10:05:53,553 [root] - ReadTask-7 started with pid 5491
2022-07-13 10:05:53,554 [root] - ReadTask-8 started with pid 5492
2022-07-13 10:06:00,842 [DataBaseMonitor] - count=6612939 speed=661293.9
2022-07-13 10:06:11,151 [DataBaseMonitor] - count=14765739 speed=815280.0
2022-07-13 10:06:21,677 [DataBaseMonitor] - count=23282163 speed=851642.4
2022-07-13 10:06:31,985 [DataBaseMonitor] - count=31673139 speed=839097.6
2022-07-13 10:06:42,343 [DataBaseMonitor] - count=39819439 speed=814630.0
2022-07-13 10:06:52,830 [DataBaseMonitor] - count=48146339 speed=832690.0
2022-07-13 10:07:03,396 [DataBaseMonitor] - count=56385039 speed=823870.0
2022-07-13 10:07:14,341 [DataBaseMonitor] - count=64848739 speed=846370.0
2022-07-13 10:07:24,877 [DataBaseMonitor] - count=73654566 speed=880582.7
```
</details>
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册