提交 8980b090 编写于 作者: D dingbo

docs: refactor highvolume example

上级 074fab00
...@@ -2,30 +2,30 @@ package com.taos.example.highvolume; ...@@ -2,30 +2,30 @@ package com.taos.example.highvolume;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;
// ANCHOR: main // ANCHOR: main
public class FastWriteExample { public class FastWriteExample {
final static int readTaskCount = 1; final static int readTaskCount = 2;
final static int writeTaskCount = 4; final static int writeTaskCount = 4;
final static List<ReadTask> readTasks = new ArrayList<>(); final static int taskQueueCapacity = 1000;
final static List<WriteTask> writeTasks = new ArrayList<>(); final static List<BlockingQueue> taskQueues = new ArrayList<>();
public static void main(String[] args) throws InterruptedException { public static void main(String[] args) throws InterruptedException {
// Create writing tasks. // Create task queues, whiting tasks and start writing threads.
// Every writing task contains a work thread and a task queue.
for (int i = 0; i < writeTaskCount; ++i) { for (int i = 0; i < writeTaskCount; ++i) {
WriteTask task = new WriteTask(); BlockingQueue<String> queue = new LinkedBlockingDeque<>(taskQueueCapacity);
task.start(); WriteTask task = new WriteTask(queue);
writeTasks.add(new WriteTask()); Thread t = new Thread(task);
t.start();
} }
// Create reading tasks. // create reading tasks and start reading threads
// Every reading task contains a work thread and a reference to each writing task.
for (int i = 0; i < readTaskCount; ++i) { for (int i = 0; i < readTaskCount; ++i) {
ReadTask task = new ReadTask(i, writeTasks); ReadTask task = new ReadTask(taskQueues);
task.start(); Thread t = new Thread(task);
readTasks.add(task);
} }
while (true) { while (true) {
......
...@@ -10,28 +10,12 @@ import java.util.concurrent.BlockingQueue; ...@@ -10,28 +10,12 @@ import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.LinkedBlockingDeque;
// ANCHOR: WriteTask // ANCHOR: WriteTask
class WriteTask { class WriteTask implements Runnable {
final static int maxBatchSize = 500; final static int maxBatchSize = 500;
// private final BlockingQueue<String> queue;
final static int taskQueueCapacity = 1000;
private Thread writeThread = new Thread(this::doWriteTask); public WriteTask(BlockingQueue<String> taskQueue) {
this.queue = taskQueue;
private BlockingQueue<String> queue = new LinkedBlockingDeque<>(taskQueueCapacity);
/**
* Public interface for adding task to task queue.
* It will be invoked in read thread.
*/
public void put(String line) throws InterruptedException {
queue.put(line);
}
/**
* Start writing thread.
*/
public void start() {
writeThread.start();
} }
private static Connection getConnection() throws SQLException { private static Connection getConnection() throws SQLException {
...@@ -39,7 +23,7 @@ class WriteTask { ...@@ -39,7 +23,7 @@ class WriteTask {
return DriverManager.getConnection(jdbcUrl); return DriverManager.getConnection(jdbcUrl);
} }
private void doWriteTask() { public void run() {
int count = 0; int count = 0;
try { try {
Connection conn = getConnection(); Connection conn = getConnection();
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册