提交 ad042ede 编写于 作者: D dingbo

docs: java example

上级 d7d98a26
...@@ -8,6 +8,7 @@ import java.util.ArrayList; ...@@ -8,6 +8,7 @@ import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue; import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;
/** /**
* Prepare target database. * Prepare target database.
...@@ -57,7 +58,7 @@ class DataBaseMonitor { ...@@ -57,7 +58,7 @@ class DataBaseMonitor {
public class FastWriteExample { public class FastWriteExample {
final static Logger logger = LoggerFactory.getLogger(FastWriteExample.class); final static Logger logger = LoggerFactory.getLogger(FastWriteExample.class);
final static int taskQueueCapacity = 1000; final static int taskQueueCapacity = Integer.MAX_VALUE / 100;
final static List<BlockingQueue<String>> taskQueues = new ArrayList<>(); final static List<BlockingQueue<String>> taskQueues = new ArrayList<>();
final static List<ReadTask> readTasks = new ArrayList<>(); final static List<ReadTask> readTasks = new ArrayList<>();
final static List<WriteTask> writeTasks = new ArrayList<>(); final static List<WriteTask> writeTasks = new ArrayList<>();
......
...@@ -12,11 +12,11 @@ import java.util.concurrent.BlockingQueue; ...@@ -12,11 +12,11 @@ import java.util.concurrent.BlockingQueue;
*/ */
class MockDataSource implements Iterator { class MockDataSource implements Iterator {
private String tbNamePrefix; private String tbNamePrefix;
private int tableCount = 100; private int tableCount = 1000;
private int totalRowsPerTable = 1000000; private long maxRowsPerTable = 1000000000L;
// 100 milliseconds between two neighbouring rows. // 100 milliseconds between two neighbouring rows.
long startMs = System.currentTimeMillis() - totalRowsPerTable * 100; long startMs = System.currentTimeMillis() - maxRowsPerTable * 100;
private int currentRow = 0; private int currentRow = 0;
private int currentTbId = -1; private int currentTbId = -1;
...@@ -37,7 +37,7 @@ class MockDataSource implements Iterator { ...@@ -37,7 +37,7 @@ class MockDataSource implements Iterator {
currentTbId = 0; currentTbId = 0;
currentRow += 1; currentRow += 1;
} }
return currentRow < totalRowsPerTable; return currentRow < maxRowsPerTable;
} }
@Override @Override
...@@ -61,24 +61,27 @@ class ReadTask implements Runnable { ...@@ -61,24 +61,27 @@ class ReadTask implements Runnable {
private final static Logger logger = LoggerFactory.getLogger(ReadTask.class); private final static Logger logger = LoggerFactory.getLogger(ReadTask.class);
private final int taskId; private final int taskId;
private final List<BlockingQueue<String>> taskQueues; private final List<BlockingQueue<String>> taskQueues;
private final int queueCount;
private boolean active = true; private boolean active = true;
public ReadTask(int readTaskId, List<BlockingQueue<String>> queues) { public ReadTask(int readTaskId, List<BlockingQueue<String>> queues) {
this.taskId = readTaskId; this.taskId = readTaskId;
this.taskQueues = queues; this.taskQueues = queues;
this.queueCount = queues.size();
} }
/** /**
* Hash data received to different queues. * Assign data received to different queues.
* Here we use the hashcode of table name for demo. * Here we use the suffix number in table name.
* You are expected to define your own rule in practice. * You are expected to define your own rule in practice.
* *
* @param line record received * @param line record received
* @return which queue to use * @return which queue to use
*/ */
public int getQueueId(String line) { public int getQueueId(String line) {
String tbName = line.substring(0, line.indexOf(',')); String tbName = line.substring(0, line.indexOf(',')); // For example: tb1_101
return Math.abs(tbName.hashCode()) % taskQueues.size(); String suffixNumber = tbName.split("_")[1];
return Integer.parseInt(suffixNumber) % this.queueCount;
} }
@Override @Override
...@@ -91,8 +94,8 @@ class ReadTask implements Runnable { ...@@ -91,8 +94,8 @@ class ReadTask implements Runnable {
int queueId = getQueueId(line); int queueId = getQueueId(line);
taskQueues.get(queueId).put(line); taskQueues.get(queueId).put(line);
} }
} catch (InterruptedException e) { } catch (Exception e) {
e.printStackTrace(); logger.error("Read Task Error", e);
} }
} }
......
...@@ -23,11 +23,14 @@ title: 高效写入 ...@@ -23,11 +23,14 @@ title: 高效写入
### 主程序 ### 主程序
主程序负责创建队列,并启动读线程和写线程。
```java title="主程序" ```java title="主程序"
{{#include docs/examples/java/src/main/java/com/taos/example/highvolume/FastWriteExample.java:main}} {{#include docs/examples/java/src/main/java/com/taos/example/highvolume/FastWriteExample.java:main}}
``` ```
### 读任务的实现 ### 读任务的实现
<details> <details>
<summary>读任务的实现</summary> <summary>读任务的实现</summary>
...@@ -49,6 +52,8 @@ title: 高效写入 ...@@ -49,6 +52,8 @@ title: 高效写入
### SQLWriter 类的实现 ### SQLWriter 类的实现
SQLWriter 类封装了拼 SQL 和写数据的逻辑。注意,所有的表都没有提前创建,而是写入出错的时候,再以超级表为模板建表,然后重现执行 INSERT 语句。
```java ```java
{{#include docs/examples/java/src/main/java/com/taos/example/highvolume/SQLWriter.java:SQLWriter}} {{#include docs/examples/java/src/main/java/com/taos/example/highvolume/SQLWriter.java:SQLWriter}}
``` ```
...@@ -74,13 +79,12 @@ title: 高效写入 ...@@ -74,13 +79,12 @@ title: 高效写入
``` ```
5. 用 java 命令启动示例程序 5. 用 java 命令启动示例程序
``` ```
java -classpath lib/*:javaexample-1.0.jar com.taos.example.highvolume.FastWriteExample java -classpath lib/*:javaexample-1.0.jar com.taos.example.highvolume.FastWriteExample <read_thread_count> <white_thread_count>
``` ```
## Python 示例程序 ## Python 示例程序
在 Python 示例程序中采用参数绑定的写入方式。 在 Python 示例程序中采用参数绑定的写入方式。(开发中)
<!-- ```python title="Python 示例程序"
```python title="Python 示例程序" ``` -->
developing
```
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册