提交 b16a69b5 编写于 作者: D dingbo

docs: high volume example for java

上级 8980b090
.vscode .vscode
*.lock *.lock
.idea .env
\ No newline at end of file *.~
.idea
.vscode
*.lock
\ No newline at end of file
...@@ -24,6 +24,16 @@ ...@@ -24,6 +24,16 @@
<version>2.0.38</version> <version>2.0.38</version>
</dependency> </dependency>
<!-- ANCHOR_END: dep--> <!-- ANCHOR_END: dep-->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.36</version>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>1.2.11</version>
</dependency>
<dependency> <dependency>
<groupId>junit</groupId> <groupId>junit</groupId>
<artifactId>junit</artifactId> <artifactId>junit</artifactId>
......
package com.taos.example.highvolume; package com.taos.example.highvolume;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue; import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;
// ANCHOR: main // ANCHOR: main
public class FastWriteExample { public class FastWriteExample {
final static Logger logger = LoggerFactory.getLogger(FastWriteExample.class);
final static int readTaskCount = 2; final static int readTaskCount = 1;
final static int writeTaskCount = 4; final static int writeTaskCount = 1;
final static int taskQueueCapacity = 1000; final static int taskQueueCapacity = 1000;
final static List<BlockingQueue> taskQueues = new ArrayList<>(); final static List<BlockingQueue<String>> taskQueues = new ArrayList<>();
final static List<ReadTask> readTasks = new ArrayList<>();
final static List<WriteTask> writeTasks = new ArrayList<>();
public static void stopAll() {
logger.info("stopAll");
readTasks.forEach(task -> task.stop());
writeTasks.forEach(task -> task.stop());
}
public static void main(String[] args) throws InterruptedException { public static void main(String[] args) throws InterruptedException {
// Create task queues, whiting tasks and start writing threads. // Create task queues, whiting tasks and start writing threads.
for (int i = 0; i < writeTaskCount; ++i) { for (int i = 0; i < writeTaskCount; ++i) {
BlockingQueue<String> queue = new LinkedBlockingDeque<>(taskQueueCapacity); BlockingQueue<String> queue = new ArrayBlockingQueue<>(taskQueueCapacity);
taskQueues.add(queue);
WriteTask task = new WriteTask(queue); WriteTask task = new WriteTask(queue);
Thread t = new Thread(task); Thread t = new Thread(task);
t.setName("WriteThread-" + i);
t.start(); t.start();
} }
// create reading tasks and start reading threads // create reading tasks and start reading threads
for (int i = 0; i < readTaskCount; ++i) { for (int i = 0; i < readTaskCount; ++i) {
ReadTask task = new ReadTask(taskQueues); ReadTask task = new ReadTask(i, taskQueues);
Thread t = new Thread(task); Thread t = new Thread(task);
t.setName("ReadThread-" + i);
t.start();
} }
while (true) { Runtime.getRuntime().addShutdownHook(new Thread(FastWriteExample::stopAll));
Thread.sleep(1000);
}
} }
} }
......
package com.taos.example.highvolume; package com.taos.example.highvolume;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.concurrent.BlockingQueue;
/**
 * Generate test data.
 *
 * <p>Emits one comma-separated line per call to {@link #next()}:
 * {@code tbName,ts,current,voltage,phase,groupId,location}. Rows are produced
 * table-by-table (tables 0..tableCount-1) for each timestamp step, for a total
 * of {@code tableCount * totalRowsPerTable} rows.
 */
class MockDataSource implements Iterator<String> {
    private final String tbNamePrefix;
    private final int tableCount = 10;
    private final int totalRowsPerTable = 10;

    // 100 milliseconds between two neighbouring rows.
    long startMs = System.currentTimeMillis() - totalRowsPerTable * 100;

    private int currentRow = 0;
    private int currentTbId = 0;

    // mock values
    String[] location = {"LosAngeles", "SanDiego", "Hollywood", "Compton", "San Francisco"};
    float[] current = {8.8f, 10.7f, 9.9f, 8.9f, 9.4f};
    int[] voltage = {119, 116, 111, 113, 118};
    float[] phase = {0.32f, 0.34f, 0.33f, 0.329f, 0.141f};

    public MockDataSource(String tbNamePrefix) {
        this.tbNamePrefix = tbNamePrefix;
    }

    /**
     * BUG FIX: the previous version advanced the cursor inside hasNext(), so two
     * consecutive hasNext() calls silently skipped a row, violating the Iterator
     * contract. hasNext() is now a pure query; next() does the advancing.
     */
    @Override
    public boolean hasNext() {
        return currentRow < totalRowsPerTable;
    }

    @Override
    public String next() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        long ts = startMs + 100 * currentRow;
        int groupId = currentTbId % 5 == 0 ? currentTbId / 5 : currentTbId / 5 + 1;
        StringBuilder sb = new StringBuilder(tbNamePrefix + "_" + currentTbId + ","); // tbName
        sb.append(ts).append(','); // ts
        sb.append(current[currentRow % 5]).append(','); // current
        sb.append(voltage[currentRow % 5]).append(','); // voltage
        sb.append(phase[currentRow % 5]).append(','); // phase
        sb.append(groupId).append(','); // groupID
        sb.append(location[currentRow % 5]); // location
        // advance: walk all tables within the current row, then move to the next row
        currentTbId += 1;
        if (currentTbId == tableCount) {
            currentTbId = 0;
            currentRow += 1;
        }
        return sb.toString();
    }
}
// ANCHOR: ReadTask // ANCHOR: ReadTask
class ReadTask { class ReadTask implements Runnable {
private final static Logger logger = LoggerFactory.getLogger(ReadTask.class);
private final int taskId; private final int taskId;
private Thread readThread; private final List<BlockingQueue<String>> taskQueues;
private List<WriteTask> writeTasks; private boolean active = true;
public ReadTask(int readTaskId, List<WriteTask> writeTasks) { public ReadTask(int readTaskId, List<BlockingQueue<String>> queues) {
this.taskId = readTaskId; this.taskId = readTaskId;
this.writeTasks = writeTasks; this.taskQueues = queues;
this.readThread = new Thread(this::doReadTask);
} }
/** /**
* Read lines from datasource. * Hash data received to different queues.
* And assign each line to a writing task according to the table name. * Here we use the hashcode of table name for demo.
* You are expected to define your own rule in practice.
*
* @param line record received
* @return which queue to use
*/ */
private void doReadTask() { public int getQueueId(String line) {
int numberWriteTask = writeTasks.size(); String tbName = line.substring(0, line.indexOf(','));
Iterator<String> it = new FakeDataSource("t" + this.taskId + "tb"); return Math.abs(tbName.hashCode()) % taskQueues.size();
}
@Override
public void run() {
logger.info("started");
Iterator<String> it = new MockDataSource("tb" + this.taskId);
try { try {
while (it.hasNext()) { while (it.hasNext() && active) {
String line = it.next(); String line = it.next();
String tbName = line.substring(0, line.indexOf(',')); int queueId = getQueueId(line);
int writeTaskId = Math.abs(tbName.hashCode()) % numberWriteTask; taskQueues.get(queueId).put(line);
writeTasks.get(writeTaskId).put(line);
} }
} catch (InterruptedException e) { } catch (InterruptedException e) {
e.printStackTrace(); e.printStackTrace();
} }
} }
/** public void stop() {
* Start reading thread. logger.info("stop");
*/ this.active = false;
public void start() {
this.readThread.start();
} }
} }
......
package com.taos.example.highvolume; package com.taos.example.highvolume;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.Connection; import java.sql.Connection;
import java.sql.DriverManager; import java.sql.DriverManager;
import java.sql.SQLException; import java.sql.SQLException;
...@@ -7,60 +10,70 @@ import java.sql.Statement; ...@@ -7,60 +10,70 @@ import java.sql.Statement;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
import java.util.concurrent.BlockingQueue; import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;
// ANCHOR: WriteTask // ANCHOR: WriteTask
class WriteTask implements Runnable { class WriteTask implements Runnable {
final static int maxBatchSize = 500; private final static Logger logger = LoggerFactory.getLogger(WriteTask.class);
private final static int maxBatchSize = 500;
private final BlockingQueue<String> queue; private final BlockingQueue<String> queue;
private boolean active = true;
public WriteTask(BlockingQueue<String> taskQueue) { public WriteTask(BlockingQueue<String> taskQueue) {
this.queue = taskQueue; this.queue = taskQueue;
} }
private static Connection getConnection() throws SQLException { private static Connection getConnection() throws SQLException, ClassNotFoundException {
String jdbcUrl = "jdbc:TAOS://localhost:6030?user=root&password=taosdata"; String jdbcURL = System.getenv("TDENGINE_JDBC_URL");
return DriverManager.getConnection(jdbcUrl); return DriverManager.getConnection(jdbcURL);
} }
@Override
public void run() { public void run() {
int count = 0; logger.info("started");
try { int bufferedCount = 0;
Connection conn = getConnection(); String line = null;
Statement stmt = conn.createStatement(); try (Connection conn = getConnection()) {
Map<String, String> tbValues = new HashMap<>(); try (Statement stmt = conn.createStatement()) {
while (true) { Map<String, String> tbValues = new HashMap<>();
String line = queue.poll(); while (active) {
if (line != null) { line = queue.poll();
processLine(tbValues, line); if (line != null) {
count += 1; processLine(tbValues, line);
if (count == maxBatchSize) { bufferedCount += 1;
// trigger writing when count of buffered records reached maxBachSize if (bufferedCount == maxBatchSize) {
// trigger writing when count of buffered records reached maxBachSize
flushValues(stmt, tbValues);
bufferedCount = 0;
}
} else if (bufferedCount == 0) {
// if queue is empty and no buffered records, sleep a while to avoid high CPU usage.
Thread.sleep(500);
} else {
// if queue is empty and there are buffered records then flush immediately
flushValues(stmt, tbValues); flushValues(stmt, tbValues);
count = 0; bufferedCount = 0;
} }
} else if (count == 0) { }
// if queue is empty and no buffered records, sleep a while to avoid high CPU usage. if (bufferedCount > 0) {
Thread.sleep(500);
} else {
// if queue is empty and there are buffered records then flush immediately
flushValues(stmt, tbValues); flushValues(stmt, tbValues);
count = 0;
} }
} }
} catch (Exception e) { } catch (Exception e) {
// handle exception String msg = String.format("line=%s, bufferedCount=%s", line, bufferedCount);
logger.error(msg, e);
} }
} }
private void processLine(Map<String, String> tbValues, String line) { private void processLine(Map<String, String> tbValues, String line) {
} }
private void flushValues(Statement stmt, Map<String, String> tbValues) { private void flushValues(Statement stmt, Map<String, String> tbValues) {
StringBuilder sb = new StringBuilder("INSERT INTO");
} }
public void stop() {
logger.info("stop");
this.active = false;
}
} }
// ANCHOR_END: WriteTask // ANCHOR_END: WriteTask
\ No newline at end of file
<!-- Logback configuration for the high-volume write example:
     logs everything at DEBUG and above to the console. -->
<configuration>
    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
        <!-- encoders are assigned the type
             ch.qos.logback.classic.encoder.PatternLayoutEncoder by default -->
        <encoder>
            <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
        </encoder>
    </appender>

    <!-- DEBUG root level is intentional for a demo; raise to INFO in production -->
    <root level="debug">
        <appender-ref ref="STDOUT" />
    </root>
</configuration>
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册