未验证 提交 1baeab9e 编写于 作者: W wade zhang 提交者: GitHub

Merge pull request #14922 from taosdata/docs/TD-17370

docs: python and java examples
package com.taos.example.highvolume;
import java.sql.*;
/**
* Prepare target database.
* Count total records in database periodically so that we can estimate the writing speed.
*/
public class DataBaseMonitor {
private Connection conn;
private Statement stmt;
public DataBaseMonitor init() throws SQLException {
if (conn == null) {
String jdbcURL = System.getenv("TDENGINE_JDBC_URL");
conn = DriverManager.getConnection(jdbcURL);
stmt = conn.createStatement();
}
return this;
}
public void close() {
try {
stmt.close();
} catch (SQLException e) {
}
try {
conn.close();
} catch (SQLException e) {
}
}
public void prepareDatabase() throws SQLException {
stmt.execute("DROP DATABASE IF EXISTS test");
stmt.execute("CREATE DATABASE test");
stmt.execute("CREATE STABLE test.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (location BINARY(64), groupId INT)");
}
public Long count() throws SQLException {
if (!stmt.isClosed()) {
ResultSet result = stmt.executeQuery("SELECT count(*) from test.meters");
result.next();
return result.getLong(1);
}
return null;
}
}
\ No newline at end of file
...@@ -9,51 +9,7 @@ import java.util.List; ...@@ -9,51 +9,7 @@ import java.util.List;
import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue; import java.util.concurrent.BlockingQueue;
/**
* Prepare target database.
* Count total records in database periodically so that we can estimate the writing speed.
*/
class DataBaseMonitor {
private Connection conn;
private Statement stmt;
public DataBaseMonitor init() throws SQLException {
if (conn == null) {
String jdbcURL = System.getenv("TDENGINE_JDBC_URL");
conn = DriverManager.getConnection(jdbcURL);
stmt = conn.createStatement();
}
return this;
}
public void close() {
try {
stmt.close();
} catch (SQLException e) {
}
try {
conn.close();
} catch (SQLException e) {
}
}
public void prepareDatabase() throws SQLException {
stmt.execute("DROP DATABASE IF EXISTS test");
stmt.execute("CREATE DATABASE test");
stmt.execute("CREATE STABLE test.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (location BINARY(64), groupId INT)");
}
public Long count() throws SQLException {
if (!stmt.isClosed()) {
ResultSet result = stmt.executeQuery("SELECT count(*) from test.meters");
result.next();
return result.getLong(1);
}
return null;
}
}
// ANCHOR: main
public class FastWriteExample { public class FastWriteExample {
final static Logger logger = LoggerFactory.getLogger(FastWriteExample.class); final static Logger logger = LoggerFactory.getLogger(FastWriteExample.class);
...@@ -110,5 +66,4 @@ public class FastWriteExample { ...@@ -110,5 +66,4 @@ public class FastWriteExample {
lastCount = count; lastCount = count;
} }
} }
} }
// ANCHOR_END: main \ No newline at end of file
\ No newline at end of file
package com.taos.example.highvolume;
import java.util.Iterator;
/**
* Generate test data
*/
class MockDataSource implements Iterator {
private String tbNamePrefix;
private int tableCount;
private long maxRowsPerTable = 1000000000L;
// 100 milliseconds between two neighbouring rows.
long startMs = System.currentTimeMillis() - maxRowsPerTable * 100;
private int currentRow = 0;
private int currentTbId = -1;
// mock values
String[] location = {"LosAngeles", "SanDiego", "Hollywood", "Compton", "San Francisco"};
float[] current = {8.8f, 10.7f, 9.9f, 8.9f, 9.4f};
int[] voltage = {119, 116, 111, 113, 118};
float[] phase = {0.32f, 0.34f, 0.33f, 0.329f, 0.141f};
public MockDataSource(String tbNamePrefix, int tableCount) {
this.tbNamePrefix = tbNamePrefix;
this.tableCount = tableCount;
}
@Override
public boolean hasNext() {
currentTbId += 1;
if (currentTbId == tableCount) {
currentTbId = 0;
currentRow += 1;
}
return currentRow < maxRowsPerTable;
}
@Override
public String next() {
long ts = startMs + 100 * currentRow;
int groupId = currentTbId % 5 == 0 ? currentTbId / 5 : currentTbId / 5 + 1;
StringBuilder sb = new StringBuilder(tbNamePrefix + "_" + currentTbId + ","); // tbName
sb.append(ts).append(','); // ts
sb.append(current[currentRow % 5]).append(','); // current
sb.append(voltage[currentRow % 5]).append(','); // voltage
sb.append(phase[currentRow % 5]).append(','); // phase
sb.append(location[currentRow % 5]).append(','); // location
sb.append(groupId); // groupID
return sb.toString();
}
}
\ No newline at end of file
...@@ -7,57 +7,6 @@ import java.util.Iterator; ...@@ -7,57 +7,6 @@ import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.concurrent.BlockingQueue; import java.util.concurrent.BlockingQueue;
/**
* Generate test data
*/
class MockDataSource implements Iterator {
private String tbNamePrefix;
private int tableCount;
private long maxRowsPerTable = 1000000000L;
// 100 milliseconds between two neighbouring rows.
long startMs = System.currentTimeMillis() - maxRowsPerTable * 100;
private int currentRow = 0;
private int currentTbId = -1;
// mock values
String[] location = {"LosAngeles", "SanDiego", "Hollywood", "Compton", "San Francisco"};
float[] current = {8.8f, 10.7f, 9.9f, 8.9f, 9.4f};
int[] voltage = {119, 116, 111, 113, 118};
float[] phase = {0.32f, 0.34f, 0.33f, 0.329f, 0.141f};
public MockDataSource(String tbNamePrefix, int tableCount) {
this.tbNamePrefix = tbNamePrefix;
this.tableCount = tableCount;
}
@Override
public boolean hasNext() {
currentTbId += 1;
if (currentTbId == tableCount) {
currentTbId = 0;
currentRow += 1;
}
return currentRow < maxRowsPerTable;
}
@Override
public String next() {
long ts = startMs + 100 * currentRow;
int groupId = currentTbId % 5 == 0 ? currentTbId / 5 : currentTbId / 5 + 1;
StringBuilder sb = new StringBuilder(tbNamePrefix + "_" + currentTbId + ","); // tbName
sb.append(ts).append(','); // ts
sb.append(current[currentRow % 5]).append(','); // current
sb.append(voltage[currentRow % 5]).append(','); // voltage
sb.append(phase[currentRow % 5]).append(','); // phase
sb.append(location[currentRow % 5]).append(','); // location
sb.append(groupId); // groupID
return sb.toString();
}
}
// ANCHOR: ReadTask
class ReadTask implements Runnable { class ReadTask implements Runnable {
private final static Logger logger = LoggerFactory.getLogger(ReadTask.class); private final static Logger logger = LoggerFactory.getLogger(ReadTask.class);
private final int taskId; private final int taskId;
...@@ -106,6 +55,4 @@ class ReadTask implements Runnable { ...@@ -106,6 +55,4 @@ class ReadTask implements Runnable {
logger.info("stop"); logger.info("stop");
this.active = false; this.active = false;
} }
} }
\ No newline at end of file
// ANCHOR_END: ReadTask
\ No newline at end of file
...@@ -7,8 +7,6 @@ import java.sql.*; ...@@ -7,8 +7,6 @@ import java.sql.*;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
// ANCHOR: SQLWriter
/** /**
* A helper class encapsulate the logic of writing using SQL. * A helper class encapsulate the logic of writing using SQL.
* <p> * <p>
...@@ -154,10 +152,14 @@ public class SQLWriter { ...@@ -154,10 +152,14 @@ public class SQLWriter {
if (errorCode == 0x362 || errorCode == 0x218) { if (errorCode == 0x362 || errorCode == 0x218) {
// Table does not exist // Table does not exist
createTables(); createTables();
stmt.executeUpdate(sql); executeSQL(sql);
} else { } else {
logger.error("Execute SQL: {}", sql);
throw e; throw e;
} }
} catch (Throwable throwable) {
logger.error("Execute SQL: {}", sql);
throw throwable;
} }
} }
...@@ -174,7 +176,12 @@ public class SQLWriter { ...@@ -174,7 +176,12 @@ public class SQLWriter {
sb.append("IF NOT EXISTS ").append(tbName).append(" USING meters TAGS ").append(tagValues).append(" "); sb.append("IF NOT EXISTS ").append(tbName).append(" USING meters TAGS ").append(tagValues).append(" ");
} }
String sql = sb.toString(); String sql = sb.toString();
stmt.executeUpdate(sql); try {
stmt.executeUpdate(sql);
} catch (Throwable throwable) {
logger.error("Execute SQL: {}", sql);
throw throwable;
}
} }
public boolean hasBufferedValues() { public boolean hasBufferedValues() {
...@@ -195,5 +202,4 @@ public class SQLWriter { ...@@ -195,5 +202,4 @@ public class SQLWriter {
} catch (SQLException e) { } catch (SQLException e) {
} }
} }
} }
// ANCHOR_END: SQLWriter \ No newline at end of file
package com.taos.example.highvolume;
public class StmtWriter {
}
...@@ -5,7 +5,6 @@ import org.slf4j.LoggerFactory; ...@@ -5,7 +5,6 @@ import org.slf4j.LoggerFactory;
import java.util.concurrent.BlockingQueue; import java.util.concurrent.BlockingQueue;
// ANCHOR: WriteTask
class WriteTask implements Runnable { class WriteTask implements Runnable {
private final static Logger logger = LoggerFactory.getLogger(WriteTask.class); private final static Logger logger = LoggerFactory.getLogger(WriteTask.class);
private final int maxBatchSize; private final int maxBatchSize;
...@@ -56,5 +55,4 @@ class WriteTask implements Runnable { ...@@ -56,5 +55,4 @@ class WriteTask implements Runnable {
logger.info("stop"); logger.info("stop");
this.active = false; this.active = false;
} }
} }
// ANCHOR_END: WriteTask \ No newline at end of file
\ No newline at end of file
<mxfile host="65bd71144e">
<diagram id="_BjMg4p5x31hL4Gv-87s" name="第 1 页">
<mxGraphModel dx="1259" dy="615" grid="0" gridSize="10" guides="1" tooltips="1" connect="1" arrows="1" fold="1" page="1" pageScale="1" pageWidth="900" pageHeight="900" background="none" math="0" shadow="0">
<root>
<mxCell id="0"/>
<mxCell id="1" parent="0"/>
<mxCell id="13" style="edgeStyle=none;html=1;exitX=1;exitY=0.5;exitDx=0;exitDy=0;fontSize=17;" parent="1" source="7" edge="1">
<mxGeometry relative="1" as="geometry">
<mxPoint x="260" y="150" as="targetPoint"/>
</mxGeometry>
</mxCell>
<mxCell id="7" value="&lt;font style=&quot;font-size: 14px&quot;&gt;Read Task 1&lt;/font&gt;" style="rounded=1;whiteSpace=wrap;html=1;fillColor=#dae8fc;strokeColor=#6c8ebf;fontStyle=1" parent="1" vertex="1">
<mxGeometry x="60" y="160" width="100" height="37" as="geometry"/>
</mxCell>
<mxCell id="18" style="edgeStyle=none;html=1;exitX=1;exitY=0.5;exitDx=0;exitDy=0;fontSize=17;" parent="1" source="8" edge="1">
<mxGeometry relative="1" as="geometry">
<mxPoint x="264" y="292" as="targetPoint"/>
</mxGeometry>
</mxCell>
<mxCell id="8" value="&lt;font style=&quot;font-size: 14px&quot;&gt;&lt;font style=&quot;font-size: 14px&quot;&gt;Read Tas&lt;/font&gt;&lt;font style=&quot;font-size: 14px&quot;&gt;k 2&lt;/font&gt;&lt;/font&gt;" style="rounded=1;whiteSpace=wrap;html=1;fillColor=#dae8fc;strokeColor=#6c8ebf;fontStyle=1" parent="1" vertex="1">
<mxGeometry x="60" y="240" width="100" height="36" as="geometry"/>
</mxCell>
<mxCell id="23" value="" style="edgeStyle=none;html=1;fontSize=14;" parent="1" source="9" target="20" edge="1">
<mxGeometry relative="1" as="geometry"/>
</mxCell>
<mxCell id="9" value="Queue 1" style="rounded=0;whiteSpace=wrap;html=1;fontSize=17;fillColor=#f8cecc;strokeColor=#b85450;" parent="1" vertex="1">
<mxGeometry x="265" y="130" width="245" height="30" as="geometry"/>
</mxCell>
<mxCell id="24" value="" style="edgeStyle=none;html=1;fontSize=14;" parent="1" source="10" target="21" edge="1">
<mxGeometry relative="1" as="geometry"/>
</mxCell>
<mxCell id="10" value="Queue 2" style="rounded=0;whiteSpace=wrap;html=1;fontSize=17;fillColor=#f8cecc;strokeColor=#b85450;" parent="1" vertex="1">
<mxGeometry x="265" y="175" width="245" height="30" as="geometry"/>
</mxCell>
<mxCell id="25" value="" style="edgeStyle=none;html=1;fontSize=14;entryX=0;entryY=0.5;entryDx=0;entryDy=0;" parent="1" source="11" target="28" edge="1">
<mxGeometry relative="1" as="geometry">
<mxPoint x="600" y="284.1428571428571" as="targetPoint"/>
</mxGeometry>
</mxCell>
<mxCell id="11" value="Queue 3" style="rounded=0;whiteSpace=wrap;html=1;fontSize=17;fillColor=#f8cecc;strokeColor=#b85450;" parent="1" vertex="1">
<mxGeometry x="265" y="223" width="245" height="30" as="geometry"/>
</mxCell>
<mxCell id="14" style="edgeStyle=none;html=1;exitX=1;exitY=0.5;exitDx=0;exitDy=0;fontSize=17;entryX=0;entryY=0.5;entryDx=0;entryDy=0;" parent="1" source="7" target="11" edge="1">
<mxGeometry relative="1" as="geometry">
<mxPoint x="270" y="160" as="targetPoint"/>
<mxPoint x="180" y="195" as="sourcePoint"/>
</mxGeometry>
</mxCell>
<mxCell id="17" style="edgeStyle=none;html=1;exitX=1;exitY=0.5;exitDx=0;exitDy=0;entryX=0;entryY=0.75;entryDx=0;entryDy=0;fontSize=17;" parent="1" source="8" target="10" edge="1">
<mxGeometry relative="1" as="geometry">
<mxPoint x="180" y="285" as="sourcePoint"/>
<mxPoint x="275" y="162.5" as="targetPoint"/>
</mxGeometry>
</mxCell>
<mxCell id="20" value="&lt;font style=&quot;font-size: 14px&quot;&gt;&lt;font style=&quot;font-size: 14px&quot;&gt;Write Tas&lt;/font&gt;&lt;font style=&quot;font-size: 14px&quot;&gt;k 1&lt;/font&gt;&lt;/font&gt;" style="rounded=1;whiteSpace=wrap;html=1;fillColor=#dae8fc;strokeColor=#6c8ebf;fontStyle=1" parent="1" vertex="1">
<mxGeometry x="588" y="129" width="100" height="31" as="geometry"/>
</mxCell>
<mxCell id="21" value="&lt;font style=&quot;font-size: 14px&quot;&gt;&lt;font style=&quot;font-size: 14px&quot;&gt;Write Tas&lt;/font&gt;&lt;font style=&quot;font-size: 14px&quot;&gt;k 2&lt;/font&gt;&lt;/font&gt;" style="rounded=1;whiteSpace=wrap;html=1;fillColor=#dae8fc;strokeColor=#6c8ebf;fontStyle=1" parent="1" vertex="1">
<mxGeometry x="588" y="175" width="100" height="33" as="geometry"/>
</mxCell>
<mxCell id="30" style="edgeStyle=none;html=1;exitX=1;exitY=0.5;exitDx=0;exitDy=0;entryX=0;entryY=0.5;entryDx=0;entryDy=0;" parent="1" source="26" target="29" edge="1">
<mxGeometry relative="1" as="geometry"/>
</mxCell>
<mxCell id="26" value="Queue 4" style="rounded=0;whiteSpace=wrap;html=1;fontSize=17;fillColor=#f8cecc;strokeColor=#b85450;" parent="1" vertex="1">
<mxGeometry x="265" y="271" width="245" height="29" as="geometry"/>
</mxCell>
<mxCell id="28" value="&lt;font style=&quot;font-size: 14px&quot;&gt;&lt;font style=&quot;font-size: 14px&quot;&gt;Write Tas&lt;/font&gt;&lt;font style=&quot;font-size: 14px&quot;&gt;k 3&lt;/font&gt;&lt;/font&gt;" style="rounded=1;whiteSpace=wrap;html=1;fillColor=#dae8fc;strokeColor=#6c8ebf;fontStyle=1" parent="1" vertex="1">
<mxGeometry x="588" y="221.5" width="100" height="33" as="geometry"/>
</mxCell>
<mxCell id="29" value="&lt;font style=&quot;font-size: 14px&quot;&gt;&lt;font style=&quot;font-size: 14px&quot;&gt;Write Tas&lt;/font&gt;&lt;font style=&quot;font-size: 14px&quot;&gt;k 4&lt;/font&gt;&lt;/font&gt;" style="rounded=1;whiteSpace=wrap;html=1;fillColor=#dae8fc;strokeColor=#6c8ebf;fontStyle=1" parent="1" vertex="1">
<mxGeometry x="588" y="271" width="100" height="33" as="geometry"/>
</mxCell>
</root>
</mxGraphModel>
</diagram>
</mxfile>
\ No newline at end of file
...@@ -4,11 +4,13 @@ ...@@ -4,11 +4,13 @@
# #
import logging import logging
import math
import sys import sys
import time import time
import os import os
from multiprocessing import Process from multiprocessing import Process
from faster_fifo import Queue from faster_fifo import Queue
from mockdatasource import MockDataSource
from queue import Empty from queue import Empty
from typing import List from typing import List
...@@ -40,57 +42,18 @@ def get_connection(): ...@@ -40,57 +42,18 @@ def get_connection():
return taos.connect(host=host, port=int(port), user=user, password=password) return taos.connect(host=host, port=int(port), user=user, password=password)
# ANCHOR: MockDataSource
class MockDataSource:
location = ["LosAngeles", "SanDiego", "Hollywood", "Compton", "San Francisco"]
current = [8.8, 10.7, 9.9, 8.9, 9.4]
voltage = [119, 116, 111, 113, 118]
phase = [0.32, 0.34, 0.33, 0.329, 0.141]
max_rows_per_table = 10 ** 9
def __init__(self, tb_name_prefix, table_count):
self.table_name_prefix = tb_name_prefix
self.table_count = table_count
self.start_ms = round(time.time() * 1000) - self.max_rows_per_table * 100
def __iter__(self):
self.row = 0
self.table_id = -1
return self
def __next__(self):
"""
next 100 rows of current table
"""
self.table_id += 1
if self.table_id == self.table_count:
self.table_id = 0
if self.row >= self.max_rows_per_table:
raise StopIteration
rows = []
while len(rows) < 100:
self.row += 1
ts = self.start_ms + 100 * self.row
group_id = self.table_id % 5 if self.table_id % 5 == 0 else self.table_id % 5 + 1
tb_name = self.table_name_prefix + '_' + str(self.table_id)
ri = self.row % 5
rows.append(f"{tb_name},{ts},{self.current[ri]},{self.voltage[ri]},{self.phase[ri]},{self.location[ri]},{group_id}")
return self.table_id, rows
# ANCHOR_END: MockDataSource
# ANCHOR: read # ANCHOR: read
def run_read_task(task_id: int, task_queues: List[Queue]): def run_read_task(task_id: int, task_queues: List[Queue]):
table_count_per_task = TABLE_COUNT // READ_TASK_COUNT table_count_per_task = TABLE_COUNT // READ_TASK_COUNT
data_source = MockDataSource(f"tb{task_id}", table_count_per_task) data_source = MockDataSource(f"tb{task_id}", table_count_per_task)
try: try:
for table_id, rows in data_source: for batch in data_source:
# hash data to different queue for table_id, rows in batch:
i = table_id % len(task_queues) # hash data to different queue
# block putting forever when the queue is full i = table_id % len(task_queues)
task_queues[i].put_many(rows, block=True, timeout=-1) # block putting forever when the queue is full
task_queues[i].put_many(rows, block=True, timeout=-1)
except KeyboardInterrupt: except KeyboardInterrupt:
pass pass
...@@ -141,7 +104,6 @@ def set_global_config(): ...@@ -141,7 +104,6 @@ def set_global_config():
# ANCHOR: monitor # ANCHOR: monitor
def run_monitor_process(): def run_monitor_process():
import taos
log = logging.getLogger("DataBaseMonitor") log = logging.getLogger("DataBaseMonitor")
conn = get_connection() conn = get_connection()
conn.execute("DROP DATABASE IF EXISTS test") conn.execute("DROP DATABASE IF EXISTS test")
...@@ -178,15 +140,18 @@ def main(): ...@@ -178,15 +140,18 @@ def main():
for i in range(WRITE_TASK_COUNT): for i in range(WRITE_TASK_COUNT):
queue = Queue(max_size_bytes=QUEUE_SIZE) queue = Queue(max_size_bytes=QUEUE_SIZE)
task_queues.append(queue) task_queues.append(queue)
# create write processes # create write processes
for i in range(WRITE_TASK_COUNT): for i in range(WRITE_TASK_COUNT):
p = Process(target=run_write_task, args=(i, task_queues[i])) p = Process(target=run_write_task, args=(i, task_queues[i]))
p.start() p.start()
logging.debug(f"WriteTask-{i} started with pid {p.pid}") logging.debug(f"WriteTask-{i} started with pid {p.pid}")
write_processes.append(p) write_processes.append(p)
# create read processes # create read processes
for i in range(READ_TASK_COUNT): for i in range(READ_TASK_COUNT):
p = Process(target=run_read_task, args=(i, task_queues)) queues = assign_queues(i, task_queues)
p = Process(target=run_read_task, args=(i, queues))
p.start() p.start()
logging.debug(f"ReadTask-{i} started with pid {p.pid}") logging.debug(f"ReadTask-{i} started with pid {p.pid}")
read_processes.append(p) read_processes.append(p)
...@@ -200,6 +165,16 @@ def main(): ...@@ -200,6 +165,16 @@ def main():
[q.close() for q in task_queues] [q.close() for q in task_queues]
def assign_queues(read_task_id, task_queues):
"""
Compute target queues for a specific read task.
"""
ratio = WRITE_TASK_COUNT / READ_TASK_COUNT
from_index = math.floor(read_task_id * ratio)
end_index = math.ceil((read_task_id + 1) * ratio)
return task_queues[from_index:end_index]
if __name__ == '__main__': if __name__ == '__main__':
main() main()
# ANCHOR_END: main # ANCHOR_END: main
import time
class MockDataSource:
samples = [
"8.8,119,0.32,LosAngeles,0",
"10.7,116,0.34,SanDiego,1",
"9.9,111,0.33,Hollywood,2",
"8.9,113,0.329,Compton,3",
"9.4,118,0.141,San Francisco,4"
]
def __init__(self, tb_name_prefix, table_count):
self.table_name_prefix = tb_name_prefix + "_"
self.table_count = table_count
self.max_rows = 10000000
self.current_ts = round(time.time() * 1000) - self.max_rows * 100
# [(tableId, tableName, values),]
self.data = self._init_data()
def _init_data(self):
lines = self.samples * (self.table_count // 5 + 1)
data = []
for i in range(self.table_count):
table_name = self.table_name_prefix + str(i)
data.append((i, table_name, lines[i])) # tableId, row
return data
def __iter__(self):
self.row = 0
return self
def __next__(self):
"""
next 1000 rows for each table.
return: {tableId:[row,...]}
"""
# generate 1000 timestamps
ts = []
for _ in range(1000):
self.current_ts += 100
ts.append(str(self.current_ts))
# add timestamp to each row
# [(tableId, ["tableName,ts,current,voltage,phase,location,groupId"])]
result = []
for table_id, table_name, values in self.data:
rows = [table_name + ',' + t + ',' + values for t in ts]
result.append((table_id, rows))
return result
...@@ -18,6 +18,7 @@ class SQLWriter: ...@@ -18,6 +18,7 @@ class SQLWriter:
name = r[0] name = r[0]
if name == "maxSQLLength": if name == "maxSQLLength":
return int(r[1]) return int(r[1])
return 1024 * 1024
def process_lines(self, lines: str): def process_lines(self, lines: str):
""" """
...@@ -71,11 +72,19 @@ class SQLWriter: ...@@ -71,11 +72,19 @@ class SQLWriter:
if error_code == 0x362 or error_code == 0x218: if error_code == 0x362 or error_code == 0x218:
self.create_tables() self.create_tables()
else: else:
self.log.error("Execute SQL: %s", sql)
raise e raise e
except BaseException as baseException:
self.log.error("Execute SQL: %s", sql)
raise baseException
def create_tables(self): def create_tables(self):
sql = "CREATE TABLE " sql = "CREATE TABLE "
for tb in self._tb_values.keys(): for tb in self._tb_values.keys():
tag_values = self._tb_tags[tb] tag_values = self._tb_tags[tb]
sql += "IF NOT EXISTS " + tb + " USING meters TAGS " + tag_values + " " sql += "IF NOT EXISTS " + tb + " USING meters TAGS " + tag_values + " "
self._conn.execute(sql) try:
self._conn.execute(sql)
except BaseException as e:
self.log.error("Execute SQL: %s", sql)
raise e
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册