diff --git a/docs/examples/java/src/main/java/com/taos/example/highvolume/FastWriteExample.java b/docs/examples/java/src/main/java/com/taos/example/highvolume/FastWriteExample.java index 3f642d8851c0dfc969c66174def78c11b3bb65ec..b0d3e79b5c4f8bd6a565d0ea39f873476933f320 100644 --- a/docs/examples/java/src/main/java/com/taos/example/highvolume/FastWriteExample.java +++ b/docs/examples/java/src/main/java/com/taos/example/highvolume/FastWriteExample.java @@ -2,30 +2,30 @@ package com.taos.example.highvolume; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingDeque; // ANCHOR: main public class FastWriteExample { - final static int readTaskCount = 1; + final static int readTaskCount = 2; final static int writeTaskCount = 4; - final static List readTasks = new ArrayList<>(); - final static List writeTasks = new ArrayList<>(); + final static int taskQueueCapacity = 1000; + final static List taskQueues = new ArrayList<>(); public static void main(String[] args) throws InterruptedException { - // Create writing tasks. - // Every writing task contains a work thread and a task queue. + // Create task queues, whiting tasks and start writing threads. for (int i = 0; i < writeTaskCount; ++i) { - WriteTask task = new WriteTask(); - task.start(); - writeTasks.add(new WriteTask()); + BlockingQueue queue = new LinkedBlockingDeque<>(taskQueueCapacity); + WriteTask task = new WriteTask(queue); + Thread t = new Thread(task); + t.start(); } - // Create reading tasks. - // Every reading task contains a work thread and a reference to each writing task. + // create reading tasks and start reading threads for (int i = 0; i < readTaskCount; ++i) { - ReadTask task = new ReadTask(i, writeTasks); - task.start(); - readTasks.add(task); + ReadTask task = new ReadTask(taskQueues); + Thread t = new Thread(task); } while (true) { diff --git a/docs/examples/java/src/main/java/com/taos/example/highvolume/WriteTask.java b/docs/examples/java/src/main/java/com/taos/example/highvolume/WriteTask.java index 6a5083c69908155a4de401e5d132b60cafc0c98b..5296f939298f9e1041f1a3ed3b602a8d225ba341 100644 --- a/docs/examples/java/src/main/java/com/taos/example/highvolume/WriteTask.java +++ b/docs/examples/java/src/main/java/com/taos/example/highvolume/WriteTask.java @@ -10,28 +10,12 @@ import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingDeque; // ANCHOR: WriteTask -class WriteTask { +class WriteTask implements Runnable { final static int maxBatchSize = 500; - // - final static int taskQueueCapacity = 1000; + private final BlockingQueue queue; - private Thread writeThread = new Thread(this::doWriteTask); - - private BlockingQueue queue = new LinkedBlockingDeque<>(taskQueueCapacity); - - /** - * Public interface for adding task to task queue. - * It will be invoked in read thread. - */ - public void put(String line) throws InterruptedException { - queue.put(line); - } - - /** - * Start writing thread. - */ - public void start() { - writeThread.start(); + public WriteTask(BlockingQueue taskQueue) { + this.queue = taskQueue; } private static Connection getConnection() throws SQLException { @@ -39,7 +23,7 @@ class WriteTask { return DriverManager.getConnection(jdbcUrl); } - private void doWriteTask() { + public void run() { int count = 0; try { Connection conn = getConnection();