未验证 提交 294946a2 编写于 作者: W wade zhang 提交者: GitHub

Merge pull request #14924 from taosdata/docs/TD-17370-for-2.6

docs: update high-volume.md
package com.taos.example;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
public class TestTableNotExits {
private static Connection getConnection() throws SQLException {
String jdbcUrl = "jdbc:TAOS://localhost:6030?user=root&password=taosdata";
return DriverManager.getConnection(jdbcUrl);
}
public static void main(String[] args) throws SQLException {
try(Connection conn = getConnection()) {
try(Statement stmt = conn.createStatement()) {
try {
stmt.executeUpdate("insert into test.t1 values(1, 2) test.t2 values(3, 4)");
} catch (SQLException e) {
System.out.println(e.getErrorCode());
System.out.println(Integer.toHexString(e.getErrorCode()));
System.out.println(e);
}
}
}
}
}
package com.taos.example.highvolume;
import java.sql.*;
/**
* Prepare target database.
* Count total records in database periodically so that we can estimate the writing speed.
*/
public class DataBaseMonitor {
private Connection conn;
private Statement stmt;
public DataBaseMonitor init() throws SQLException {
if (conn == null) {
String jdbcURL = System.getenv("TDENGINE_JDBC_URL");
conn = DriverManager.getConnection(jdbcURL);
stmt = conn.createStatement();
}
return this;
}
public void close() {
try {
stmt.close();
} catch (SQLException e) {
}
try {
conn.close();
} catch (SQLException e) {
}
}
public void prepareDatabase() throws SQLException {
stmt.execute("DROP DATABASE IF EXISTS test");
stmt.execute("CREATE DATABASE test");
stmt.execute("CREATE STABLE test.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (location BINARY(64), groupId INT)");
}
public Long count() throws SQLException {
if (!stmt.isClosed()) {
ResultSet result = stmt.executeQuery("SELECT count(*) from test.meters");
result.next();
return result.getLong(1);
}
return null;
}
}
\ No newline at end of file
...@@ -9,51 +9,7 @@ import java.util.List; ...@@ -9,51 +9,7 @@ import java.util.List;
import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue; import java.util.concurrent.BlockingQueue;
/**
* Prepare target database.
* Count total records in database periodically so that we can estimate the writing speed.
*/
class DataBaseMonitor {
private Connection conn;
private Statement stmt;
public DataBaseMonitor init() throws SQLException {
if (conn == null) {
String jdbcURL = System.getenv("TDENGINE_JDBC_URL");
conn = DriverManager.getConnection(jdbcURL);
stmt = conn.createStatement();
}
return this;
}
public void close() {
try {
stmt.close();
} catch (SQLException e) {
}
try {
conn.close();
} catch (SQLException e) {
}
}
public void prepareDatabase() throws SQLException {
stmt.execute("DROP DATABASE IF EXISTS test");
stmt.execute("CREATE DATABASE test");
stmt.execute("CREATE STABLE test.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (location BINARY(64), groupId INT)");
}
public Long count() throws SQLException {
if (!stmt.isClosed()) {
ResultSet result = stmt.executeQuery("SELECT count(*) from test.meters");
result.next();
return result.getLong(1);
}
return null;
}
}
// ANCHOR: main
public class FastWriteExample { public class FastWriteExample {
final static Logger logger = LoggerFactory.getLogger(FastWriteExample.class); final static Logger logger = LoggerFactory.getLogger(FastWriteExample.class);
...@@ -111,4 +67,3 @@ public class FastWriteExample { ...@@ -111,4 +67,3 @@ public class FastWriteExample {
} }
} }
} }
\ No newline at end of file
// ANCHOR_END: main
\ No newline at end of file
package com.taos.example.highvolume;
import java.util.Iterator;
/**
* Generate test data
*/
class MockDataSource implements Iterator {
private String tbNamePrefix;
private int tableCount;
private long maxRowsPerTable = 1000000000L;
// 100 milliseconds between two neighbouring rows.
long startMs = System.currentTimeMillis() - maxRowsPerTable * 100;
private int currentRow = 0;
private int currentTbId = -1;
// mock values
String[] location = {"LosAngeles", "SanDiego", "Hollywood", "Compton", "San Francisco"};
float[] current = {8.8f, 10.7f, 9.9f, 8.9f, 9.4f};
int[] voltage = {119, 116, 111, 113, 118};
float[] phase = {0.32f, 0.34f, 0.33f, 0.329f, 0.141f};
public MockDataSource(String tbNamePrefix, int tableCount) {
this.tbNamePrefix = tbNamePrefix;
this.tableCount = tableCount;
}
@Override
public boolean hasNext() {
currentTbId += 1;
if (currentTbId == tableCount) {
currentTbId = 0;
currentRow += 1;
}
return currentRow < maxRowsPerTable;
}
@Override
public String next() {
long ts = startMs + 100 * currentRow;
int groupId = currentTbId % 5 == 0 ? currentTbId / 5 : currentTbId / 5 + 1;
StringBuilder sb = new StringBuilder(tbNamePrefix + "_" + currentTbId + ","); // tbName
sb.append(ts).append(','); // ts
sb.append(current[currentRow % 5]).append(','); // current
sb.append(voltage[currentRow % 5]).append(','); // voltage
sb.append(phase[currentRow % 5]).append(','); // phase
sb.append(location[currentRow % 5]).append(','); // location
sb.append(groupId); // groupID
return sb.toString();
}
}
\ No newline at end of file
...@@ -7,57 +7,6 @@ import java.util.Iterator; ...@@ -7,57 +7,6 @@ import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.concurrent.BlockingQueue; import java.util.concurrent.BlockingQueue;
/**
* Generate test data
*/
class MockDataSource implements Iterator {
private String tbNamePrefix;
private int tableCount;
private long maxRowsPerTable = 1000000000L;
// 100 milliseconds between two neighbouring rows.
long startMs = System.currentTimeMillis() - maxRowsPerTable * 100;
private int currentRow = 0;
private int currentTbId = -1;
// mock values
String[] location = {"LosAngeles", "SanDiego", "Hollywood", "Compton", "San Francisco"};
float[] current = {8.8f, 10.7f, 9.9f, 8.9f, 9.4f};
int[] voltage = {119, 116, 111, 113, 118};
float[] phase = {0.32f, 0.34f, 0.33f, 0.329f, 0.141f};
public MockDataSource(String tbNamePrefix, int tableCount) {
this.tbNamePrefix = tbNamePrefix;
this.tableCount = tableCount;
}
@Override
public boolean hasNext() {
currentTbId += 1;
if (currentTbId == tableCount) {
currentTbId = 0;
currentRow += 1;
}
return currentRow < maxRowsPerTable;
}
@Override
public String next() {
long ts = startMs + 100 * currentRow;
int groupId = currentTbId % 5 == 0 ? currentTbId / 5 : currentTbId / 5 + 1;
StringBuilder sb = new StringBuilder(tbNamePrefix + "_" + currentTbId + ","); // tbName
sb.append(ts).append(','); // ts
sb.append(current[currentRow % 5]).append(','); // current
sb.append(voltage[currentRow % 5]).append(','); // voltage
sb.append(phase[currentRow % 5]).append(','); // phase
sb.append(location[currentRow % 5]).append(','); // location
sb.append(groupId); // groupID
return sb.toString();
}
}
// ANCHOR: ReadTask
class ReadTask implements Runnable { class ReadTask implements Runnable {
private final static Logger logger = LoggerFactory.getLogger(ReadTask.class); private final static Logger logger = LoggerFactory.getLogger(ReadTask.class);
private final int taskId; private final int taskId;
...@@ -107,5 +56,3 @@ class ReadTask implements Runnable { ...@@ -107,5 +56,3 @@ class ReadTask implements Runnable {
this.active = false; this.active = false;
} }
} }
\ No newline at end of file
// ANCHOR_END: ReadTask
\ No newline at end of file
...@@ -7,8 +7,6 @@ import java.sql.*; ...@@ -7,8 +7,6 @@ import java.sql.*;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
// ANCHOR: SQLWriter
/** /**
* A helper class encapsulate the logic of writing using SQL. * A helper class encapsulate the logic of writing using SQL.
* <p> * <p>
...@@ -154,10 +152,14 @@ public class SQLWriter { ...@@ -154,10 +152,14 @@ public class SQLWriter {
if (errorCode == 0x362 || errorCode == 0x218) { if (errorCode == 0x362 || errorCode == 0x218) {
// Table does not exist // Table does not exist
createTables(); createTables();
stmt.executeUpdate(sql); executeSQL(sql);
} else { } else {
logger.error("Execute SQL: {}", sql);
throw e; throw e;
} }
} catch (Throwable throwable) {
logger.error("Execute SQL: {}", sql);
throw throwable;
} }
} }
...@@ -174,7 +176,12 @@ public class SQLWriter { ...@@ -174,7 +176,12 @@ public class SQLWriter {
sb.append("IF NOT EXISTS ").append(tbName).append(" USING meters TAGS ").append(tagValues).append(" "); sb.append("IF NOT EXISTS ").append(tbName).append(" USING meters TAGS ").append(tagValues).append(" ");
} }
String sql = sb.toString(); String sql = sb.toString();
try {
stmt.executeUpdate(sql); stmt.executeUpdate(sql);
} catch (Throwable throwable) {
logger.error("Execute SQL: {}", sql);
throw throwable;
}
} }
public boolean hasBufferedValues() { public boolean hasBufferedValues() {
...@@ -196,4 +203,3 @@ public class SQLWriter { ...@@ -196,4 +203,3 @@ public class SQLWriter {
} }
} }
} }
\ No newline at end of file
// ANCHOR_END: SQLWriter
package com.taos.example.highvolume;
public class StmtWriter {
}
...@@ -5,7 +5,6 @@ import org.slf4j.LoggerFactory; ...@@ -5,7 +5,6 @@ import org.slf4j.LoggerFactory;
import java.util.concurrent.BlockingQueue; import java.util.concurrent.BlockingQueue;
// ANCHOR: WriteTask
class WriteTask implements Runnable { class WriteTask implements Runnable {
private final static Logger logger = LoggerFactory.getLogger(WriteTask.class); private final static Logger logger = LoggerFactory.getLogger(WriteTask.class);
private final int maxBatchSize; private final int maxBatchSize;
...@@ -57,4 +56,3 @@ class WriteTask implements Runnable { ...@@ -57,4 +56,3 @@ class WriteTask implements Runnable {
this.active = false; this.active = false;
} }
} }
\ No newline at end of file
// ANCHOR_END: WriteTask
\ No newline at end of file
<mxfile host="65bd71144e">
<diagram id="_BjMg4p5x31hL4Gv-87s" name="第 1 页">
<mxGraphModel dx="862" dy="672" grid="0" gridSize="10" guides="1" tooltips="1" connect="1" arrows="1" fold="1" page="1" pageScale="1" pageWidth="900" pageHeight="900" background="none" math="0" shadow="0">
<root>
<mxCell id="0"/>
<mxCell id="1" parent="0"/>
<mxCell id="13" style="edgeStyle=none;html=1;exitX=1;exitY=0.5;exitDx=0;exitDy=0;fontSize=17;" parent="1" source="7" edge="1">
<mxGeometry relative="1" as="geometry">
<mxPoint x="260" y="150" as="targetPoint"/>
</mxGeometry>
</mxCell>
<mxCell id="7" value="&lt;font style=&quot;font-size: 14px&quot;&gt;Read Task 1&lt;/font&gt;" style="rounded=1;whiteSpace=wrap;html=1;fillColor=#dae8fc;strokeColor=#6c8ebf;fontStyle=1" parent="1" vertex="1">
<mxGeometry x="60" y="160" width="100" height="40" as="geometry"/>
</mxCell>
<mxCell id="16" style="edgeStyle=none;html=1;exitX=1;exitY=0.5;exitDx=0;exitDy=0;entryX=0;entryY=0.75;entryDx=0;entryDy=0;fontSize=17;" parent="1" source="8" target="9" edge="1">
<mxGeometry relative="1" as="geometry"/>
</mxCell>
<mxCell id="18" style="edgeStyle=none;html=1;exitX=1;exitY=0.5;exitDx=0;exitDy=0;entryX=0;entryY=0.5;entryDx=0;entryDy=0;fontSize=17;" parent="1" source="8" target="11" edge="1">
<mxGeometry relative="1" as="geometry"/>
</mxCell>
<mxCell id="8" value="&lt;font style=&quot;font-size: 14px&quot;&gt;&lt;font style=&quot;font-size: 14px&quot;&gt;Read Tas&lt;/font&gt;&lt;font style=&quot;font-size: 14px&quot;&gt;k 2&lt;/font&gt;&lt;/font&gt;" style="rounded=1;whiteSpace=wrap;html=1;fillColor=#dae8fc;strokeColor=#6c8ebf;fontStyle=1" parent="1" vertex="1">
<mxGeometry x="60" y="240" width="100" height="40" as="geometry"/>
</mxCell>
<mxCell id="23" value="" style="edgeStyle=none;html=1;fontSize=14;" parent="1" source="9" target="20" edge="1">
<mxGeometry relative="1" as="geometry"/>
</mxCell>
<mxCell id="9" value="Queue 1" style="rounded=0;whiteSpace=wrap;html=1;fontSize=17;fillColor=#f8cecc;strokeColor=#b85450;" parent="1" vertex="1">
<mxGeometry x="265" y="130" width="245" height="30" as="geometry"/>
</mxCell>
<mxCell id="24" value="" style="edgeStyle=none;html=1;fontSize=14;" parent="1" source="10" target="21" edge="1">
<mxGeometry relative="1" as="geometry"/>
</mxCell>
<mxCell id="10" value="Queue 2" style="rounded=0;whiteSpace=wrap;html=1;fontSize=17;fillColor=#f8cecc;strokeColor=#b85450;" parent="1" vertex="1">
<mxGeometry x="265" y="200" width="245" height="30" as="geometry"/>
</mxCell>
<mxCell id="25" value="" style="edgeStyle=none;html=1;fontSize=14;" parent="1" source="11" target="22" edge="1">
<mxGeometry relative="1" as="geometry"/>
</mxCell>
<mxCell id="11" value="Queue 3" style="rounded=0;whiteSpace=wrap;html=1;fontSize=17;fillColor=#f8cecc;strokeColor=#b85450;" parent="1" vertex="1">
<mxGeometry x="265" y="280" width="245" height="30" as="geometry"/>
</mxCell>
<mxCell id="14" style="edgeStyle=none;html=1;exitX=1;exitY=0.5;exitDx=0;exitDy=0;fontSize=17;entryX=0;entryY=0.5;entryDx=0;entryDy=0;" parent="1" source="7" target="10" edge="1">
<mxGeometry relative="1" as="geometry">
<mxPoint x="270" y="160" as="targetPoint"/>
<mxPoint x="180" y="195" as="sourcePoint"/>
</mxGeometry>
</mxCell>
<mxCell id="15" style="edgeStyle=none;html=1;exitX=1;exitY=0.5;exitDx=0;exitDy=0;fontSize=17;entryX=0;entryY=0.5;entryDx=0;entryDy=0;" parent="1" source="7" target="11" edge="1">
<mxGeometry relative="1" as="geometry">
<mxPoint x="280" y="170" as="targetPoint"/>
<mxPoint x="190" y="205" as="sourcePoint"/>
</mxGeometry>
</mxCell>
<mxCell id="17" style="edgeStyle=none;html=1;exitX=1;exitY=0.5;exitDx=0;exitDy=0;entryX=-0.033;entryY=0.65;entryDx=0;entryDy=0;fontSize=17;entryPerimeter=0;" parent="1" source="8" target="10" edge="1">
<mxGeometry relative="1" as="geometry">
<mxPoint x="180" y="285" as="sourcePoint"/>
<mxPoint x="275" y="162.5" as="targetPoint"/>
</mxGeometry>
</mxCell>
<mxCell id="20" value="&lt;font style=&quot;font-size: 14px&quot;&gt;&lt;font style=&quot;font-size: 14px&quot;&gt;Write Tas&lt;/font&gt;&lt;font style=&quot;font-size: 14px&quot;&gt;k 2&lt;/font&gt;&lt;/font&gt;" style="rounded=1;whiteSpace=wrap;html=1;fillColor=#dae8fc;strokeColor=#6c8ebf;fontStyle=1" parent="1" vertex="1">
<mxGeometry x="600" y="125" width="100" height="40" as="geometry"/>
</mxCell>
<mxCell id="21" value="&lt;font style=&quot;font-size: 14px&quot;&gt;&lt;font style=&quot;font-size: 14px&quot;&gt;Write Tas&lt;/font&gt;&lt;font style=&quot;font-size: 14px&quot;&gt;k 2&lt;/font&gt;&lt;/font&gt;" style="rounded=1;whiteSpace=wrap;html=1;fillColor=#dae8fc;strokeColor=#6c8ebf;fontStyle=1" parent="1" vertex="1">
<mxGeometry x="600" y="195" width="100" height="40" as="geometry"/>
</mxCell>
<mxCell id="22" value="&lt;font style=&quot;font-size: 14px&quot;&gt;&lt;font style=&quot;font-size: 14px&quot;&gt;Write Tas&lt;/font&gt;&lt;font style=&quot;font-size: 14px&quot;&gt;k 3&lt;/font&gt;&lt;/font&gt;" style="rounded=1;whiteSpace=wrap;html=1;fillColor=#dae8fc;strokeColor=#6c8ebf;fontStyle=1" parent="1" vertex="1">
<mxGeometry x="600" y="275" width="100" height="40" as="geometry"/>
</mxCell>
</root>
</mxGraphModel>
</diagram>
</mxfile>
\ No newline at end of file
<mxfile host="65bd71144e">
<diagram id="_BjMg4p5x31hL4Gv-87s" name="第 1 页">
<mxGraphModel dx="1259" dy="615" grid="0" gridSize="10" guides="1" tooltips="1" connect="1" arrows="1" fold="1" page="1" pageScale="1" pageWidth="900" pageHeight="900" background="none" math="0" shadow="0">
<root>
<mxCell id="0"/>
<mxCell id="1" parent="0"/>
<mxCell id="13" style="edgeStyle=none;html=1;exitX=1;exitY=0.5;exitDx=0;exitDy=0;fontSize=17;" parent="1" source="7" edge="1">
<mxGeometry relative="1" as="geometry">
<mxPoint x="260" y="150" as="targetPoint"/>
</mxGeometry>
</mxCell>
<mxCell id="7" value="&lt;font style=&quot;font-size: 14px&quot;&gt;Read Task 1&lt;/font&gt;" style="rounded=1;whiteSpace=wrap;html=1;fillColor=#dae8fc;strokeColor=#6c8ebf;fontStyle=1" parent="1" vertex="1">
<mxGeometry x="60" y="160" width="100" height="37" as="geometry"/>
</mxCell>
<mxCell id="18" style="edgeStyle=none;html=1;exitX=1;exitY=0.5;exitDx=0;exitDy=0;fontSize=17;" parent="1" source="8" edge="1">
<mxGeometry relative="1" as="geometry">
<mxPoint x="264" y="292" as="targetPoint"/>
</mxGeometry>
</mxCell>
<mxCell id="8" value="&lt;font style=&quot;font-size: 14px&quot;&gt;&lt;font style=&quot;font-size: 14px&quot;&gt;Read Tas&lt;/font&gt;&lt;font style=&quot;font-size: 14px&quot;&gt;k 2&lt;/font&gt;&lt;/font&gt;" style="rounded=1;whiteSpace=wrap;html=1;fillColor=#dae8fc;strokeColor=#6c8ebf;fontStyle=1" parent="1" vertex="1">
<mxGeometry x="60" y="240" width="100" height="36" as="geometry"/>
</mxCell>
<mxCell id="23" value="" style="edgeStyle=none;html=1;fontSize=14;" parent="1" source="9" target="20" edge="1">
<mxGeometry relative="1" as="geometry"/>
</mxCell>
<mxCell id="9" value="Queue 1" style="rounded=0;whiteSpace=wrap;html=1;fontSize=17;fillColor=#f8cecc;strokeColor=#b85450;" parent="1" vertex="1">
<mxGeometry x="265" y="130" width="245" height="30" as="geometry"/>
</mxCell>
<mxCell id="24" value="" style="edgeStyle=none;html=1;fontSize=14;" parent="1" source="10" target="21" edge="1">
<mxGeometry relative="1" as="geometry"/>
</mxCell>
<mxCell id="10" value="Queue 2" style="rounded=0;whiteSpace=wrap;html=1;fontSize=17;fillColor=#f8cecc;strokeColor=#b85450;" parent="1" vertex="1">
<mxGeometry x="265" y="175" width="245" height="30" as="geometry"/>
</mxCell>
<mxCell id="25" value="" style="edgeStyle=none;html=1;fontSize=14;entryX=0;entryY=0.5;entryDx=0;entryDy=0;" parent="1" source="11" target="28" edge="1">
<mxGeometry relative="1" as="geometry">
<mxPoint x="600" y="284.1428571428571" as="targetPoint"/>
</mxGeometry>
</mxCell>
<mxCell id="11" value="Queue 3" style="rounded=0;whiteSpace=wrap;html=1;fontSize=17;fillColor=#f8cecc;strokeColor=#b85450;" parent="1" vertex="1">
<mxGeometry x="265" y="223" width="245" height="30" as="geometry"/>
</mxCell>
<mxCell id="14" style="edgeStyle=none;html=1;exitX=1;exitY=0.5;exitDx=0;exitDy=0;fontSize=17;entryX=0;entryY=0.5;entryDx=0;entryDy=0;" parent="1" source="7" target="11" edge="1">
<mxGeometry relative="1" as="geometry">
<mxPoint x="270" y="160" as="targetPoint"/>
<mxPoint x="180" y="195" as="sourcePoint"/>
</mxGeometry>
</mxCell>
<mxCell id="17" style="edgeStyle=none;html=1;exitX=1;exitY=0.5;exitDx=0;exitDy=0;entryX=0;entryY=0.75;entryDx=0;entryDy=0;fontSize=17;" parent="1" source="8" target="10" edge="1">
<mxGeometry relative="1" as="geometry">
<mxPoint x="180" y="285" as="sourcePoint"/>
<mxPoint x="275" y="162.5" as="targetPoint"/>
</mxGeometry>
</mxCell>
<mxCell id="20" value="&lt;font style=&quot;font-size: 14px&quot;&gt;&lt;font style=&quot;font-size: 14px&quot;&gt;Write Tas&lt;/font&gt;&lt;font style=&quot;font-size: 14px&quot;&gt;k 1&lt;/font&gt;&lt;/font&gt;" style="rounded=1;whiteSpace=wrap;html=1;fillColor=#dae8fc;strokeColor=#6c8ebf;fontStyle=1" parent="1" vertex="1">
<mxGeometry x="588" y="129" width="100" height="31" as="geometry"/>
</mxCell>
<mxCell id="21" value="&lt;font style=&quot;font-size: 14px&quot;&gt;&lt;font style=&quot;font-size: 14px&quot;&gt;Write Tas&lt;/font&gt;&lt;font style=&quot;font-size: 14px&quot;&gt;k 2&lt;/font&gt;&lt;/font&gt;" style="rounded=1;whiteSpace=wrap;html=1;fillColor=#dae8fc;strokeColor=#6c8ebf;fontStyle=1" parent="1" vertex="1">
<mxGeometry x="588" y="175" width="100" height="33" as="geometry"/>
</mxCell>
<mxCell id="30" style="edgeStyle=none;html=1;exitX=1;exitY=0.5;exitDx=0;exitDy=0;entryX=0;entryY=0.5;entryDx=0;entryDy=0;" parent="1" source="26" target="29" edge="1">
<mxGeometry relative="1" as="geometry"/>
</mxCell>
<mxCell id="26" value="Queue 4" style="rounded=0;whiteSpace=wrap;html=1;fontSize=17;fillColor=#f8cecc;strokeColor=#b85450;" parent="1" vertex="1">
<mxGeometry x="265" y="271" width="245" height="29" as="geometry"/>
</mxCell>
<mxCell id="28" value="&lt;font style=&quot;font-size: 14px&quot;&gt;&lt;font style=&quot;font-size: 14px&quot;&gt;Write Tas&lt;/font&gt;&lt;font style=&quot;font-size: 14px&quot;&gt;k 3&lt;/font&gt;&lt;/font&gt;" style="rounded=1;whiteSpace=wrap;html=1;fillColor=#dae8fc;strokeColor=#6c8ebf;fontStyle=1" parent="1" vertex="1">
<mxGeometry x="588" y="221.5" width="100" height="33" as="geometry"/>
</mxCell>
<mxCell id="29" value="&lt;font style=&quot;font-size: 14px&quot;&gt;&lt;font style=&quot;font-size: 14px&quot;&gt;Write Tas&lt;/font&gt;&lt;font style=&quot;font-size: 14px&quot;&gt;k 4&lt;/font&gt;&lt;/font&gt;" style="rounded=1;whiteSpace=wrap;html=1;fillColor=#dae8fc;strokeColor=#6c8ebf;fontStyle=1" parent="1" vertex="1">
<mxGeometry x="588" y="271" width="100" height="33" as="geometry"/>
</mxCell>
</root>
</mxGraphModel>
</diagram>
</mxfile>
\ No newline at end of file
<configuration>
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<!-- encoders are assigned the type
ch.qos.logback.classic.encoder.PatternLayoutEncoder by default -->
<encoder>
<pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
</encoder>
</appender>
<root level="debug">
<appender-ref ref="STDOUT" />
</root>
</configuration>
\ No newline at end of file
# install dependencies:
# recommend python >= 3.8
# pip3 install faster-fifo
#
import logging
import math
import sys
import time
import os
from multiprocessing import Process
from faster_fifo import Queue
from mockdatasource import MockDataSource
from queue import Empty
from typing import List
logging.basicConfig(stream=sys.stdout, level=logging.DEBUG, format="%(asctime)s [%(name)s] - %(message)s")
READ_TASK_COUNT = 1
WRITE_TASK_COUNT = 1
TABLE_COUNT = 1000
QUEUE_SIZE = 1000000
MAX_BATCH_SIZE = 3000
read_processes = []
write_processes = []
def get_connection():
"""
If variable TDENGINE_FIRST_EP is provided then it will be used. If not, firstEP in /etc/taos/taos.cfg will be used.
You can also override the default username and password by supply variable TDENGINE_USER and TDENGINE_PASSWORD
"""
import taos
firstEP = os.environ.get("TDENGINE_FIRST_EP")
if firstEP:
host, port = firstEP.split(":")
else:
host, port = None, 0
user = os.environ.get("TDENGINE_USER", "root")
password = os.environ.get("TDENGINE_PASSWORD", "taosdata")
return taos.connect(host=host, port=int(port), user=user, password=password)
# ANCHOR: read
def run_read_task(task_id: int, task_queues: List[Queue]):
table_count_per_task = TABLE_COUNT // READ_TASK_COUNT
data_source = MockDataSource(f"tb{task_id}", table_count_per_task)
try:
for batch in data_source:
for table_id, rows in batch:
# hash data to different queue
i = table_id % len(task_queues)
# block putting forever when the queue is full
task_queues[i].put_many(rows, block=True, timeout=-1)
except KeyboardInterrupt:
pass
# ANCHOR_END: read
# ANCHOR: write
def run_write_task(task_id: int, queue: Queue):
from sql_writer import SQLWriter
log = logging.getLogger(f"WriteTask-{task_id}")
writer = SQLWriter(get_connection)
lines = None
try:
while True:
try:
# get as many as possible
lines = queue.get_many(block=False, max_messages_to_get=MAX_BATCH_SIZE)
writer.process_lines(lines)
except Empty:
time.sleep(0.01)
except KeyboardInterrupt:
pass
except BaseException as e:
log.debug(f"lines={lines}")
raise e
# ANCHOR_END: write
def set_global_config():
argc = len(sys.argv)
if argc > 1:
global READ_TASK_COUNT
READ_TASK_COUNT = int(sys.argv[1])
if argc > 2:
global WRITE_TASK_COUNT
WRITE_TASK_COUNT = int(sys.argv[2])
if argc > 3:
global TABLE_COUNT
TABLE_COUNT = int(sys.argv[3])
if argc > 4:
global QUEUE_SIZE
QUEUE_SIZE = int(sys.argv[4])
if argc > 5:
global MAX_BATCH_SIZE
MAX_BATCH_SIZE = int(sys.argv[5])
# ANCHOR: monitor
def run_monitor_process():
log = logging.getLogger("DataBaseMonitor")
conn = get_connection()
conn.execute("DROP DATABASE IF EXISTS test")
conn.execute("CREATE DATABASE test")
conn.execute("CREATE STABLE test.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) "
"TAGS (location BINARY(64), groupId INT)")
def get_count():
res = conn.query("SELECT count(*) FROM test.meters")
rows = res.fetch_all()
return rows[0][0] if rows else 0
last_count = 0
while True:
time.sleep(10)
count = get_count()
log.info(f"count={count} speed={(count - last_count) / 10}")
last_count = count
# ANCHOR_END: monitor
# ANCHOR: main
def main():
set_global_config()
logging.info(f"READ_TASK_COUNT={READ_TASK_COUNT}, WRITE_TASK_COUNT={WRITE_TASK_COUNT}, "
f"TABLE_COUNT={TABLE_COUNT}, QUEUE_SIZE={QUEUE_SIZE}, MAX_BATCH_SIZE={MAX_BATCH_SIZE}")
monitor_process = Process(target=run_monitor_process)
monitor_process.start()
time.sleep(3) # waiting for database ready.
task_queues: List[Queue] = []
# create task queues
for i in range(WRITE_TASK_COUNT):
queue = Queue(max_size_bytes=QUEUE_SIZE)
task_queues.append(queue)
# create write processes
for i in range(WRITE_TASK_COUNT):
p = Process(target=run_write_task, args=(i, task_queues[i]))
p.start()
logging.debug(f"WriteTask-{i} started with pid {p.pid}")
write_processes.append(p)
# create read processes
for i in range(READ_TASK_COUNT):
queues = assign_queues(i, task_queues)
p = Process(target=run_read_task, args=(i, queues))
p.start()
logging.debug(f"ReadTask-{i} started with pid {p.pid}")
read_processes.append(p)
try:
monitor_process.join()
except KeyboardInterrupt:
monitor_process.terminate()
[p.terminate() for p in read_processes]
[p.terminate() for p in write_processes]
[q.close() for q in task_queues]
def assign_queues(read_task_id, task_queues):
"""
Compute target queues for a specific read task.
"""
ratio = WRITE_TASK_COUNT / READ_TASK_COUNT
from_index = math.floor(read_task_id * ratio)
end_index = math.ceil((read_task_id + 1) * ratio)
return task_queues[from_index:end_index]
if __name__ == '__main__':
main()
# ANCHOR_END: main
import time
class MockDataSource:
samples = [
"8.8,119,0.32,LosAngeles,0",
"10.7,116,0.34,SanDiego,1",
"9.9,111,0.33,Hollywood,2",
"8.9,113,0.329,Compton,3",
"9.4,118,0.141,San Francisco,4"
]
def __init__(self, tb_name_prefix, table_count):
self.table_name_prefix = tb_name_prefix + "_"
self.table_count = table_count
self.max_rows = 10000000
self.current_ts = round(time.time() * 1000) - self.max_rows * 100
# [(tableId, tableName, values),]
self.data = self._init_data()
def _init_data(self):
lines = self.samples * (self.table_count // 5 + 1)
data = []
for i in range(self.table_count):
table_name = self.table_name_prefix + str(i)
data.append((i, table_name, lines[i])) # tableId, row
return data
def __iter__(self):
self.row = 0
return self
def __next__(self):
"""
next 1000 rows for each table.
return: {tableId:[row,...]}
"""
# generate 1000 timestamps
ts = []
for _ in range(1000):
self.current_ts += 100
ts.append(str(self.current_ts))
# add timestamp to each row
# [(tableId, ["tableName,ts,current,voltage,phase,location,groupId"])]
result = []
for table_id, table_name, values in self.data:
rows = [table_name + ',' + t + ',' + values for t in ts]
result.append((table_id, rows))
return result
...@@ -18,6 +18,7 @@ class SQLWriter: ...@@ -18,6 +18,7 @@ class SQLWriter:
name = r[0] name = r[0]
if name == "maxSQLLength": if name == "maxSQLLength":
return int(r[1]) return int(r[1])
return 1024 * 1024
def process_lines(self, lines: str): def process_lines(self, lines: str):
""" """
...@@ -71,11 +72,19 @@ class SQLWriter: ...@@ -71,11 +72,19 @@ class SQLWriter:
if error_code == 0x362 or error_code == 0x218: if error_code == 0x362 or error_code == 0x218:
self.create_tables() self.create_tables()
else: else:
self.log.error("Execute SQL: %s", sql)
raise e raise e
except BaseException as baseException:
self.log.error("Execute SQL: %s", sql)
raise baseException
def create_tables(self): def create_tables(self):
sql = "CREATE TABLE " sql = "CREATE TABLE "
for tb in self._tb_values.keys(): for tb in self._tb_values.keys():
tag_values = self._tb_tags[tb] tag_values = self._tb_tags[tb]
sql += "IF NOT EXISTS " + tb + " USING meters TAGS " + tag_values + " " sql += "IF NOT EXISTS " + tb + " USING meters TAGS " + tag_values + " "
try:
self._conn.execute(sql) self._conn.execute(sql)
except BaseException as e:
self.log.error("Execute SQL: %s", sql)
raise e
--- import Tabs from "@theme/Tabs";
title: 高效写入 import TabItem from "@theme/TabItem";
---
# 高效写入
本节介绍如何高效地向 TDengine 写入数据。
## 高效写入原理 {#principle} ## 高效写入原理 {#principle}
本节介绍如何高效地向 TDengine 写入数据。高效写入数据要考虑几个因素:数据在不同表(或子表)之间的分布,即要写入数据的相邻性;单次写入的数据量;并发连接数。一般来说,每批次只向同一张表(或子表)写入数据比向多张表(或子表)写入数据要更高效;每批次写入的数据量越大越高效(但超过一定阈值其优势会消失;同时写入数据的并发连接数越多写入越高效(但超过一定阈值反而会下降,取决于服务端处理能力)。 ### 客户端程序的角度 {#application-view}
从客户端程序的角度来说,高效写入数据要考虑以下几个因素:
1. 单次写入的数据量。一般来讲,每批次写入的数据量越大越高效(但超过一定阈值其优势会消失)。使用 SQL 写入 TDengine 时,尽量在一条 SQL 中拼接更多数据。目前,TDengine 支持的一条 SQL 的最大长度为 1,048,576(1M)个字符。可通过配置客户端参数 maxSQLLength(默认值为 65480)进行修改。
2. 并发连接数。一般来讲,同时写入数据的并发连接数越多写入越高效(但超过一定阈值反而会下降,取决于服务端处理能力)。
3. 数据在不同表(或子表)之间的分布,即要写入数据的相邻性。一般来说,每批次只向同一张表(或子表)写入数据比向多张表(或子表)写入数据要更高效;
4. 写入方式。一般来讲:
- 参数绑定写入比 SQL 写入更高效。因参数绑定方式避免了 SQL 解析。(但增加了 C 接口的调用次数,对于连接器也有性能损耗)。
- SQL 写入不自动建表比自动建表更高效。因自动建表要频繁检查表是否存在
- SQL 写入比无模式写入更高效。因无模式写入会自动建表且支持动态更改表结构
客户端程序要充分且恰当地利用以上几个因素。在单次写入中尽量只向同一张表(或子表)写入数据,每批次写入的数据量经过测试和调优设定为一个最适合当前系统处理能力的数值,并发写入的连接数同样经过测试和调优后设定为一个最适合当前系统处理能力的数值,以实现在当前系统中的最佳写入速度。
### 数据源的角度 {#datasource-view}
客户端程序通常需要从数据源读数据再写入 TDengine。从数据源角度来说,以下几种情况需要在读线程和写线程之间增加队列:
1. 有多个数据源,单个数据源生成数据的速度远小于单线程写入的速度,但数据量整体比较大。此时队列的作用是把多个数据源的数据汇聚到一起,增加单次写入的数据量。
2. 单个数据源生成数据的速度远大于单线程写入的速度。此时队列的作用是增加写入的并发度。
3. 单张表的数据分散在多个数据源。此时队列的作用是将同一张表的数据提前汇聚到一起,提高写入时数据的相邻性。
如果写应用的数据源是 Kafka, 写应用本身即 Kafka 的消费者,则可利用 Kafka 的特性实现高效写入。比如:
1. 将同一张表的数据写到同一个 Topic 的同一个 Partition,增加数据的相邻性
2. 通过订阅多个 Topic 实现数据汇聚
3. 通过增加 Consumer 线程数增加写入的并发度
4. 通过增加每次 fetch 的最大数据量来增加单次写入的最大数据量
### 服务器配置的角度 {#setting-view}
为了更高效地向 TDengine 写入数据,客户端程序要充分且恰当地利用以上几个因素。在单次写入中尽量只向同一张表(或子表)写入数据,每批次写入的数据量经过测试和调优设定为一个最适合当前系统处理能力的数值,并发写入的连接数同样经过测试和调优后设定为一个最适合当前系统处理能力的数值,以实现在当前系统中的最佳写入速度。同时,TDengine 还提供了独特的参数绑定写入,这也是一个有助于实现高效写入的方法。 从服务器配置的角度来说,也有很多优化写入性能的方法。
为了使写入最高效,除了客户端程序的设计,服务端的配置也很重要。如果无论怎么调节客户端程序,taosd 进程的 CPU 使用率都很低,那很可能需要增加 vgroup 的数量。比如:数据库总表数是 1000 且 minTablesPerVnode 设置的也是 1000,那么这个数据至多有一个 vgroup。此时如果将 minTablesPerVnode 和 tablelncStepPerVnode 都设置成 100, 则这个数据库有可能用到 10 个 vgroup。更多性能调优参数请参考[配置参考](../../reference/config)性能调优部分 如果无论怎么调节客户端程序,taosd 进程的 CPU 使用率都很低,那很可能需要增加 vgroup 的数量。比如:数据库总表数是 1000 且 minTablesPerVnode 设置的也是 1000,那么这个数据至多有一个 vgroup。此时如果将 minTablesPerVnode 和 tablelncStepPerVnode 都设置成 100, 则这个数据库可能用到 10 个 vgroup
## 高效写入方案 {#scenario} 更多调优参数,请参考[性能优化](../../operation/optimize)[配置参考](../../reference/config)部分。
下面的示例程序展示了如何高效写入数据: ## 高效写入示例 {#sample-code}
- TDengine 客户端程序从消息队列或者其它数据源不断读入数据,在示例程序中采用生成模拟数据的方式来模拟读取数据源 ### 场景设计 {#scenario}
下面的示例程序展示了如何高效写入数据,场景设计如下:
- TDengine 客户端程序从其它数据源不断读入数据,在示例程序中采用生成模拟数据的方式来模拟读取数据源
- 单个连接向 TDengine 写入的速度无法与读数据的速度相匹配,因此客户端程序启动多个线程,每个线程都建立了与 TDengine 的连接,每个线程都有一个独占的固定大小的消息队列 - 单个连接向 TDengine 写入的速度无法与读数据的速度相匹配,因此客户端程序启动多个线程,每个线程都建立了与 TDengine 的连接,每个线程都有一个独占的固定大小的消息队列
- 客户端程序将接收到的数据根据所属的表名(或子表名)HASH 到不同的线程,即写入该线程所对应的消息队列,以此确保属于某个表(或子表)的数据一定会被一个固定的线程处理 - 客户端程序将接收到的数据根据所属的表名(或子表名)HASH 到不同的线程,即写入该线程所对应的消息队列,以此确保属于某个表(或子表)的数据一定会被一个固定的线程处理
- 各个子线程在将所关联的消息队列中的数据读空后或者读取数据量达到一个预定的阈值后将该批数据写入 TDengine,并继续处理后面接收到的数据 - 各个子线程在将所关联的消息队列中的数据读空后或者读取数据量达到一个预定的阈值后将该批数据写入 TDengine,并继续处理后面接收到的数据
![TDengine 高效写入线程模型](highvolume.webp) ![TDengine 高效写入示例场景的线程模型](highvolume.webp)
### 示例代码 {#code}
这一部分是针对以上场景的示例代码。对于其它场景高效写入原理相同,不过代码需要适当修改。
本示例代码假设源数据属于同一张超级表(meters)的不同子表。程序在开始写入数据之前已经在 test 库创建了这个超级表。对于子表,将根据收到的数据,由应用程序自动创建。如果实际场景是多个超级表,只需修改写任务自动建表的代码。
:::note <Tabs defaultValue="java" groupId="lang">
上图所示架构,每个写任务只负责写特定的表,体现了数据的相邻性原则。但是读任务所读的表,我们假设是随机的。这样一个队列有多个写入线程(或进程),队列内部可能产生锁的消耗。实际场景,如果能做到一个读任务对应一个写任务是最好的。 <TabItem label="Java" value="java">
:::
## Java 示例程序 {#java-demo} **程序清单**
在 Java 示例程序中采用拼接 SQL 的写入方式。 | 类名 | 功能说明 |
| ---------------- | --------------------------------------------------------------------------- |
| FastWriteExample | 主程序 |
| ReadTask | 从模拟源中读取数据,将表名经过 hash 后得到 Queue 的 index,写入对应的 Queue |
| WriteTask | 从 Queue 中获取数据,组成一个 Batch,写入 TDengine |
| MockDataSource | 模拟生成一定数量 meters 子表的数据 |
| SQLWriter | WriteTask 依赖这个类完成 SQL 拼接、自动建表、 SQL 写入、SQL 长度检查 |
| StmtWriter | 实现参数绑定方式批量写入(暂未完成) |
| DataBaseMonitor | 统计写入速度,并每隔 10 秒把当前写入速度打印到控制台 |
### 主程序 {#java-demo-main}
以下是各类的完整代码和更详细的功能说明。
<details>
<summary>FastWriteExample</summary>
主程序负责: 主程序负责:
1. 创建消息队列 1. 创建消息队列
...@@ -45,70 +97,80 @@ title: 高效写入 ...@@ -45,70 +97,80 @@ title: 高效写入
3. 模拟生成的总表数。默认为 1000。将会平分给各个读线程。 3. 模拟生成的总表数。默认为 1000。将会平分给各个读线程。
4. 每批最多写入记录数量。默认为 3000。 4. 每批最多写入记录数量。默认为 3000。
<details> 队列容量(taskQueueCapacity)也是与性能有关的参数,可通过修改程序调节。一般来讲,队列容量越大,入队被阻塞的概率越小,队列的吞吐量越大,但是内存占用也会越大。 示例程序默认值已经设置地足够大。
<summary>主程序</summary>
```java ```java
{{#include docs/examples/java/src/main/java/com/taos/example/highvolume/FastWriteExample.java:main}} {{#include docs/examples/java/src/main/java/com/taos/example/highvolume/FastWriteExample.java}}
``` ```
</details> </details>
<details>
队列容量(taskQueueCapacity)也是与性能有关的参数,可通过修改程序调节。一般来讲,队列容量越大,入队被阻塞的概率越小,队列的吞吐量越大,但是内存占用也会越大。 <summary>ReadTask</summary>
### 读任务的实现 {#java-demo-read}
读任务负责从数据源读数据。每个读任务都关联了一个模拟数据源。每个模拟数据源可生成一点数量表的数据。不同的模拟数据源生成不同表的数据。 读任务负责从数据源读数据。每个读任务都关联了一个模拟数据源。每个模拟数据源可生成一点数量表的数据。不同的模拟数据源生成不同表的数据。
读任务采用阻塞的方式写消息队列。也就是说,一旦队列满了,写操作就会阻塞。 读任务采用阻塞的方式写消息队列。也就是说,一旦队列满了,写操作就会阻塞。
```java
{{#include docs/examples/java/src/main/java/com/taos/example/highvolume/ReadTask.java}}
```
</details>
<details> <details>
<summary>读任务的实现</summary> <summary>WriteTask</summary>
```java ```java
{{#include docs/examples/java/src/main/java/com/taos/example/highvolume/ReadTask.java:ReadTask}} {{#include docs/examples/java/src/main/java/com/taos/example/highvolume/WriteTask.java}}
``` ```
</details> </details>
### 写任务的实现 {#java-demo-write}
<details> <details>
<summary>写任务的实现</summary>
<summary>MockDataSource</summary>
```java ```java
{{#include docs/examples/java/src/main/java/com/taos/example/highvolume/WriteTask.java:WriteTask}} {{#include docs/examples/java/src/main/java/com/taos/example/highvolume/MockDataSource.java}}
``` ```
</details> </details>
### SQLWriter 类的实现 {#java-demo-sql-writer} <details>
SQLWriter 类封装了拼 SQL 和写数据的逻辑。注意,所有的表都没有提前创建,而是写入出错的时候,再以超级表为模板批量建表,然后重新执行 INSERT 语句。 <summary>SQLWriter</summary>
SQLWriter 类封装了拼 SQL 和写数据的逻辑。注意,所有的表都没有提前创建,而是在 catch 到表不存在异常的时候,再以超级表为模板批量建表,然后重新执行 INSERT 语句。对于其它异常,这里简单地记录当时执行的 SQL 语句到日志中,你也可以记录更多线索到日志,已便排查错误和故障恢复。
```java
{{#include docs/examples/java/src/main/java/com/taos/example/highvolume/SQLWriter.java}}
```
</details>
<details> <details>
<summary>SQLWriter 类的实现</summary>
<summary>DataBaseMonitor</summary>
```java ```java
{{#include docs/examples/java/src/main/java/com/taos/example/highvolume/SQLWriter.java:SQLWriter}} {{#include docs/examples/java/src/main/java/com/taos/example/highvolume/DataBaseMonitor.java}}
``` ```
</details> </details>
### 执行示例程序 {#run-java-demo} **执行步骤**
<details> <details>
<summary>执行 Java 示例程序</summary> <summary>执行 Java 示例程序</summary>
执行程序前需配置环境变量 `TDENGINE_JDBC_URL`。如果 TDengine Server 部署在本机,且用户名、密码和端口都是默认值,那么可配置: 执行程序前需配置环境变量 `TDENGINE_JDBC_URL`。如果 TDengine Server 部署在本机,且用户名、密码和端口都是默认值,那么可配置:
``` ```
TDENGINE_JDBC_URL="jdbc:TAOS://localhost:6030?user=root&password=taosdata" TDENGINE_JDBC_URL="jdbc:TAOS://localhost:6030?user=root&password=taosdata"
``` ```
#### 本地集成开发环境执行示例程序 {#java-demo-local-run} **本地集成开发环境执行示例程序**
1. clone TDengine 仓库 1. clone TDengine 仓库
``` ```
...@@ -118,7 +180,7 @@ TDENGINE_JDBC_URL="jdbc:TAOS://localhost:6030?user=root&password=taosdata" ...@@ -118,7 +180,7 @@ TDENGINE_JDBC_URL="jdbc:TAOS://localhost:6030?user=root&password=taosdata"
3. 在开发环境中配置环境变量 `TDENGINE_JDBC_URL`。如果已配置了全局的环境变量 `TDENGINE_JDBC_URL` 可跳过这一步。 3. 在开发环境中配置环境变量 `TDENGINE_JDBC_URL`。如果已配置了全局的环境变量 `TDENGINE_JDBC_URL` 可跳过这一步。
4. 运行类 `com.taos.example.highvolume.FastWriteExample` 4. 运行类 `com.taos.example.highvolume.FastWriteExample`
#### 远程服务器上执行示例程序 {#java-demo-remote-run} **远程服务器上执行示例程序**
若要在服务器上执行示例程序,可按照下面的步骤操作: 若要在服务器上执行示例程序,可按照下面的步骤操作:
...@@ -135,16 +197,17 @@ TDENGINE_JDBC_URL="jdbc:TAOS://localhost:6030?user=root&password=taosdata" ...@@ -135,16 +197,17 @@ TDENGINE_JDBC_URL="jdbc:TAOS://localhost:6030?user=root&password=taosdata"
``` ```
scp -r .\target\lib <user>@<host>:~/examples/java scp -r .\target\lib <user>@<host>:~/examples/java
``` ```
- 复制本程序的 jar 包,每次更新代码都需要复制 - 复制本程序的 jar 包,每次更新代码都需要复制
``` ```
scp -r .\target\javaexample-1.0.jar <user>@<host>:~/examples/java scp -r .\target\javaexample-1.0.jar <user>@<host>:~/examples/java
``` ```
4. 配置环境变量。 4. 配置环境变量。
编辑 `~/.bash_profile``~/.bashrc` 添加如下内容例如: 编辑 `~/.bash_profile``~/.bashrc` 添加如下内容例如:
``` ```
export TDENGINE_JDBC_URL="jdbc:TAOS://localhost:6030?user=root&password=taosdata" export TDENGINE_JDBC_URL="jdbc:TAOS://localhost:6030?user=root&password=taosdata"
``` ```
以上使用的是本地部署 TDengine Server 时默认的 JDBC URL。你需要根据自己的实际情况更改。 以上使用的是本地部署 TDengine Server 时默认的 JDBC URL。你需要根据自己的实际情况更改。
5. 用 java 命令启动示例程序,命令模板: 5. 用 java 命令启动示例程序,命令模板:
...@@ -154,39 +217,70 @@ TDENGINE_JDBC_URL="jdbc:TAOS://localhost:6030?user=root&password=taosdata" ...@@ -154,39 +217,70 @@ TDENGINE_JDBC_URL="jdbc:TAOS://localhost:6030?user=root&password=taosdata"
``` ```
6. 结束测试程序。测试程序不会自动结束,在获取到当前配置下稳定的写入速度后,按 <kbd>CTRL</kbd> + <kbd>C</kbd> 结束程序。 6. 结束测试程序。测试程序不会自动结束,在获取到当前配置下稳定的写入速度后,按 <kbd>CTRL</kbd> + <kbd>C</kbd> 结束程序。
下面是一次实际运行的截图: 下面是一次实际运行的日志输出,机器配置 16核 + 64G + 固态硬盘。
``` ```
[testuser@vm95 java]$ java -classpath lib/*:javaexample-1.0.jar com.taos.example.highvolume.FastWriteExample 1 9 1000 2000 root@vm85$ java -classpath lib/*:javaexample-1.0.jar com.taos.example.highvolume.FastWriteExample 2 12
17:01:01.131 [main] INFO c.t.e.highvolume.FastWriteExample - readTaskCount=1, writeTaskCount=9 tableCount=1000 maxBatchSize=2000 18:56:35.896 [main] INFO c.t.e.highvolume.FastWriteExample - readTaskCount=2, writeTaskCount=12 tableCount=1000 maxBatchSize=3000
17:01:01.286 [WriteThread-0] INFO c.taos.example.highvolume.WriteTask - started 18:56:36.011 [WriteThread-0] INFO c.taos.example.highvolume.WriteTask - started
17:01:01.354 [WriteThread-1] INFO c.taos.example.highvolume.WriteTask - started 18:56:36.015 [WriteThread-0] INFO c.taos.example.highvolume.SQLWriter - maxSQLLength=1048576
17:01:01.360 [WriteThread-2] INFO c.taos.example.highvolume.WriteTask - started 18:56:36.021 [WriteThread-1] INFO c.taos.example.highvolume.WriteTask - started
17:01:01.366 [WriteThread-3] INFO c.taos.example.highvolume.WriteTask - started 18:56:36.022 [WriteThread-1] INFO c.taos.example.highvolume.SQLWriter - maxSQLLength=1048576
17:01:01.433 [WriteThread-4] INFO c.taos.example.highvolume.WriteTask - started 18:56:36.031 [WriteThread-2] INFO c.taos.example.highvolume.WriteTask - started
17:01:01.438 [WriteThread-5] INFO c.taos.example.highvolume.WriteTask - started 18:56:36.032 [WriteThread-2] INFO c.taos.example.highvolume.SQLWriter - maxSQLLength=1048576
17:01:01.443 [WriteThread-6] INFO c.taos.example.highvolume.WriteTask - started 18:56:36.041 [WriteThread-3] INFO c.taos.example.highvolume.WriteTask - started
17:01:01.448 [WriteThread-7] INFO c.taos.example.highvolume.WriteTask - started 18:56:36.042 [WriteThread-3] INFO c.taos.example.highvolume.SQLWriter - maxSQLLength=1048576
17:01:01.454 [WriteThread-8] INFO c.taos.example.highvolume.WriteTask - started 18:56:36.093 [WriteThread-4] INFO c.taos.example.highvolume.WriteTask - started
17:01:01.454 [ReadThread-0] INFO com.taos.example.highvolume.ReadTask - started 18:56:36.094 [WriteThread-4] INFO c.taos.example.highvolume.SQLWriter - maxSQLLength=1048576
17:01:11.615 [main] INFO c.t.e.highvolume.FastWriteExample - count=18766442 speed=1876644 18:56:36.099 [WriteThread-5] INFO c.taos.example.highvolume.WriteTask - started
17:01:21.775 [main] INFO c.t.e.highvolume.FastWriteExample - count=38947464 speed=2018102 18:56:36.100 [WriteThread-5] INFO c.taos.example.highvolume.SQLWriter - maxSQLLength=1048576
17:01:32.428 [main] INFO c.t.e.highvolume.FastWriteExample - count=58649571 speed=1970210 18:56:36.100 [WriteThread-6] INFO c.taos.example.highvolume.WriteTask - started
17:01:42.577 [main] INFO c.t.e.highvolume.FastWriteExample - count=79264890 speed=2061531 18:56:36.101 [WriteThread-6] INFO c.taos.example.highvolume.SQLWriter - maxSQLLength=1048576
17:01:53.265 [main] INFO c.t.e.highvolume.FastWriteExample - count=99097476 speed=1983258 18:56:36.103 [WriteThread-7] INFO c.taos.example.highvolume.WriteTask - started
17:02:04.209 [main] INFO c.t.e.highvolume.FastWriteExample - count=119546779 speed=2044930 18:56:36.104 [WriteThread-7] INFO c.taos.example.highvolume.SQLWriter - maxSQLLength=1048576
17:02:14.935 [main] INFO c.t.e.highvolume.FastWriteExample - count=141078914 speed=2153213 18:56:36.105 [WriteThread-8] INFO c.taos.example.highvolume.WriteTask - started
17:02:25.617 [main] INFO c.t.e.highvolume.FastWriteExample - count=162183457 speed=2110454 18:56:36.107 [WriteThread-8] INFO c.taos.example.highvolume.SQLWriter - maxSQLLength=1048576
17:02:36.718 [main] INFO c.t.e.highvolume.FastWriteExample - count=182735614 speed=2055215 18:56:36.108 [WriteThread-9] INFO c.taos.example.highvolume.WriteTask - started
17:02:46.988 [main] INFO c.t.e.highvolume.FastWriteExample - count=202895614 speed=2016000 18:56:36.109 [WriteThread-9] INFO c.taos.example.highvolume.SQLWriter - maxSQLLength=1048576
18:56:36.156 [WriteThread-10] INFO c.taos.example.highvolume.WriteTask - started
18:56:36.157 [WriteThread-11] INFO c.taos.example.highvolume.WriteTask - started
18:56:36.158 [WriteThread-10] INFO c.taos.example.highvolume.SQLWriter - maxSQLLength=1048576
18:56:36.158 [ReadThread-0] INFO com.taos.example.highvolume.ReadTask - started
18:56:36.158 [ReadThread-1] INFO com.taos.example.highvolume.ReadTask - started
18:56:36.158 [WriteThread-11] INFO c.taos.example.highvolume.SQLWriter - maxSQLLength=1048576
18:56:46.369 [main] INFO c.t.e.highvolume.FastWriteExample - count=18554448 speed=1855444
18:56:56.946 [main] INFO c.t.e.highvolume.FastWriteExample - count=39059660 speed=2050521
18:57:07.322 [main] INFO c.t.e.highvolume.FastWriteExample - count=59403604 speed=2034394
18:57:18.032 [main] INFO c.t.e.highvolume.FastWriteExample - count=80262938 speed=2085933
18:57:28.432 [main] INFO c.t.e.highvolume.FastWriteExample - count=101139906 speed=2087696
18:57:38.921 [main] INFO c.t.e.highvolume.FastWriteExample - count=121807202 speed=2066729
18:57:49.375 [main] INFO c.t.e.highvolume.FastWriteExample - count=142952417 speed=2114521
18:58:00.689 [main] INFO c.t.e.highvolume.FastWriteExample - count=163650306 speed=2069788
18:58:11.646 [main] INFO c.t.e.highvolume.FastWriteExample - count=185019808 speed=2136950
``` ```
</details> </details>
## Python 示例程序 {#python-demo} </TabItem>
<TabItem label="Python" value="python">
**程序清单**
Python 示例程序中采用了多进程的架构,并使用了跨进程的消息队列。
该 Python 示例程序中采用了多进程的架构,并使用了跨进程的队列通信。写任务采用拼装 SQL 的方式写入。 | 函数或类 | 功能说明 |
### main 函数 {#python-demo-main} | ------------------------ | -------------------------------------------------------------------- |
| main 函数 | 程序入口, 创建各个子进程和消息队列 |
| run_monitor_process 函数 | 创建数据库,超级表,统计写入速度并定时打印到控制台 |
| run_read_task 函数 | 读进程主要逻辑,负责从其它数据系统读数据,并分发数据到为之分配的队列 |
| MockDataSource 类 | 模拟数据源, 实现迭代器接口,每次批量返回每张表的接下来 1000 条数据 |
| run_write_task 函数 | 写进程主要逻辑。每次从队列中取出尽量多的数据,并批量写入 |
| SQLWriter类 | SQL 写入和自动建表 |
| StmtWriter 类 | 实现参数绑定方式批量写入(暂未完成) |
<details>
<summary>main 函数</summary>
main 函数负责创建消息队列和启动子进程,子进程有 3 类: main 函数负责创建消息队列和启动子进程,子进程有 3 类:
...@@ -202,76 +296,62 @@ main 函数可以接收 5 个启动参数,依次是: ...@@ -202,76 +296,62 @@ main 函数可以接收 5 个启动参数,依次是:
4. 队列大小(单位字节),默认为 1000000 4. 队列大小(单位字节),默认为 1000000
5. 每批最多写入记录数量, 默认为 3000 5. 每批最多写入记录数量, 默认为 3000
<details>
<summary>main 函数</summary>
```python ```python
{{#include docs/examples/python/highvolume_faster_queue.py:main}} {{#include docs/examples/python/fast_write_example.py:main}}
``` ```
</details> </details>
### 监控进程 <details>
<summary>run_monitor_process</summary>
监控进程负责初始化数据库,并监控当前的写入速度。 监控进程负责初始化数据库,并监控当前的写入速度。
<details>
<summary>Monitor Process</summary>
```python ```python
{{#include docs/examples/python/highvolume_faster_queue.py:monitor}} {{#include docs/examples/python/fast_write_example.py:monitor}}
``` ```
</details> </details>
### 读进程 {#python-read-process}
#### 读进程主要逻辑 {#python-run-read-task}
读进程,负责从其它数据系统读数据,并分发数据到各个写进程。
<details> <details>
<summary>run_read_task 函数</summary> <summary>run_read_task 函数</summary>
读进程,负责从其它数据系统读数据,并分发数据到为之分配的队列。
```python ```python
{{#include docs/examples/python/highvolume_faster_queue.py:read}} {{#include docs/examples/python/fast_write_example.py:read}}
``` ```
</details> </details>
#### 模拟数据源 {#python-mock-data-source}
以下是模拟数据源的实现,我们假设数据源生成的每一条数据都带有目标表名信息。实际中你可能需要一定的规则确定目标表名。
<details> <details>
<summary>MockDataSource</summary> <summary>MockDataSource</summary>
以下是模拟数据源的实现,我们假设数据源生成的每一条数据都带有目标表名信息。实际中你可能需要一定的规则确定目标表名。
```python ```python
{{#include docs/examples/python/highvolume_faster_queue.py:MockDataSource}} {{#include docs/examples/python/mockdatasource.py}}
``` ```
</details> </details>
### 写进程 {#python-write-process}
写进程每次从队列中取出尽量多的数据,并批量写入。
<details> <details>
<summary>run_write_task 函数</summary> <summary>run_write_task 函数</summary>
写进程每次从队列中取出尽量多的数据,并批量写入。
```python ```python
{{#include docs/examples/python/highvolume_faster_queue.py:write}} {{#include docs/examples/python/fast_write_example.py:write}}
``` ```
</details>
### SQLWriter 类的实现 {#python-sql-writer} </details>
SQLWriter 类封装了拼 SQL 和写数据的逻辑。所有的表都没有提前创建,而是写入出错的时候,再以超级表为模板批量建表,然后重新执行 INSERT 语句。这个类也对 SQL 是否超过最大长度限制做了检查,如果接近 SQL 最大长度限制(maxSQLLength),将会立即执行 SQL。为了减少 SQL 执行次数,建议将 maxSQLLength 适当调大。
<details> <details>
SQLWriter 类封装了拼 SQL 和写数据的逻辑。所有的表都没有提前创建,而是在发生表不存在错误的时候,再以超级表为模板批量建表,然后重新执行 INSERT 语句。对于其它错误会记录当时执行的 SQL, 以便排查错误和故障恢复。这个类也对 SQL 是否超过最大长度限制做了检查,如果接近 SQL 最大长度限制(maxSQLLength),将会立即执行 SQL。为了减少 SQL 此时,建议将 maxSQLLength 适当调大。
<summary>SQLWriter</summary> <summary>SQLWriter</summary>
```python ```python
...@@ -280,62 +360,73 @@ SQLWriter 类封装了拼 SQL 和写数据的逻辑。所有的表都没有提 ...@@ -280,62 +360,73 @@ SQLWriter 类封装了拼 SQL 和写数据的逻辑。所有的表都没有提
</details> </details>
### 执行示例程序 {#run-python-demo} **执行步骤**
<details> <details>
<summary>执行 Python 示例程序</summary> <summary>执行 Python 示例程序</summary>
1. 前提条件 1. 前提条件
- 已安装 TDengine 客户端驱动 - 已安装 TDengine 客户端驱动
- 已安装 Python3, 推荐版本 >= 3.8 - 已安装 Python3, 推荐版本 >= 3.8
- 已安装 taospy - 已安装 taospy
2. 安装 faster-fifo 代替 python 内置的 multiprocessing.Queue 2. 安装 faster-fifo 代替 python 内置的 multiprocessing.Queue
``` ```
pip3 install faster-fifo pip3 install faster-fifo
``` ```
3. 点击上面的“查看源码”链接复制 `highvolume_faster_queue.py``sql_writer.py`个文件。 3. 点击上面的“查看源码”链接复制 `fast_write_example.py``sql_writer.py``mockdatasource.py`个文件。
4. 执行示例程序 4. 执行示例程序
``` ```
python3 highvolume_faster_queue.py <READ_TASK_COUNT> <WRITE_TASK_COUNT> <TABLE_COUNT> <QUEUE_SIZE> <MAX_BATCH_SIZE> python3 fast_write_example.py <READ_TASK_COUNT> <WRITE_TASK_COUNT> <TABLE_COUNT> <QUEUE_SIZE> <MAX_BATCH_SIZE>
``` ```
下面是一次实际运行的输出: 下面是一次实际运行的输出, 机器配置 16核 + 64G + 固态硬盘。
``` ```
[testuser@vm95 python]$ python3.6 highvolume_faster_queue.py 9 9 1000 5000000 3000 root@vm85$ python3 fast_write_example.py 8 8
2022-07-13 10:05:50,504 [root] - READ_TASK_COUNT=9, WRITE_TASK_COUNT=9, TABLE_COUNT=1000, QUEUE_SIZE=5000000, MAX_BATCH_SIZE=3000 2022-07-14 19:13:45,869 [root] - READ_TASK_COUNT=8, WRITE_TASK_COUNT=8, TABLE_COUNT=1000, QUEUE_SIZE=1000000, MAX_BATCH_SIZE=3000
2022-07-13 10:05:53,542 [root] - WriteTask-0 started with pid 5475 2022-07-14 19:13:48,882 [root] - WriteTask-0 started with pid 718347
2022-07-13 10:05:53,542 [root] - WriteTask-1 started with pid 5476 2022-07-14 19:13:48,883 [root] - WriteTask-1 started with pid 718348
2022-07-13 10:05:53,543 [root] - WriteTask-2 started with pid 5477 2022-07-14 19:13:48,884 [root] - WriteTask-2 started with pid 718349
2022-07-13 10:05:53,543 [root] - WriteTask-3 started with pid 5478 2022-07-14 19:13:48,884 [root] - WriteTask-3 started with pid 718350
2022-07-13 10:05:53,544 [root] - WriteTask-4 started with pid 5479 2022-07-14 19:13:48,885 [root] - WriteTask-4 started with pid 718351
2022-07-13 10:05:53,544 [root] - WriteTask-5 started with pid 5480 2022-07-14 19:13:48,885 [root] - WriteTask-5 started with pid 718352
2022-07-13 10:05:53,545 [root] - WriteTask-6 started with pid 5481 2022-07-14 19:13:48,886 [root] - WriteTask-6 started with pid 718353
2022-07-13 10:05:53,546 [root] - WriteTask-7 started with pid 5482 2022-07-14 19:13:48,886 [root] - WriteTask-7 started with pid 718354
2022-07-13 10:05:53,546 [root] - WriteTask-8 started with pid 5483 2022-07-14 19:13:48,887 [root] - ReadTask-0 started with pid 718355
2022-07-13 10:05:53,547 [root] - ReadTask-0 started with pid 5484 2022-07-14 19:13:48,888 [root] - ReadTask-1 started with pid 718356
2022-07-13 10:05:53,548 [root] - ReadTask-1 started with pid 5485 2022-07-14 19:13:48,889 [root] - ReadTask-2 started with pid 718357
2022-07-13 10:05:53,549 [root] - ReadTask-2 started with pid 5486 2022-07-14 19:13:48,889 [root] - ReadTask-3 started with pid 718358
2022-07-13 10:05:53,550 [root] - ReadTask-3 started with pid 5487 2022-07-14 19:13:48,890 [root] - ReadTask-4 started with pid 718359
2022-07-13 10:05:53,551 [root] - ReadTask-4 started with pid 5488 2022-07-14 19:13:48,891 [root] - ReadTask-5 started with pid 718361
2022-07-13 10:05:53,552 [root] - ReadTask-5 started with pid 5489 2022-07-14 19:13:48,892 [root] - ReadTask-6 started with pid 718364
2022-07-13 10:05:53,552 [root] - ReadTask-6 started with pid 5490 2022-07-14 19:13:48,893 [root] - ReadTask-7 started with pid 718365
2022-07-13 10:05:53,553 [root] - ReadTask-7 started with pid 5491 2022-07-14 19:13:56,042 [DataBaseMonitor] - count=6676310 speed=667631.0
2022-07-13 10:05:53,554 [root] - ReadTask-8 started with pid 5492 2022-07-14 19:14:06,196 [DataBaseMonitor] - count=20004310 speed=1332800.0
2022-07-13 10:06:00,842 [DataBaseMonitor] - count=6612939 speed=661293.9 2022-07-14 19:14:16,366 [DataBaseMonitor] - count=32290310 speed=1228600.0
2022-07-13 10:06:11,151 [DataBaseMonitor] - count=14765739 speed=815280.0 2022-07-14 19:14:26,527 [DataBaseMonitor] - count=44438310 speed=1214800.0
2022-07-13 10:06:21,677 [DataBaseMonitor] - count=23282163 speed=851642.4 2022-07-14 19:14:36,673 [DataBaseMonitor] - count=56608310 speed=1217000.0
2022-07-13 10:06:31,985 [DataBaseMonitor] - count=31673139 speed=839097.6 2022-07-14 19:14:46,834 [DataBaseMonitor] - count=68757310 speed=1214900.0
2022-07-13 10:06:42,343 [DataBaseMonitor] - count=39819439 speed=814630.0 2022-07-14 19:14:57,280 [DataBaseMonitor] - count=80992310 speed=1223500.0
2022-07-13 10:06:52,830 [DataBaseMonitor] - count=48146339 speed=832690.0 2022-07-14 19:15:07,689 [DataBaseMonitor] - count=93805310 speed=1281300.0
2022-07-13 10:07:03,396 [DataBaseMonitor] - count=56385039 speed=823870.0 2022-07-14 19:15:18,020 [DataBaseMonitor] - count=106111310 speed=1230600.0
2022-07-13 10:07:14,341 [DataBaseMonitor] - count=64848739 speed=846370.0 2022-07-14 19:15:28,356 [DataBaseMonitor] - count=118394310 speed=1228300.0
2022-07-13 10:07:24,877 [DataBaseMonitor] - count=73654566 speed=880582.7 2022-07-14 19:15:38,690 [DataBaseMonitor] - count=130742310 speed=1234800.0
``` 2022-07-14 19:15:49,000 [DataBaseMonitor] - count=143051310 speed=1230900.0
2022-07-14 19:15:59,323 [DataBaseMonitor] - count=155276310 speed=1222500.0
2022-07-14 19:16:09,649 [DataBaseMonitor] - count=167603310 speed=1232700.0
2022-07-14 19:16:19,995 [DataBaseMonitor] - count=179976310 speed=1237300.0
```
</details> </details>
</TabItem>
</Tabs>
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册