diff --git a/docs/examples/java/src/main/java/com/taos/example/highvolume/DataBaseMonitor.java b/docs/examples/java/src/main/java/com/taos/example/highvolume/DataBaseMonitor.java new file mode 100644 index 0000000000000000000000000000000000000000..04b149a4b96441ecfd1b0bdde54c9ed71349cab2 --- /dev/null +++ b/docs/examples/java/src/main/java/com/taos/example/highvolume/DataBaseMonitor.java @@ -0,0 +1,63 @@ +package com.taos.example.highvolume; + +import java.sql.*; + +/** + * Prepare target database. + * Count total records in database periodically so that we can estimate the writing speed. + */ +public class DataBaseMonitor { + private Connection conn; + private Statement stmt; + + public DataBaseMonitor init() throws SQLException { + if (conn == null) { + String jdbcURL = System.getenv("TDENGINE_JDBC_URL"); + conn = DriverManager.getConnection(jdbcURL); + stmt = conn.createStatement(); + } + return this; + } + + public void close() { + try { + stmt.close(); + } catch (SQLException e) { + } + try { + conn.close(); + } catch (SQLException e) { + } + } + + public void prepareDatabase() throws SQLException { + stmt.execute("DROP DATABASE IF EXISTS test"); + stmt.execute("CREATE DATABASE test"); + stmt.execute("CREATE STABLE test.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (location BINARY(64), groupId INT)"); + } + + public Long count() throws SQLException { + if (!stmt.isClosed()) { + ResultSet result = stmt.executeQuery("SELECT count(*) from test.meters"); + result.next(); + return result.getLong(1); + } + return null; + } + + /** + * show test.stables; + * + * name | created_time | columns | tags | tables | + * ============================================================================================ + * meters | 2022-07-20 08:39:30.902 | 4 | 2 | 620000 | + */ + public Long getTableCount() throws SQLException { + if (!stmt.isClosed()) { + ResultSet result = stmt.executeQuery("show test.stables"); + result.next(); + return result.getLong(5); + } + return null; + } +} \ No newline at end of file diff --git a/docs/examples/java/src/main/java/com/taos/example/highvolume/FastWriteExample.java b/docs/examples/java/src/main/java/com/taos/example/highvolume/FastWriteExample.java new file mode 100644 index 0000000000000000000000000000000000000000..41b59551ca69a4056c2f2b572d169bd08dc4fcfe --- /dev/null +++ b/docs/examples/java/src/main/java/com/taos/example/highvolume/FastWriteExample.java @@ -0,0 +1,70 @@ +package com.taos.example.highvolume; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.sql.*; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; + + +public class FastWriteExample { + final static Logger logger = LoggerFactory.getLogger(FastWriteExample.class); + + final static int taskQueueCapacity = 1000000; + final static List> taskQueues = new ArrayList<>(); + final static List readTasks = new ArrayList<>(); + final static List writeTasks = new ArrayList<>(); + final static DataBaseMonitor databaseMonitor = new DataBaseMonitor(); + + public static void stopAll() { + logger.info("shutting down"); + readTasks.forEach(task -> task.stop()); + writeTasks.forEach(task -> task.stop()); + databaseMonitor.close(); + } + + public static void main(String[] args) throws InterruptedException, SQLException { + int readTaskCount = args.length > 0 ? Integer.parseInt(args[0]) : 1; + int writeTaskCount = args.length > 1 ? Integer.parseInt(args[1]) : 3; + int tableCount = args.length > 2 ? Integer.parseInt(args[2]) : 1000; + int maxBatchSize = args.length > 3 ? Integer.parseInt(args[3]) : 3000; + + logger.info("readTaskCount={}, writeTaskCount={} tableCount={} maxBatchSize={}", + readTaskCount, writeTaskCount, tableCount, maxBatchSize); + + databaseMonitor.init().prepareDatabase(); + + // Create task queues, whiting tasks and start writing threads. + for (int i = 0; i < writeTaskCount; ++i) { + BlockingQueue queue = new ArrayBlockingQueue<>(taskQueueCapacity); + taskQueues.add(queue); + WriteTask task = new WriteTask(queue, maxBatchSize); + Thread t = new Thread(task); + t.setName("WriteThread-" + i); + t.start(); + } + + // create reading tasks and start reading threads + int tableCountPerTask = tableCount / readTaskCount; + for (int i = 0; i < readTaskCount; ++i) { + ReadTask task = new ReadTask(i, taskQueues, tableCountPerTask); + Thread t = new Thread(task); + t.setName("ReadThread-" + i); + t.start(); + } + + Runtime.getRuntime().addShutdownHook(new Thread(FastWriteExample::stopAll)); + + long lastCount = 0; + while (true) { + Thread.sleep(10000); + long numberOfTable = databaseMonitor.getTableCount(); + long count = databaseMonitor.count(); + logger.info("numberOfTable={} count={} speed={}", numberOfTable, count, (count - lastCount) / 10); + lastCount = count; + } + } +} \ No newline at end of file diff --git a/docs/examples/java/src/main/java/com/taos/example/highvolume/MockDataSource.java b/docs/examples/java/src/main/java/com/taos/example/highvolume/MockDataSource.java new file mode 100644 index 0000000000000000000000000000000000000000..6fe83f002ebcb9d82e026e9a32886fd22bfefbe9 --- /dev/null +++ b/docs/examples/java/src/main/java/com/taos/example/highvolume/MockDataSource.java @@ -0,0 +1,53 @@ +package com.taos.example.highvolume; + +import java.util.Iterator; + +/** + * Generate test data + */ +class MockDataSource implements Iterator { + private String tbNamePrefix; + private int tableCount; + private long maxRowsPerTable = 1000000000L; + + // 100 milliseconds between two neighbouring rows. + long startMs = System.currentTimeMillis() - maxRowsPerTable * 100; + private int currentRow = 0; + private int currentTbId = -1; + + // mock values + String[] location = {"LosAngeles", "SanDiego", "Hollywood", "Compton", "San Francisco"}; + float[] current = {8.8f, 10.7f, 9.9f, 8.9f, 9.4f}; + int[] voltage = {119, 116, 111, 113, 118}; + float[] phase = {0.32f, 0.34f, 0.33f, 0.329f, 0.141f}; + + public MockDataSource(String tbNamePrefix, int tableCount) { + this.tbNamePrefix = tbNamePrefix; + this.tableCount = tableCount; + } + + @Override + public boolean hasNext() { + currentTbId += 1; + if (currentTbId == tableCount) { + currentTbId = 0; + currentRow += 1; + } + return currentRow < maxRowsPerTable; + } + + @Override + public String next() { + long ts = startMs + 100 * currentRow; + int groupId = currentTbId % 5 == 0 ? currentTbId / 5 : currentTbId / 5 + 1; + StringBuilder sb = new StringBuilder(tbNamePrefix + "_" + currentTbId + ","); // tbName + sb.append(ts).append(','); // ts + sb.append(current[currentRow % 5]).append(','); // current + sb.append(voltage[currentRow % 5]).append(','); // voltage + sb.append(phase[currentRow % 5]).append(','); // phase + sb.append(location[currentRow % 5]).append(','); // location + sb.append(groupId); // groupID + + return sb.toString(); + } +} \ No newline at end of file diff --git a/docs/examples/java/src/main/java/com/taos/example/highvolume/ReadTask.java b/docs/examples/java/src/main/java/com/taos/example/highvolume/ReadTask.java new file mode 100644 index 0000000000000000000000000000000000000000..a6fcfed1d28281d46aff493ef9783972858ebe62 --- /dev/null +++ b/docs/examples/java/src/main/java/com/taos/example/highvolume/ReadTask.java @@ -0,0 +1,58 @@ +package com.taos.example.highvolume; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Iterator; +import java.util.List; +import java.util.concurrent.BlockingQueue; + +class ReadTask implements Runnable { + private final static Logger logger = LoggerFactory.getLogger(ReadTask.class); + private final int taskId; + private final List> taskQueues; + private final int queueCount; + private final int tableCount; + private boolean active = true; + + public ReadTask(int readTaskId, List> queues, int tableCount) { + this.taskId = readTaskId; + this.taskQueues = queues; + this.queueCount = queues.size(); + this.tableCount = tableCount; + } + + /** + * Assign data received to different queues. + * Here we use the suffix number in table name. + * You are expected to define your own rule in practice. + * + * @param line record received + * @return which queue to use + */ + public int getQueueId(String line) { + String tbName = line.substring(0, line.indexOf(',')); // For example: tb1_101 + String suffixNumber = tbName.split("_")[1]; + return Integer.parseInt(suffixNumber) % this.queueCount; + } + + @Override + public void run() { + logger.info("started"); + Iterator it = new MockDataSource("tb" + this.taskId, tableCount); + try { + while (it.hasNext() && active) { + String line = it.next(); + int queueId = getQueueId(line); + taskQueues.get(queueId).put(line); + } + } catch (Exception e) { + logger.error("Read Task Error", e); + } + } + + public void stop() { + logger.info("stop"); + this.active = false; + } +} \ No newline at end of file diff --git a/docs/examples/java/src/main/java/com/taos/example/highvolume/SQLWriter.java b/docs/examples/java/src/main/java/com/taos/example/highvolume/SQLWriter.java new file mode 100644 index 0000000000000000000000000000000000000000..c2989acdbe3d0f56d7451ac86051a55955ce14de --- /dev/null +++ b/docs/examples/java/src/main/java/com/taos/example/highvolume/SQLWriter.java @@ -0,0 +1,205 @@ +package com.taos.example.highvolume; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.sql.*; +import java.util.HashMap; +import java.util.Map; + +/** + * A helper class encapsulate the logic of writing using SQL. + *

+ * The main interfaces are two methods: + *

    + *
  1. {@link SQLWriter#processLine}, which receive raw lines from WriteTask and group them by table names.
  2. + *
  3. {@link SQLWriter#flush}, which assemble INSERT statement and execute it.
  4. + *
+ *

+ * There is a technical skill worth mentioning: we create table as needed when "table does not exist" error occur instead of creating table automatically using syntax "INSET INTO tb USING stb". + * This ensure that checking table existence is a one-time-only operation. + *

+ * + *

+ */ +public class SQLWriter { + final static Logger logger = LoggerFactory.getLogger(SQLWriter.class); + + private Connection conn; + private Statement stmt; + + /** + * current number of buffered records + */ + private int bufferedCount = 0; + /** + * Maximum number of buffered records. + * Flush action will be triggered if bufferedCount reached this value, + */ + private int maxBatchSize; + + + /** + * Maximum SQL length. + */ + private int maxSQLLength; + + /** + * Map from table name to column values. For example: + * "tb001" -> "(1648432611249,2.1,114,0.09) (1648432611250,2.2,135,0.2)" + */ + private Map tbValues = new HashMap<>(); + + /** + * Map from table name to tag values in the same order as creating stable. + * Used for creating table. + */ + private Map tbTags = new HashMap<>(); + + public SQLWriter(int maxBatchSize) { + this.maxBatchSize = maxBatchSize; + } + + + /** + * Get Database Connection + * + * @return Connection + * @throws SQLException + */ + private static Connection getConnection() throws SQLException { + String jdbcURL = System.getenv("TDENGINE_JDBC_URL"); + return DriverManager.getConnection(jdbcURL); + } + + /** + * Create Connection and Statement + * + * @throws SQLException + */ + public void init() throws SQLException { + conn = getConnection(); + stmt = conn.createStatement(); + stmt.execute("use test"); + ResultSet rs = stmt.executeQuery("show variables"); + while (rs.next()) { + String configName = rs.getString(1); + if ("maxSQLLength".equals(configName)) { + maxSQLLength = Integer.parseInt(rs.getString(2)); + logger.info("maxSQLLength={}", maxSQLLength); + } + } + } + + /** + * Convert raw data to SQL fragments, group them by table name and cache them in a HashMap. + * Trigger writing when number of buffered records reached maxBachSize. + * + * @param line raw data get from task queue in format: tbName,ts,current,voltage,phase,location,groupId + */ + public void processLine(String line) throws SQLException { + bufferedCount += 1; + int firstComma = line.indexOf(','); + String tbName = line.substring(0, firstComma); + int lastComma = line.lastIndexOf(','); + int secondLastComma = line.lastIndexOf(',', lastComma - 1); + String value = "(" + line.substring(firstComma + 1, secondLastComma) + ") "; + if (tbValues.containsKey(tbName)) { + tbValues.put(tbName, tbValues.get(tbName) + value); + } else { + tbValues.put(tbName, value); + } + if (!tbTags.containsKey(tbName)) { + String location = line.substring(secondLastComma + 1, lastComma); + String groupId = line.substring(lastComma + 1); + String tagValues = "('" + location + "'," + groupId + ')'; + tbTags.put(tbName, tagValues); + } + if (bufferedCount == maxBatchSize) { + flush(); + } + } + + + /** + * Assemble INSERT statement using buffered SQL fragments in Map {@link SQLWriter#tbValues} and execute it. + * In case of "Table does not exit" exception, create all tables in the sql and retry the sql. + */ + public void flush() throws SQLException { + StringBuilder sb = new StringBuilder("INSERT INTO "); + for (Map.Entry entry : tbValues.entrySet()) { + String tableName = entry.getKey(); + String values = entry.getValue(); + String q = tableName + " values " + values + " "; + if (sb.length() + q.length() > maxSQLLength) { + executeSQL(sb.toString()); + logger.warn("increase maxSQLLength or decrease maxBatchSize to gain better performance"); + sb = new StringBuilder("INSERT INTO "); + } + sb.append(q); + } + executeSQL(sb.toString()); + tbValues.clear(); + bufferedCount = 0; + } + + private void executeSQL(String sql) throws SQLException { + try { + stmt.executeUpdate(sql); + } catch (SQLException e) { + // convert to error code defined in taoserror.h + int errorCode = e.getErrorCode() & 0xffff; + if (errorCode == 0x362 || errorCode == 0x218) { + // Table does not exist + createTables(); + executeSQL(sql); + } else { + logger.error("Execute SQL: {}", sql); + throw e; + } + } catch (Throwable throwable) { + logger.error("Execute SQL: {}", sql); + throw throwable; + } + } + + /** + * Create tables in batch using syntax: + *

+ * CREATE TABLE [IF NOT EXISTS] tb_name1 USING stb_name TAGS (tag_value1, ...) [IF NOT EXISTS] tb_name2 USING stb_name TAGS (tag_value2, ...) ...; + *

+ */ + private void createTables() throws SQLException { + StringBuilder sb = new StringBuilder("CREATE TABLE "); + for (String tbName : tbValues.keySet()) { + String tagValues = tbTags.get(tbName); + sb.append("IF NOT EXISTS ").append(tbName).append(" USING meters TAGS ").append(tagValues).append(" "); + } + String sql = sb.toString(); + try { + stmt.executeUpdate(sql); + } catch (Throwable throwable) { + logger.error("Execute SQL: {}", sql); + throw throwable; + } + } + + public boolean hasBufferedValues() { + return bufferedCount > 0; + } + + public int getBufferedCount() { + return bufferedCount; + } + + public void close() { + try { + stmt.close(); + } catch (SQLException e) { + } + try { + conn.close(); + } catch (SQLException e) { + } + } +} \ No newline at end of file diff --git a/docs/examples/java/src/main/java/com/taos/example/highvolume/StmtWriter.java b/docs/examples/java/src/main/java/com/taos/example/highvolume/StmtWriter.java new file mode 100644 index 0000000000000000000000000000000000000000..8ade06625d708a112c85d5657aa00bcd0e605ff4 --- /dev/null +++ b/docs/examples/java/src/main/java/com/taos/example/highvolume/StmtWriter.java @@ -0,0 +1,4 @@ +package com.taos.example.highvolume; + +public class StmtWriter { +} diff --git a/docs/examples/java/src/main/java/com/taos/example/highvolume/WriteTask.java b/docs/examples/java/src/main/java/com/taos/example/highvolume/WriteTask.java new file mode 100644 index 0000000000000000000000000000000000000000..de9e5463d7dc59478f991e4783aacaae527b4c4b --- /dev/null +++ b/docs/examples/java/src/main/java/com/taos/example/highvolume/WriteTask.java @@ -0,0 +1,58 @@ +package com.taos.example.highvolume; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.BlockingQueue; + +class WriteTask implements Runnable { + private final static Logger logger = LoggerFactory.getLogger(WriteTask.class); + private final int maxBatchSize; + + // the queue from which this writing task get raw data. + private final BlockingQueue queue; + + // A flag indicate whether to continue. + private boolean active = true; + + public WriteTask(BlockingQueue taskQueue, int maxBatchSize) { + this.queue = taskQueue; + this.maxBatchSize = maxBatchSize; + } + + @Override + public void run() { + logger.info("started"); + String line = null; // data getting from the queue just now. + SQLWriter writer = new SQLWriter(maxBatchSize); + try { + writer.init(); + while (active) { + line = queue.poll(); + if (line != null) { + // parse raw data and buffer the data. + writer.processLine(line); + } else if (writer.hasBufferedValues()) { + // write data immediately if no more data in the queue + writer.flush(); + } else { + // sleep a while to avoid high CPU usage if no more data in the queue and no buffered records, . + Thread.sleep(100); + } + } + if (writer.hasBufferedValues()) { + writer.flush(); + } + } catch (Exception e) { + String msg = String.format("line=%s, bufferedCount=%s", line, writer.getBufferedCount()); + logger.error(msg, e); + } finally { + writer.close(); + } + } + + public void stop() { + logger.info("stop"); + this.active = false; + } +} \ No newline at end of file