未验证 提交 004c6073 编写于 作者: W wade zhang 提交者: GitHub

Merge pull request #14368 from taosdata/docs/TD-16838-1

docs: java high volume writing example
.vscode .vscode
*.lock *.lock
.env
*.~
.idea .idea
.env
\ No newline at end of file
...@@ -24,6 +24,16 @@ ...@@ -24,6 +24,16 @@
<version>2.0.38</version> <version>2.0.38</version>
</dependency> </dependency>
<!-- ANCHOR_END: dep--> <!-- ANCHOR_END: dep-->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.36</version>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>1.2.11</version>
</dependency>
<dependency> <dependency>
<groupId>junit</groupId> <groupId>junit</groupId>
<artifactId>junit</artifactId> <artifactId>junit</artifactId>
...@@ -31,5 +41,36 @@ ...@@ -31,5 +41,36 @@
<scope>test</scope> <scope>test</scope>
</dependency> </dependency>
</dependencies> </dependencies>
<build>
<plugins>
<!-- Skip test execution during packaging (skipTests=true).
     NOTE(review): presumably because the example tests need a running TDengine server — confirm. -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<version>2.5</version>
<configuration>
<skipTests>true</skipTests>
</configuration>
</plugin>
<!-- Copy all runtime dependencies into target/lib at the prepare-package phase,
     so the example jar can be run on a remote host with "-classpath lib/*:javaexample-1.0.jar". -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-dependency-plugin</artifactId>
<executions>
<execution>
<id>copy-dependencies</id>
<phase>prepare-package</phase>
<goals>
<goal>copy-dependencies</goal>
</goals>
<configuration>
<outputDirectory>${project.build.directory}/lib</outputDirectory>
<overWriteReleases>false</overWriteReleases>
<overWriteSnapshots>false</overWriteSnapshots>
<overWriteIfNewer>true</overWriteIfNewer>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project> </project>
package com.taos.example;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
/**
 * Demonstrates the SQLException error code TDengine returns when the target
 * table of an INSERT statement does not exist: the code is printed in both
 * decimal and hexadecimal form, followed by the exception itself.
 */
public class TestTableNotExits {

    /** Builds a connection to the local TDengine instance via the native JDBC driver. */
    private static Connection getConnection() throws SQLException {
        final String url = "jdbc:TAOS://localhost:6030?user=root&password=taosdata";
        return DriverManager.getConnection(url);
    }

    public static void main(String[] args) throws SQLException {
        // Open the connection and statement together; both close automatically.
        try (Connection connection = getConnection();
             Statement statement = connection.createStatement()) {
            try {
                statement.executeUpdate("insert into test.t1 values(1, 2) test.t2 values(3, 4)");
            } catch (SQLException e) {
                // Show the raw driver error code, its hex form, and the exception.
                System.out.println(e.getErrorCode());
                System.out.println(Integer.toHexString(e.getErrorCode()));
                System.out.println(e);
            }
        }
    }
}
package com.taos.example.highvolume; package com.taos.example.highvolume;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.*;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;
/**
 * Prepare the target database.
 * Count total records in the database periodically so that we can estimate the writing speed.
 */
class DataBaseMonitor {
    private Connection conn;
    private Statement stmt;

    /**
     * Lazily open the JDBC connection and statement.
     * The JDBC URL is read from the TDENGINE_JDBC_URL environment variable.
     *
     * @return this, for call chaining
     * @throws SQLException if the connection cannot be established
     */
    public DataBaseMonitor init() throws SQLException {
        if (conn == null) {
            String jdbcURL = System.getenv("TDENGINE_JDBC_URL");
            conn = DriverManager.getConnection(jdbcURL);
            stmt = conn.createStatement();
        }
        return this;
    }

    /**
     * Best-effort release of the statement and connection.
     * Safe to call even if init() was never invoked or failed part-way
     * (the original would throw NullPointerException in that case).
     */
    public void close() {
        if (stmt != null) {
            try {
                stmt.close();
            } catch (SQLException e) {
                // best-effort close: nothing useful to do on failure
            }
        }
        if (conn != null) {
            try {
                conn.close();
            } catch (SQLException e) {
                // best-effort close: nothing useful to do on failure
            }
        }
    }

    /**
     * Drop and recreate the "test" database and the "meters" super table.
     *
     * @throws SQLException on any JDBC failure
     */
    public void prepareDatabase() throws SQLException {
        stmt.execute("DROP DATABASE IF EXISTS test");
        stmt.execute("CREATE DATABASE test");
        stmt.execute("CREATE STABLE test.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (location BINARY(64), groupId INT)");
    }

    /**
     * Total number of rows written so far, or null if the statement is closed.
     * The ResultSet is now closed via try-with-resources (the original leaked it).
     *
     * @throws SQLException on any JDBC failure
     */
    public Long count() throws SQLException {
        if (!stmt.isClosed()) {
            try (ResultSet result = stmt.executeQuery("SELECT count(*) from test.meters")) {
                result.next();
                return result.getLong(1);
            }
        }
        return null;
    }
}
// ANCHOR: main // ANCHOR: main
public class FastWriteExample { public class FastWriteExample {
final static Logger logger = LoggerFactory.getLogger(FastWriteExample.class);
final static int readTaskCount = 1; final static int taskQueueCapacity = Integer.MAX_VALUE / 100;
final static int writeTaskCount = 4; final static List<BlockingQueue<String>> taskQueues = new ArrayList<>();
final static List<ReadTask> readTasks = new ArrayList<>(); final static List<ReadTask> readTasks = new ArrayList<>();
final static List<WriteTask> writeTasks = new ArrayList<>(); final static List<WriteTask> writeTasks = new ArrayList<>();
final static DataBaseMonitor databaseMonitor = new DataBaseMonitor();
public static void stopAll() {
logger.info("shutting down");
readTasks.forEach(task -> task.stop());
writeTasks.forEach(task -> task.stop());
databaseMonitor.close();
}
public static void main(String[] args) throws InterruptedException, SQLException {
int readTaskCount = args.length > 0 ? Integer.parseInt(args[0]) : 1;
int writeTaskCount = args.length > 1 ? Integer.parseInt(args[1]) : 1;
logger.info("readTaskCount={}, writeTaskCount={}", readTaskCount, writeTaskCount);
databaseMonitor.init().prepareDatabase();
public static void main(String[] args) throws InterruptedException { // Create task queues, whiting tasks and start writing threads.
// Create writing tasks.
// Every writing task contains a work thread and a task queue.
for (int i = 0; i < writeTaskCount; ++i) { for (int i = 0; i < writeTaskCount; ++i) {
WriteTask task = new WriteTask(); BlockingQueue<String> queue = new ArrayBlockingQueue<>(taskQueueCapacity);
task.start(); taskQueues.add(queue);
writeTasks.add(new WriteTask()); WriteTask task = new WriteTask(queue);
Thread t = new Thread(task);
t.setName("WriteThread-" + i);
t.start();
} }
// Create reading tasks. // create reading tasks and start reading threads
// Every reading task contains a work thread and a reference to each writing task.
for (int i = 0; i < readTaskCount; ++i) { for (int i = 0; i < readTaskCount; ++i) {
ReadTask task = new ReadTask(i, writeTasks); ReadTask task = new ReadTask(i, taskQueues);
task.start(); Thread t = new Thread(task);
readTasks.add(task); t.setName("ReadThread-" + i);
t.start();
} }
Runtime.getRuntime().addShutdownHook(new Thread(FastWriteExample::stopAll));
long lastCount = 0;
while (true) { while (true) {
Thread.sleep(1000); Thread.sleep(10000);
long count = databaseMonitor.count();
logger.info("total_count={} speed={}", count, (count - lastCount) / 10);
lastCount = count;
} }
} }
} }
// ANCHOR_END: main // ANCHOR_END: main
\ No newline at end of file
package com.taos.example.highvolume; package com.taos.example.highvolume;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.concurrent.BlockingQueue;
/**
class FakeDataSource implements Iterator { * Generate test data
private long now = System.currentTimeMillis(); */
private int tableCount = 1000; class MockDataSource implements Iterator {
private int numberRows = 10 ^ 7;
private int tsStep = 100; // 100 milliseconds
private String tbNamePrefix; private String tbNamePrefix;
private long ts = now - numberRows * tsStep; private int tableCount = 1000;
private int tbId = 0; private long maxRowsPerTable = 1000000000L;
public FakeDataSource(String tbNamePrefix) { // 100 milliseconds between two neighbouring rows.
this.tbNamePrefix = tbNamePrefix; long startMs = System.currentTimeMillis() - maxRowsPerTable * 100;
private int currentRow = 0;
private int currentTbId = -1;
// mock values
String[] location = {"LosAngeles", "SanDiego", "Hollywood", "Compton", "San Francisco"};
float[] current = {8.8f, 10.7f, 9.9f, 8.9f, 9.4f};
int[] voltage = {119, 116, 111, 113, 118};
float[] phase = {0.32f, 0.34f, 0.33f, 0.329f, 0.141f};
public MockDataSource(String tbNamePrefix) {
this.tbNamePrefix = tbNamePrefix;
} }
@Override @Override
public boolean hasNext() { public boolean hasNext() {
return ts < now; currentTbId += 1;
if (currentTbId == tableCount) {
currentTbId = 0;
currentRow += 1;
}
return currentRow < maxRowsPerTable;
} }
@Override @Override
public String next() { public String next() {
if (tbId < tableCount) { long ts = startMs + 100 * currentRow;
tbId += 1; int groupId = currentTbId % 5 == 0 ? currentTbId / 5 : currentTbId / 5 + 1;
} else { StringBuilder sb = new StringBuilder(tbNamePrefix + "_" + currentTbId + ","); // tbName
ts += tsStep;
tbId = 0;
}
StringBuilder sb = new StringBuilder(tbNamePrefix + tbId + ","); // tbName
sb.append(ts).append(','); // ts sb.append(ts).append(','); // ts
sb.append(1.0).append(','); // current sb.append(current[currentRow % 5]).append(','); // current
sb.append(110).append(','); // voltage sb.append(voltage[currentRow % 5]).append(','); // voltage
sb.append(0.32).append(','); // phase sb.append(phase[currentRow % 5]).append(','); // phase
sb.append(3).append(','); // groupID sb.append(location[currentRow % 5]).append(','); // location
sb.append("Los Angeles"); // location sb.append(groupId); // groupID
return sb.toString(); return sb.toString();
} }
} }
// ANCHOR: ReadTask // ANCHOR: ReadTask
class ReadTask { class ReadTask implements Runnable {
private final static Logger logger = LoggerFactory.getLogger(ReadTask.class);
private final int taskId; private final int taskId;
private Thread readThread; private final List<BlockingQueue<String>> taskQueues;
private List<WriteTask> writeTasks; private final int queueCount;
private boolean active = true;
public ReadTask(int readTaskId, List<WriteTask> writeTasks) { public ReadTask(int readTaskId, List<BlockingQueue<String>> queues) {
this.taskId = readTaskId; this.taskId = readTaskId;
this.writeTasks = writeTasks; this.taskQueues = queues;
this.readThread = new Thread(this::doReadTask); this.queueCount = queues.size();
} }
/** /**
* Read lines from datasource. * Assign data received to different queues.
* And assign each line to a writing task according to the table name. * Here we use the suffix number in table name.
* You are expected to define your own rule in practice.
*
* @param line record received
* @return which queue to use
*/ */
private void doReadTask() { public int getQueueId(String line) {
int numberWriteTask = writeTasks.size(); String tbName = line.substring(0, line.indexOf(',')); // For example: tb1_101
Iterator<String> it = new FakeDataSource("t" + this.taskId + "tb"); String suffixNumber = tbName.split("_")[1];
return Integer.parseInt(suffixNumber) % this.queueCount;
}
@Override
public void run() {
logger.info("started");
Iterator<String> it = new MockDataSource("tb" + this.taskId);
try { try {
while (it.hasNext()) { while (it.hasNext() && active) {
String line = it.next(); String line = it.next();
String tbName = line.substring(0, line.indexOf(',')); int queueId = getQueueId(line);
int writeTaskId = Math.abs(tbName.hashCode()) % numberWriteTask; taskQueues.get(queueId).put(line);
writeTasks.get(writeTaskId).put(line);
} }
} catch (InterruptedException e) { } catch (Exception e) {
e.printStackTrace(); logger.error("Read Task Error", e);
} }
} }
/** public void stop() {
* Start reading thread. logger.info("stop");
*/ this.active = false;
public void start() {
this.readThread.start();
} }
} }
......
package com.taos.example.highvolume;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.HashMap;
import java.util.Map;
// ANCHOR: SQLWriter
/**
 * A helper class encapsulating the logic of writing to TDengine using SQL.
 * <p>
 * The main interfaces are two methods:
 * <ol>
 * <li>{@link SQLWriter#processLine}, which receives raw data from WriteTask and groups raw data by table name.</li>
 * <li>{@link SQLWriter#flush}, which assembles an INSERT statement and executes it.</li>
 * </ol>
 * <p>
 * A technical detail worth mentioning: tables are created on demand when a
 * "table does not exist" error occurs, instead of creating each table automatically
 * using the "INSERT INTO tb USING stb" syntax.
 * This ensures that checking table existence is a one-time-only operation.
 * </p>
 */
public class SQLWriter {
    private Connection conn;
    private Statement stmt;

    /**
     * Current number of buffered records.
     */
    private int bufferedCount = 0;

    /**
     * Maximum number of buffered records.
     * A flush is triggered once bufferedCount reaches this value.
     */
    private int maxBatchSize;

    /**
     * Map from table name to buffered column values. For example:
     * "tb001" -> "(1648432611249,2.1,114,0.09) (1648432611250,2.2,135,0.2)"
     */
    private Map<String, String> tbValues = new HashMap<>();

    /**
     * Map from table name to tag values, in the same order as when the stable was created.
     * Used for creating tables on demand; intentionally never cleared so each table's
     * tag string is computed at most once.
     */
    private Map<String, String> tbTags = new HashMap<>();

    public SQLWriter(int maxBatchSize) {
        this.maxBatchSize = maxBatchSize;
    }

    /**
     * Get a database connection using the JDBC URL from the TDENGINE_JDBC_URL
     * environment variable.
     *
     * @return Connection
     * @throws SQLException if the connection cannot be established
     */
    private static Connection getConnection() throws SQLException {
        String jdbcURL = System.getenv("TDENGINE_JDBC_URL");
        return DriverManager.getConnection(jdbcURL);
    }

    /**
     * Create the Connection and Statement and select the target database.
     *
     * @throws SQLException on any JDBC failure
     */
    public void init() throws SQLException {
        conn = getConnection();
        stmt = conn.createStatement();
        stmt.execute("use test");
    }

    /**
     * Convert raw data to SQL fragments, group them by table name and cache them in a HashMap.
     * Trigger writing once the number of buffered records reaches maxBatchSize.
     *
     * @param line raw data from the task queue, in format: tbName,ts,current,voltage,phase,location,groupId
     * @throws SQLException if a triggered flush fails
     */
    public void processLine(String line) throws SQLException {
        bufferedCount += 1;
        int firstComma = line.indexOf(',');
        String tbName = line.substring(0, firstComma);
        int lastComma = line.lastIndexOf(',');
        int secondLastComma = line.lastIndexOf(',', lastComma - 1);
        // Everything between the table name and the two tag columns is the row values.
        String values = "(" + line.substring(firstComma + 1, secondLastComma) + ") ";
        if (tbValues.containsKey(tbName)) {
            tbValues.put(tbName, tbValues.get(tbName) + values);
        } else {
            tbValues.put(tbName, values);
        }
        if (!tbTags.containsKey(tbName)) {
            // location is single-quoted because it is a string tag; groupId is numeric.
            String location = line.substring(secondLastComma + 1, lastComma);
            String groupId = line.substring(lastComma + 1);
            String tagValues = "('" + location + "'," + groupId + ')';
            tbTags.put(tbName, tagValues);
        }
        if (bufferedCount == maxBatchSize) {
            flush();
        }
    }

    /**
     * Assemble an INSERT statement from the buffered SQL fragments in
     * {@link SQLWriter#tbValues} and execute it.
     * In case of a "table does not exist" error, create all tables referenced
     * by the SQL and retry the statement once.
     *
     * @throws SQLException on any failure other than the handled missing-table case
     */
    public void flush() throws SQLException {
        StringBuilder sb = new StringBuilder("INSERT INTO ");
        for (Map.Entry<String, String> entry : tbValues.entrySet()) {
            String tableName = entry.getKey();
            String values = entry.getValue();
            sb.append(tableName).append(" values ").append(values).append(" ");
        }
        String sql = sb.toString();
        try {
            stmt.executeUpdate(sql);
        } catch (SQLException e) {
            // Convert to the error code defined in taoserror.h.
            int errorCode = e.getErrorCode() & 0xffff;
            if (errorCode == 0x362 || errorCode == 0x218) {
                // Table does not exist: create the missing tables and retry once.
                createTables();
                stmt.executeUpdate(sql);
            } else {
                throw e;
            }
        }
        tbValues.clear();
        bufferedCount = 0;
    }

    /**
     * Create tables in batch using the syntax:
     * <p>
     * CREATE TABLE [IF NOT EXISTS] tb_name1 USING stb_name TAGS (tag_value1, ...) [IF NOT EXISTS] tb_name2 USING stb_name TAGS (tag_value2, ...) ...;
     * </p>
     *
     * @throws SQLException on any JDBC failure
     */
    private void createTables() throws SQLException {
        StringBuilder sb = new StringBuilder("CREATE TABLE ");
        for (String tbName : tbValues.keySet()) {
            String tagValues = tbTags.get(tbName);
            sb.append("IF NOT EXISTS ").append(tbName).append(" USING meters TAGS ").append(tagValues).append(" ");
        }
        String sql = sb.toString();
        stmt.executeUpdate(sql);
    }

    /** @return true if there is at least one buffered, unflushed record */
    public boolean hasBufferedValues() {
        return bufferedCount > 0;
    }

    /** @return the current number of buffered records */
    public int getBufferedCount() {
        return bufferedCount;
    }

    /**
     * Best-effort release of the statement and connection.
     * Safe to call even if init() was never invoked or failed part-way
     * (the original would throw NullPointerException in that case).
     */
    public void close() {
        if (stmt != null) {
            try {
                stmt.close();
            } catch (SQLException e) {
                // best-effort close: nothing useful to do on failure
            }
        }
        if (conn != null) {
            try {
                conn.close();
            } catch (SQLException e) {
                // best-effort close: nothing useful to do on failure
            }
        }
    }
}
// ANCHOR_END: SQLWriter
package com.taos.example.highvolume; package com.taos.example.highvolume;
import java.sql.Connection; import org.slf4j.Logger;
import java.sql.DriverManager; import org.slf4j.LoggerFactory;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.BlockingQueue; import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;
// ANCHOR: WriteTask // ANCHOR: WriteTask
class WriteTask { class WriteTask implements Runnable {
final static int maxBatchSize = 500; private final static Logger logger = LoggerFactory.getLogger(WriteTask.class);
// private final static int maxBatchSize = 3000;
final static int taskQueueCapacity = 1000;
private Thread writeThread = new Thread(this::doWriteTask);
private BlockingQueue<String> queue = new LinkedBlockingDeque<>(taskQueueCapacity); // the queue from which this writing task get raw data.
private final BlockingQueue<String> queue;
/** // A flag indicate whether to continue.
* Public interface for adding task to task queue. private boolean active = true;
* It will be invoked in read thread.
*/
public void put(String line) throws InterruptedException {
queue.put(line);
}
/** public WriteTask(BlockingQueue<String> taskQueue) {
* Start writing thread. this.queue = taskQueue;
*/
public void start() {
writeThread.start();
} }
private static Connection getConnection() throws SQLException {
String jdbcUrl = "jdbc:TAOS://localhost:6030?user=root&password=taosdata";
return DriverManager.getConnection(jdbcUrl);
}
private void doWriteTask() { @Override
int count = 0; public void run() {
logger.info("started");
String line = null; // data getting from the queue just now.
SQLWriter writer = new SQLWriter(maxBatchSize);
try { try {
Connection conn = getConnection(); writer.init();
Statement stmt = conn.createStatement(); while (active) {
Map<String, String> tbValues = new HashMap<>(); line = queue.poll();
while (true) {
String line = queue.poll();
if (line != null) { if (line != null) {
processLine(tbValues, line); // parse raw data and buffer the data.
count += 1; writer.processLine(line);
if (count == maxBatchSize) { } else if (writer.hasBufferedValues()) {
// trigger writing when count of buffered records reached maxBachSize // write data immediately if no more data in the queue
flushValues(stmt, tbValues); writer.flush();
count = 0;
}
} else if (count == 0) {
// if queue is empty and no buffered records, sleep a while to avoid high CPU usage.
Thread.sleep(500);
} else { } else {
// if queue is empty and there are buffered records then flush immediately // sleep a while to avoid high CPU usage if no more data in the queue and no buffered records, .
flushValues(stmt, tbValues); Thread.sleep(500);
count = 0;
} }
} }
if (writer.hasBufferedValues()) {
writer.flush();
}
} catch (Exception e) { } catch (Exception e) {
// handle exception String msg = String.format("line=%s, bufferedCount=%s", line, writer.getBufferedCount());
logger.error(msg, e);
} finally {
writer.close();
} }
}
private void processLine(Map<String, String> tbValues, String line) {
} }
private void flushValues(Statement stmt, Map<String, String> tbValues) { public void stop() {
logger.info("stop");
this.active = false;
} }
} }
// ANCHOR_END: WriteTask // ANCHOR_END: WriteTask
\ No newline at end of file
<configuration>
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<!-- encoders are assigned the type
ch.qos.logback.classic.encoder.PatternLayoutEncoder by default -->
<encoder>
<pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
</encoder>
</appender>
<root level="debug">
<appender-ref ref="STDOUT" />
</root>
</configuration>
\ No newline at end of file
...@@ -21,33 +21,70 @@ title: 高效写入 ...@@ -21,33 +21,70 @@ title: 高效写入
在 Java 示例程序中采用拼接 SQL 的写入方式。 在 Java 示例程序中采用拼接 SQL 的写入方式。
### 主程序
主程序负责创建队列,并启动读线程和写线程。
```java title="主程序" ```java title="主程序"
{{#include docs/examples/java/src/main/java/com/taos/example/highvolume/FastWriteExample.java:main}} {{#include docs/examples/java/src/main/java/com/taos/example/highvolume/FastWriteExample.java:main}}
``` ```
### 读任务的实现
<details> <details>
<summary>任务的实现</summary> <summary>任务的实现</summary>
```java ```java
{{#include docs/examples/java/src/main/java/com/taos/example/highvolume/WriteTask.java:WriteTask}} {{#include docs/examples/java/src/main/java/com/taos/example/highvolume/ReadTask.java:ReadTask}}
``` ```
</details> </details>
### 写任务的实现
<details> <details>
<summary>任务的实现</summary> <summary>任务的实现</summary>
```java ```java
{{#include docs/examples/java/src/main/java/com/taos/example/highvolume/ReadTask.java:ReadTask}} {{#include docs/examples/java/src/main/java/com/taos/example/highvolume/WriteTask.java:WriteTask}}
``` ```
</details> </details>
### SQLWriter 类的实现
SQLWriter 类封装了拼 SQL 和写数据的逻辑。注意,所有的表都没有提前创建,而是写入出错的时候,再以超级表为模板建表,然后重现执行 INSERT 语句。
## Python 示例程序 ```java
{{#include docs/examples/java/src/main/java/com/taos/example/highvolume/SQLWriter.java:SQLWriter}}
```
### 执行示例程序
可用在本地集成开发环境里直接运行示例程序,注意要提前配置环境变量 TDENGINE_JDBC_URL。若要在远程服务器上执行示例程序,可按照下面步骤操作:
1. 打包示例代码。在目录 TDengine/docs/examples/java 下执行:
```
mvn package
```
2. 远程服务器上创建 examples 目录:
```
mkdir -p examples/java
```
3. 复制依赖到服务器指定目录,比如:
```
scp -r .\target\lib user@host:~/examples/java
scp -r .\target\javaexample-1.0.jar user@host:~/examples/java
```
4. 配置环境变量,例如:
```
export TDENGINE_JDBC_URL="jdbc:TAOS://localhost:6030?user=root&password=taosdata"
```
5. 用 java 命令启动示例程序
```
java -classpath lib/*:javaexample-1.0.jar com.taos.example.highvolume.FastWriteExample <read_thread_count> <write_thread_count>
```
在 Python 示例程序中采用参数绑定的写入方式。 ## Python 示例程序
```python title="Python 示例程序" 在 Python 示例程序中采用参数绑定的写入方式。(开发中)
<!-- ```python title="Python 示例程序"
``` ``` -->
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册