未验证 提交 c7919a29 编写于 作者: B Bo Ding 提交者: GitHub

docs: high volume example (#14258)

* docs: high volume example

* docs: typo
上级 530cb7e8
package com.taos.example;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;
class ReadTask {
private final int taskId;
private Thread readThread;
private List<WriteTask> writeTasks;
public ReadTask(int readTaskId, List<WriteTask> writeTasks) {
this.taskId = readTaskId;
this.writeTasks = writeTasks;
this.readThread = new Thread(this::doReadTask);
}
/**
* Simulate getting data from datasource.
*/
private Iterator<String> getSourceDataIterator() {
long now = System.currentTimeMillis();
int tableCount = 1000;
int numberRows = 10 ^ 7;
int tsStep = 100; // 100 milliseconds
String tbNamePrefix = "tb" + taskId + "-";
return new Iterator<String>() {
private long ts = now - numberRows * tsStep;
private int tbId = 0;
@Override
public boolean hasNext() {
return ts < now;
}
@Override
public String next() {
if (tbId < tableCount) {
tbId += 1;
} else {
ts += tsStep;
tbId = 0;
}
StringBuilder sb = new StringBuilder(tbNamePrefix + tbId + ","); // tbName
sb.append(ts).append(','); // ts
sb.append(1.0).append(','); // current
sb.append(110).append(','); // voltage
sb.append(0.32).append(','); // phase
sb.append(3).append(','); // groupID
sb.append("Los Angeles"); // location
return sb.toString();
}
};
}
/**
* Read lines from datasource. And assign each line to a writing task according the table name.
*/
private void doReadTask() {
int numberWriteTask = writeTasks.size();
Iterator<String> it = getSourceDataIterator();
try {
while (it.hasNext()) {
String line = it.next();
String tbName = line.substring(0, line.indexOf(','));
int writeTaskId = tbName.hashCode() % numberWriteTask;
writeTasks.get(writeTaskId).put(line);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
/**
* start reading thread
*/
public void start() {
this.readThread.start();
}
}
class WriteTask {
final static int maxBachSize = 500;
final static int taskQueueCapacity = 1000;
private Thread writeThread = new Thread(this::doWriteTask);
private BlockingQueue<String> queue = new LinkedBlockingDeque<>(taskQueueCapacity);
public void put(String line) throws InterruptedException {
queue.put(line);
}
private static Connection getConnection() throws SQLException {
String jdbcUrl = "jdbc:TAOS-RS://localhost:6041?user=root&password=taosdata";
return DriverManager.getConnection(jdbcUrl);
}
public void doWriteTask() {
try {
Connection conn = getConnection();
Statement stmt = conn.createStatement();
while (true) {
String line = queue.poll();
if (line != null) {
} else {
Thread.sleep(1);
}
}
} catch (Exception e) {
// handle exception
}
}
/**
* start writing thread
*/
public void start() {
writeThread.start();
}
}
public class FastWriteExample {
final static int readTaskCount = 1;
final static int writeTaskCount = 4;
final static List<ReadTask> readTasks = new ArrayList<>();
final static List<WriteTask> writeTasks = new ArrayList<>();
public static void main(String[] args) throws InterruptedException {
// Create write tasks
for (int i = 0; i < writeTaskCount; ++i) {
WriteTask task = new WriteTask();
task.start();
writeTasks.add(new WriteTask());
}
// Create read tasks
for (int i = 0; i < readTaskCount; ++i) {
ReadTask task = new ReadTask(i, writeTasks);
task.start();
readTasks.add(task);
}
while (true) {
Thread.sleep(1000);
}
}
}
package com.taos.example.highvolume;
import java.util.ArrayList;
import java.util.List;
// ANCHOR: main
public class FastWriteExample {
final static int readTaskCount = 1;
final static int writeTaskCount = 4;
final static List<ReadTask> readTasks = new ArrayList<>();
final static List<WriteTask> writeTasks = new ArrayList<>();
public static void main(String[] args) throws InterruptedException {
// Create writing tasks.
// Every writing task contains a work thread and a task queue.
for (int i = 0; i < writeTaskCount; ++i) {
WriteTask task = new WriteTask();
task.start();
writeTasks.add(new WriteTask());
}
// Create reading tasks.
// Every reading task contains a work thread and a reference to each writing task.
for (int i = 0; i < readTaskCount; ++i) {
ReadTask task = new ReadTask(i, writeTasks);
task.start();
readTasks.add(task);
}
while (true) {
Thread.sleep(1000);
}
}
}
// ANCHOR_END: main
\ No newline at end of file
package com.taos.example.highvolume;
import java.util.Iterator;
import java.util.List;
class FakeDataSource implements Iterator {
private long now = System.currentTimeMillis();
private int tableCount = 1000;
private int numberRows = 10 ^ 7;
private int tsStep = 100; // 100 milliseconds
private String tbNamePrefix;
private long ts = now - numberRows * tsStep;
private int tbId = 0;
public FakeDataSource(String tbNamePrefix) {
this.tbNamePrefix = tbNamePrefix;
}
@Override
public boolean hasNext() {
return ts < now;
}
@Override
public String next() {
if (tbId < tableCount) {
tbId += 1;
} else {
ts += tsStep;
tbId = 0;
}
StringBuilder sb = new StringBuilder(tbNamePrefix + tbId + ","); // tbName
sb.append(ts).append(','); // ts
sb.append(1.0).append(','); // current
sb.append(110).append(','); // voltage
sb.append(0.32).append(','); // phase
sb.append(3).append(','); // groupID
sb.append("Los Angeles"); // location
return sb.toString();
}
}
// ANCHOR: ReadTask
class ReadTask {
private final int taskId;
private Thread readThread;
private List<WriteTask> writeTasks;
public ReadTask(int readTaskId, List<WriteTask> writeTasks) {
this.taskId = readTaskId;
this.writeTasks = writeTasks;
this.readThread = new Thread(this::doReadTask);
}
/**
* Read lines from datasource.
* And assign each line to a writing task according to the table name.
*/
private void doReadTask() {
int numberWriteTask = writeTasks.size();
Iterator<String> it = new FakeDataSource("t" + this.taskId + "tb");
try {
while (it.hasNext()) {
String line = it.next();
String tbName = line.substring(0, line.indexOf(','));
int writeTaskId = tbName.hashCode() % numberWriteTask;
writeTasks.get(writeTaskId).put(line);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
/**
* Start reading thread.
*/
public void start() {
this.readThread.start();
}
}
// ANCHOR_END: ReadTask
\ No newline at end of file
package com.taos.example.highvolume;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;
// ANCHOR: WriteTask
class WriteTask {
// How many records will trigger writing before queue
final static int maxBatchSize = 500;
//
final static int taskQueueCapacity = 1000;
private Thread writeThread = new Thread(this::doWriteTask);
private BlockingQueue<String> queue = new LinkedBlockingDeque<>(taskQueueCapacity);
/**
* Public interface for adding task to task queue.
* It will be invoked in read thread.
*/
public void put(String line) throws InterruptedException {
queue.put(line);
}
/**
* Start writing thread.
*/
public void start() {
writeThread.start();
}
private static Connection getConnection() throws SQLException {
String jdbcUrl = "jdbc:TAOS-RS://localhost:6041?user=root&password=taosdata";
return DriverManager.getConnection(jdbcUrl);
}
private void doWriteTask() {
int count = 0;
try {
Connection conn = getConnection();
Statement stmt = conn.createStatement();
Map<String, String> tbValues = new HashMap<>();
while (true) {
String line = queue.poll();
if (line != null) {
processLine(tbValues, line);
count += 1;
if (count == maxBatchSize) {
// trigger writing when count of buffered records reached maxBachSize
flushValues(stmt, tbValues);
count = 0;
}
} else if (count == 0) {
// if queue is empty and no buffered records, sleep a while to avoid high CPU usage.
Thread.sleep(500);
} else {
// if queue is empty and there are buffered records then flush immediately
flushValues(stmt, tbValues);
count = 0;
}
}
} catch (Exception e) {
// handle exception
}
}
private void processLine(Map<String, String> tbValues, String line) {
}
private void flushValues(Statement stmt, Map<String, String> tbValues) {
}
}
// ANCHOR_END: WriteTask
\ No newline at end of file
......@@ -21,10 +21,29 @@ title: 高效写入
在 Java 示例程序中采用拼接 SQL 的写入方式。
```java title="Java 示例程序"
{{#include docs/examples/java/src/main/java/com/taos/example/FastWriteExample.java}}
```java title="程序"
{{#include docs/examples/java/src/main/java/com/taos/example/highvolume/FastWriteExample.java:main}}
```
<details>
<summary>写任务的实现</summary>
```java
{{#include docs/examples/java/src/main/java/com/taos/example/highvolume/WriteTask.java:WriteTask}}
```
</details>
<details>
<summary>读任务的实现</summary>
```java
{{#include docs/examples/java/src/main/java/com/taos/example/highvolume/ReadTask.java:ReadTask}}
```
</details>
## Python 示例程序
在 Python 示例程序中采用参数绑定的写入方式。
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册