Commit 1d24d0dd authored by dingbo

docs: highvolume example for java

Parent b16a69b5
@@ -41,5 +41,36 @@
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<version>2.5</version>
<configuration>
<skipTests>true</skipTests>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-dependency-plugin</artifactId>
<executions>
<execution>
<id>copy-dependencies</id>
<phase>prepare-package</phase>
<goals>
<goal>copy-dependencies</goal>
</goals>
<configuration>
<outputDirectory>${project.build.directory}/lib</outputDirectory>
<overWriteReleases>false</overWriteReleases>
<overWriteSnapshots>false</overWriteSnapshots>
<overWriteIfNewer>true</overWriteIfNewer>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
package com.taos.example;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
public class TestTableNotExits {
private static Connection getConnection() throws SQLException {
String jdbcUrl = "jdbc:TAOS://localhost:6030?user=root&password=taosdata";
return DriverManager.getConnection(jdbcUrl);
}
public static void main(String[] args) throws SQLException {
try(Connection conn = getConnection()) {
try(Statement stmt = conn.createStatement()) {
try {
stmt.executeUpdate("insert into test.t1 values(1, 2) test.t2 values(3, 4)");
} catch (SQLException e) {
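// Note (added for clarity): the lower 16 bits of the JDBC error code (errorCode & 0xffff) correspond to the
// native error codes defined in taoserror.h; SQLWriter.flush() later in this commit relies on this mapping to
// detect "table does not exist" (0x362 / 0x218) and create tables on demand.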
System.out.println(e.getErrorCode());
System.out.println(Integer.toHexString(e.getErrorCode()));
System.out.println(e);
}
}
}
}
}
@@ -3,29 +3,80 @@ package com.taos.example.highvolume;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.sql.*;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
/**
* Prepare target database.
* Count total records in database periodically so that we can estimate the writing speed.
*/
class DataBaseMonitor {
private Connection conn;
private Statement stmt;
public DataBaseMonitor init() throws SQLException {
if (conn == null) {
String jdbcURL = System.getenv("TDENGINE_JDBC_URL");
conn = DriverManager.getConnection(jdbcURL);
stmt = conn.createStatement();
}
return this;
}
public void close() {
try {
stmt.close();
} catch (SQLException e) {
}
try {
conn.close();
} catch (SQLException e) {
}
}
public void prepareDatabase() throws SQLException {
stmt.execute("DROP DATABASE IF EXISTS test");
stmt.execute("CREATE DATABASE test");
stmt.execute("CREATE STABLE test.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (location BINARY(64), groupId INT)");
}
public Long count() throws SQLException {
if (!stmt.isClosed()) {
ResultSet result = stmt.executeQuery("SELECT count(*) from test.meters");
result.next();
return result.getLong(1);
}
return null;
}
}
// ANCHOR: main
public class FastWriteExample {
final static Logger logger = LoggerFactory.getLogger(FastWriteExample.class);

final static int taskQueueCapacity = 1000;
final static List<BlockingQueue<String>> taskQueues = new ArrayList<>();
final static List<ReadTask> readTasks = new ArrayList<>();
final static List<WriteTask> writeTasks = new ArrayList<>();
final static DataBaseMonitor databaseMonitor = new DataBaseMonitor();

public static void stopAll() {
logger.info("shutting down");
readTasks.forEach(task -> task.stop());
writeTasks.forEach(task -> task.stop());
databaseMonitor.close();
}

public static void main(String[] args) throws InterruptedException, SQLException {
int readTaskCount = args.length > 0 ? Integer.parseInt(args[0]) : 1;
int writeTaskCount = args.length > 1 ? Integer.parseInt(args[1]) : 1;
logger.info("readTaskCount={}, writeTaskCount={}", readTaskCount, writeTaskCount);
databaseMonitor.init().prepareDatabase();
// Create task queues, writing tasks, and start writing threads.
for (int i = 0; i < writeTaskCount; ++i) {
BlockingQueue<String> queue = new ArrayBlockingQueue<>(taskQueueCapacity);

@@ -45,7 +96,14 @@ public class FastWriteExample {
}
Runtime.getRuntime().addShutdownHook(new Thread(FastWriteExample::stopAll));
long lastCount = 0;
while (true) {
Thread.sleep(10000);
long count = databaseMonitor.count();
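// Rows inserted since the last sample, divided by the 10-second sleep interval above, gives rows per second.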
logger.info("total_count={} speed={}", count, (count - lastCount) / 10);
lastCount = count;
}
}
}
// ANCHOR_END: main
\ No newline at end of file
@@ -12,8 +12,8 @@ import java.util.concurrent.BlockingQueue;
*/
class MockDataSource implements Iterator {
private String tbNamePrefix;
private int tableCount = 100;
private int totalRowsPerTable = 1000000;

// 100 milliseconds between two neighbouring rows.
long startMs = System.currentTimeMillis() - totalRowsPerTable * 100;
@@ -28,7 +28,6 @@ class MockDataSource implements Iterator {
public MockDataSource(String tbNamePrefix) {
this.tbNamePrefix = tbNamePrefix;
}

@Override
@@ -50,8 +49,8 @@ class MockDataSource implements Iterator {
sb.append(current[currentRow % 5]).append(','); // current
sb.append(voltage[currentRow % 5]).append(','); // voltage
sb.append(phase[currentRow % 5]).append(','); // phase
sb.append(location[currentRow % 5]).append(','); // location
sb.append(groupId); // groupID
return sb.toString();
}
......
package com.taos.example.highvolume;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.HashMap;
import java.util.Map;
// ANCHOR: SQLWriter
/**
* A helper class that encapsulates the logic of writing using SQL.
* <p>
* The main interfaces are two methods:
* <ol>
* <li>{@link SQLWriter#processLine}, which receives raw data from WriteTask and groups it by table name.</li>
* <li>{@link SQLWriter#flush}, which assembles the INSERT statement and executes it.</li>
* </ol>
* <p>
* There is a technique worth mentioning: we create tables on demand when a "table does not exist" error occurs, instead of creating them automatically with the "INSERT INTO tb USING stb" syntax.
* This ensures that checking table existence is a one-time-only operation.
* </p>
*/
public class SQLWriter {
final static Logger logger = LoggerFactory.getLogger(SQLWriter.class);
private Connection conn;
private Statement stmt;
/**
* current number of buffered records
*/
private int bufferedCount = 0;
/**
* Maximum number of buffered records.
* A flush is triggered when bufferedCount reaches this value.
*/
private int maxBatchSize;
/**
* Map from table name to column values. For example:
* "tb001" -> "(1648432611249,2.1,114,0.09) (1648432611250,2.2,135,0.2)"
*/
private Map<String, String> tbValues = new HashMap<>();
/**
* Map from table name to tag values, in the same order as in the CREATE STABLE statement.
* Used for creating tables.
*/
private Map<String, String> tbTags = new HashMap<>();
public SQLWriter(int maxBatchSize) {
this.maxBatchSize = maxBatchSize;
}
/**
* Get Database Connection
*
* @return Connection
* @throws SQLException
*/
private static Connection getConnection() throws SQLException {
String jdbcURL = System.getenv("TDENGINE_JDBC_URL");
return DriverManager.getConnection(jdbcURL);
}
/**
* Create Connection and Statement
*
* @throws SQLException
*/
public void init() throws SQLException {
conn = getConnection();
stmt = conn.createStatement();
stmt.execute("use test");
}
/**
* Convert raw data to SQL fragments, group them by table name and cache them in a HashMap.
* Trigger writing when the number of buffered records reaches maxBatchSize.
*
* @param line raw data taken from the task queue, in the format: tbName,ts,current,voltage,phase,location,groupId
*/
public void processLine(String line) throws SQLException {
bufferedCount += 1;
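// Expected line format (see the javadoc above): tbName,ts,current,voltage,phase,location,groupId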
int firstComma = line.indexOf(',');
String tbName = line.substring(0, firstComma);
int lastComma = line.lastIndexOf(',');
int secondLastComma = line.lastIndexOf(',', lastComma - 1);
String values = "(" + line.substring(firstComma + 1, secondLastComma) + ") ";
if (tbValues.containsKey(tbName)) {
tbValues.put(tbName, tbValues.get(tbName) + values);
} else {
tbValues.put(tbName, values);
}
if (!tbTags.containsKey(tbName)) {
String location = line.substring(secondLastComma + 1, lastComma);
String groupId = line.substring(lastComma + 1);
String tagValues = "('" + location + "'," + groupId + ')';
tbTags.put(tbName, tagValues);
}
if (bufferedCount == maxBatchSize) {
flush();
}
}
/**
* Assemble INSERT statement using buffered SQL fragments in Map {@link SQLWriter#tbValues} and execute it.
* In case of a "table does not exist" exception, create all tables referenced in the SQL and retry it.
*/
public void flush() throws SQLException {
StringBuilder sb = new StringBuilder("INSERT INTO ");
for (Map.Entry<String, String> entry : tbValues.entrySet()) {
String tableName = entry.getKey();
String values = entry.getValue();
sb.append(tableName).append(" values ").append(values).append(" ");
}
String sql = sb.toString();
try {
stmt.executeUpdate(sql);
} catch (SQLException e) {
// convert to error code defined in taoserror.h
int errorCode = e.getErrorCode() & 0xffff;
if (errorCode == 0x362 || errorCode == 0x218) {
// Table does not exist
createTables();
stmt.executeUpdate(sql);
} else {
throw e;
}
}
tbValues.clear();
bufferedCount = 0;
}
/**
* Create tables in batch using syntax:
* <p>
* CREATE TABLE [IF NOT EXISTS] tb_name1 USING stb_name TAGS (tag_value1, ...) [IF NOT EXISTS] tb_name2 USING stb_name TAGS (tag_value2, ...) ...;
* </p>
*/
private void createTables() throws SQLException {
StringBuilder sb = new StringBuilder("CREATE TABLE ");
for (String tbName : tbValues.keySet()) {
String tagValues = tbTags.get(tbName);
sb.append("IF NOT EXISTS ").append(tbName).append(" USING meters TAGS ").append(tagValues).append(" ");
}
String sql = sb.toString();
stmt.executeUpdate(sql);
}
public boolean hasBufferedValues() {
return bufferedCount > 0;
}
public int getBufferedCount() {
return bufferedCount;
}
public void close() {
try {
stmt.close();
} catch (SQLException e) {
}
try {
conn.close();
} catch (SQLException e) {
}
}
}
// ANCHOR_END: SQLWriter
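A minimal usage sketch of SQLWriter, assuming TDENGINE_JDBC_URL is set and the test database has already been prepared by DataBaseMonitor; the data line uses illustrative values in the tbName,ts,current,voltage,phase,location,groupId format, and exception handling is omitted:

```java
SQLWriter writer = new SQLWriter(1000);  // flush after 1000 buffered records
writer.init();                           // opens the connection and executes "use test"
writer.processLine("d0,1648432611249,2.1,114,0.09,California.SanFrancisco,2");
writer.flush();                          // assembles "INSERT INTO d0 values (...)", creating the table on demand if needed
writer.close();
```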
@@ -3,74 +3,55 @@ package com.taos.example.highvolume;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.BlockingQueue;
// ANCHOR: WriteTask
class WriteTask implements Runnable {
private final static Logger logger = LoggerFactory.getLogger(WriteTask.class);
private final static int maxBatchSize = 3000;
// The queue from which this writing task gets raw data.
private final BlockingQueue<String> queue;

// A flag indicating whether to continue.
private boolean active = true;

public WriteTask(BlockingQueue<String> taskQueue) {
this.queue = taskQueue;
}
@Override
public void run() {
logger.info("started");
String line = null; // the line just taken from the queue
SQLWriter writer = new SQLWriter(maxBatchSize);
try {
writer.init();
while (active) {
line = queue.poll();
if (line != null) {
// parse the raw data and buffer it
writer.processLine(line);
} else if (writer.hasBufferedValues()) {
// write buffered data immediately if there is no more data in the queue
writer.flush();
} else {
// sleep a while to avoid high CPU usage when the queue is empty and there are no buffered records
Thread.sleep(500);
}
}
if (writer.hasBufferedValues()) {
writer.flush();
}
} catch (Exception e) {
String msg = String.format("line=%s, bufferedCount=%s", line, writer.getBufferedCount());
logger.error(msg, e);
} finally {
writer.close();
}
}
public void stop() {
logger.info("stop");
this.active = false;
......
@@ -21,28 +21,58 @@ title: Efficient Writing

The Java example program writes data by concatenating SQL statements.
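As a rough sketch of what such a concatenated statement looks like (the table names and values below are illustrative only; the real SQL is assembled by SQLWriter.flush()):

```java
// One INSERT statement can carry batched rows for multiple subtables at once.
String sql = "INSERT INTO d1001 values (1648432611249,10.3,219,0.31) (1648432611749,12.6,218,0.33) "
        + "d1002 values (1648432611249,10.2,220,0.23)";
stmt.executeUpdate(sql);  // stmt is an existing java.sql.Statement on the target database
```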
### Main Program

```java title="Main Program"
{{#include docs/examples/java/src/main/java/com/taos/example/highvolume/FastWriteExample.java:main}}
```
### Implementation of the Read Task

<details>
<summary>Implementation of the read task</summary>

```java
{{#include docs/examples/java/src/main/java/com/taos/example/highvolume/ReadTask.java:ReadTask}}
```

</details>
### Implementation of the Write Task

<details>
<summary>Implementation of the write task</summary>

```java
{{#include docs/examples/java/src/main/java/com/taos/example/highvolume/WriteTask.java:WriteTask}}
```

</details>
### Implementation of the SQLWriter Class
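The SQLWriter class encapsulates the SQL-assembly and on-demand table creation logic. The include below is assumed to follow the same path pattern as the blocks above:

<details>
<summary>Implementation of the SQLWriter class</summary>

```java
{{#include docs/examples/java/src/main/java/com/taos/example/highvolume/SQLWriter.java:SQLWriter}}
```

</details>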
### Running the Example Program
You can run the example program directly in a local IDE; note that the environment variable TDENGINE_JDBC_URL must be configured beforehand. To run the example program on a remote server, follow the steps below:
1. Package the example code. In the directory TDengine/docs/examples/java, run:
```
mvn package
```
2. Create an examples directory on the remote server:
```
mkdir -p examples/java
```
3. Copy the dependencies to the specified directory on the server, for example:
```
scp -r .\target\lib user@host:~/examples/java
scp -r .\target\javaexample-1.0.jar user@host:~/examples/java
```
4. Configure the environment variable, for example:
```
export TDENGINE_JDBC_URL="jdbc:TAOS://localhost:6030?user=root&password=taosdata"
```
5. Start the example program with the java command:
```
java -classpath lib/*:javaexample-1.0.jar com.taos.example.highvolume.FastWriteExample
```
## Python Example Program
......