diff --git a/docs/en/07-develop/03-insert-data/05-high-volume.md b/docs/en/07-develop/03-insert-data/05-high-volume.md
new file mode 100644
index 0000000000000000000000000000000000000000..1a4813f74e680905206b5bdd8fe37cd4eca2b0be
--- /dev/null
+++ b/docs/en/07-develop/03-insert-data/05-high-volume.md
@@ -0,0 +1,444 @@
+---
+sidebar_label: High Performance Writing
+title: High Performance Writing
+---
+
+import Tabs from "@theme/Tabs";
+import TabItem from "@theme/TabItem";
+
+This chapter introduces how to write data into TDengine with high throughput.
+
+## How to achieve high performance data writing
+
+To achieve high performance writing, there are a few aspects to consider. The following sections describe the important factors in achieving high performance writing.
+
+### Application Program
+
+From the perspective of the application program, you need to consider:
+
+1. The data size of each single write, also known as batch size. Generally speaking, a larger batch size gives better write performance. However, once the batch size exceeds a certain value, you will not get any additional benefit. When using SQL to write into TDengine, it's better to put as much data as possible in a single SQL statement. The maximum SQL length supported by TDengine is 1,048,576 bytes, i.e. 1 MB. It can be configured by the parameter `maxSQLLength` on the client side, and the default value is 65,480.
+
+2. The number of concurrent connections. Normally, more connections give better results. However, once the number of connections exceeds the processing capability of the server side, the performance may degrade.
+
+3. The distribution of the data to be written across tables or sub-tables. Writing to a single table in one batch is more efficient than writing to multiple tables in one batch.
+
+4. Data writing protocol.
+   - Parameter binding mode is more efficient than SQL because it avoids the cost of parsing SQL.
+   - Writing to known existing tables is more efficient than writing to uncertain tables in automatic table creation mode, because the latter needs to check whether the table exists before actually writing data into it.
+   - Writing in SQL is more efficient than writing in schemaless mode, because schemaless writing creates tables automatically and may alter table schemas.
+
+Application programs need to take care of the above factors and try to take advantage of them. The application program should write to a single table in each write batch. The batch size needs to be tuned to a proper value on a specific system. The number of concurrent connections also needs to be tuned to a proper value to achieve the best write throughput.
+
+### Data Source
+
+Application programs need to read data from a data source and then write it into TDengine. If you meet one or more of the situations below, you need to set up message queues between the threads reading from the data source and the threads writing into TDengine.
+
+1. There are multiple data sources, and each data source generates data much more slowly than a single writing thread can write. In this case, the purpose of message queues is to consolidate the data from multiple data sources together to increase the batch size of a single write.
+2. A single data source generates data much faster than a single writing thread can write. The purpose of the message queue in this case is to provide a buffer so that data is not lost and multiple writing threads can get data from the buffer.
+3. The data for a single table comes from multiple data sources.
In this case, the purpose of message queues is to combine the data for a single table together to improve the write efficiency.
+
+If the data source is Kafka, then the application program is a consumer of Kafka, and you can benefit from some Kafka features to achieve high performance writing:
+
+1. Put the data for a table in a single partition of a single topic so that it's easier to put the data for each table together and write in batches.
+2. Subscribe to multiple topics to accumulate data together.
+3. Add more consumers to gain more concurrency and throughput.
+4. Increase the size of a single fetch to increase the size of a write batch.
+
+### Tune TDengine
+
+TDengine is a distributed, high-performance time-series database, and there are also some ways to tune TDengine to get better write performance.
+
+1. Set a proper number of `vgroups` according to the available CPU cores. Normally, we recommend 2 \* number_of_cores as a starting point. If the verification result shows this is not enough to utilize CPU resources, you can use a higher value.
+2. Set proper `minTablesPerVnode`, `tableIncStepPerVnode`, and `maxVgroupsPerDb` according to the number of tables so that tables are distributed evenly across vgroups. The purpose is to balance the workload among all vnodes so that system resources can be utilized better to get higher performance.
+
+For more performance tuning tips, please refer to [Performance Optimization](../../../operation/optimize) and [Configuration Parameters](../../../reference/config).
+
+## Sample Programs
+
+This section introduces sample programs that demonstrate how to write into TDengine with high performance.
+
+### Scenario
+
+Below is the scenario for the sample programs of high performance writing.
+
+- The application program reads data from a data source; the sample program simulates a data source by generating data.
+- The speed of a single writing thread is much slower than the speed of generating data, so the program starts multiple writing threads. Each thread establishes a connection to TDengine and each thread has a message queue of fixed size.
+- The application program maps the received data to different writing threads based on the table name, to make sure all the data for each table is always processed by a specific writing thread.
+- Each writing thread writes the received data into TDengine once its message queue becomes empty or the amount of data read reaches a predefined threshold.
+
+![Thread Model of High Performance Writing into TDengine](highvolume.webp)
+
+### Sample Programs
+
+The sample programs listed in this section are based on the scenario described previously. If your scenario is different, please try to adjust the code based on the principles described in this chapter.
+
+The sample programs assume the source data belongs to different sub-tables of the same super table (meters). The super table has been created before the sample program starts writing data. Sub-tables are created automatically according to the received data. If there are multiple super tables in your case, please adjust the part of the code that creates tables automatically.
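+
+To make the key idea concrete before walking through the sample programs, the minimal sketch below (illustrative Python; the helper name, sub-table name, and sample rows are not taken from the sample code) shows how rows belonging to one sub-table can be combined into a single multi-row INSERT statement, letting TDengine create the sub-table from the super table automatically if it does not exist yet:
+
+```python
+def compose_batch_insert(table, value_groups, location, group_id):
+    # value_groups: list of "(ts,current,voltage,phase)" strings for one sub-table
+    values = " ".join(value_groups)
+    # "USING meters TAGS (...)" lets TDengine create the sub-table automatically if needed
+    return (f"INSERT INTO {table} USING meters TAGS ('{location}',{group_id}) "
+            f"VALUES {values}")
+
+sql = compose_batch_insert(
+    "d1001",
+    ["(1648432611249,10.3,219,0.31)", "(1648432611749,12.6,218,0.33)"],
+    "California.SanFrancisco", 2)
+# conn.execute(sql)  # one round trip writes the whole batch for this sub-table
+```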
+ + + +
+**Program Inventory**
+
+| Class            | Description                                                                                                 |
+| ---------------- | ----------------------------------------------------------------------------------------------------------- |
+| FastWriteExample | Main program                                                                                                |
+| ReadTask         | Reads data from the simulated data source and puts it into a queue according to the hash of the table name |
+| WriteTask        | Reads data from a queue, composes a write batch, and writes it into TDengine                                |
+| MockDataSource   | Generates data for some sub-tables of the super table meters                                                |
+| SQLWriter        | WriteTask uses this class to compose SQL, create tables automatically, check SQL length, and write data     |
+| StmtWriter       | Writes in parameter binding mode (not finished yet)                                                         |
+| DataBaseMonitor  | Calculates the write speed and outputs it to the console every 10 seconds                                   |
+
+Below are the complete code and more detailed descriptions of the classes in the above table.
+
+FastWriteExample
+The main program is responsible for:
+
+1. Creating message queues
+2. Starting writing threads
+3. Starting reading threads
+4. Outputting the write speed every 10 seconds
+
+The main program provides 4 parameters for tuning:
+
+1. The number of reading threads, default value is 1
+2. The number of writing threads, default value is 2
+3. The total number of tables in the generated data, default value is 1000. These tables are distributed evenly across all writing threads. If the number of tables is very large, it takes a long time to create them at the beginning.
+4. The batch size of a single write, default value is 3,000
+
+The capacity of the message queue also impacts performance and can be tuned by modifying the program. Normally it's better to have a larger message queue. A larger message queue means a lower possibility of being blocked when enqueueing and higher throughput, but it also consumes more memory. The default value used in the sample programs is already big enough.
+
+```java
+{{#include docs/examples/java/src/main/java/com/taos/example/highvolume/FastWriteExample.java}}
+```
+ +
+ReadTask
+
+ReadTask reads data from the data source. Each ReadTask is associated with a simulated data source. Each data source generates data for a group of specific tables, and the data of any table is only generated from a single specific data source.
+
+ReadTask puts data into the message queue in blocking mode. That means the put operation is blocked if the message queue is full.
+
+```java
+{{#include docs/examples/java/src/main/java/com/taos/example/highvolume/ReadTask.java}}
+```
+ +
+WriteTask + +```java +{{#include docs/examples/java/src/main/java/com/taos/example/highvolume/WriteTask.java}} +``` + +
+ +
+ +MockDataSource + +```java +{{#include docs/examples/java/src/main/java/com/taos/example/highvolume/MockDataSource.java}} +``` + +
+ +
+
+SQLWriter
+
+The SQLWriter class encapsulates the logic of composing SQL and writing data. Note that the tables have not been created before writing; they are created automatically when a "table does not exist" exception is caught. For other exceptions, the SQL statement that caused the exception is logged for you to debug.
+
+```java
+{{#include docs/examples/java/src/main/java/com/taos/example/highvolume/SQLWriter.java}}
+```
+ +
+ +DataBaseMonitor + +```java +{{#include docs/examples/java/src/main/java/com/taos/example/highvolume/DataBaseMonitor.java}} +``` + +
+ +**Steps to Launch** + +
+Launch Java Sample Program
+
+You need to set the environment variable `TDENGINE_JDBC_URL` before launching the program. If TDengine Server is set up on localhost, then the default user name, password, and port can be used, like below:
+
+```
+TDENGINE_JDBC_URL="jdbc:TAOS://localhost:6030?user=root&password=taosdata"
+```
+
+**Launch in IDE**
+
+1. Clone the TDengine repository
+   ```
+   git clone git@github.com:taosdata/TDengine.git --depth 1
+   ```
+2. Use the IDE to open the `docs/examples/java` directory
+3. Configure the environment variable `TDENGINE_JDBC_URL`. If it was already configured globally before launching the IDE, you can skip this step.
+4. Run the class `com.taos.example.highvolume.FastWriteExample`
+
+**Launch on server**
+
+If you want to launch the sample program on a remote server, please follow the steps below:
+
+1. Package the sample programs. Execute the command below in the directory `TDengine/docs/examples/java`:
+   ```
+   mvn package
+   ```
+2. Create an `examples/java` directory on the server
+   ```
+   mkdir -p examples/java
+   ```
+3. Copy dependencies (the commands below assume you are working on a local Windows host and launching on a remote Linux host)
+   - Copy the dependent packages
+     ```
+     scp -r .\target\lib @:~/examples/java
+     ```
+   - Copy the jar of the sample programs
+     ```
+     scp -r .\target\javaexample-1.0.jar @:~/examples/java
+     ```
+4. Configure the environment variable
+   Edit `~/.bash_profile` or `~/.bashrc` and add the following:
+
+   ```
+   export TDENGINE_JDBC_URL="jdbc:TAOS://localhost:6030?user=root&password=taosdata"
+   ```
+
+   If your TDengine server is not deployed on localhost or doesn't use the default port, you need to change the above URL to the correct value for your environment.
+
+5. Launch the sample program
+
+   ```
+   java -classpath lib/*:javaexample-1.0.jar com.taos.example.highvolume.FastWriteExample
+   ```
+
+6. The sample program doesn't exit unless you press CTRL + C to terminate it.
+   Below is the output of running on a server with 16 cores, 64 GB memory, and an SSD hard disk.
+ + ``` + root@vm85$ java -classpath lib/*:javaexample-1.0.jar com.taos.example.highvolume.FastWriteExample 2 12 + 18:56:35.896 [main] INFO c.t.e.highvolume.FastWriteExample - readTaskCount=2, writeTaskCount=12 tableCount=1000 maxBatchSize=3000 + 18:56:36.011 [WriteThread-0] INFO c.taos.example.highvolume.WriteTask - started + 18:56:36.015 [WriteThread-0] INFO c.taos.example.highvolume.SQLWriter - maxSQLLength=1048576 + 18:56:36.021 [WriteThread-1] INFO c.taos.example.highvolume.WriteTask - started + 18:56:36.022 [WriteThread-1] INFO c.taos.example.highvolume.SQLWriter - maxSQLLength=1048576 + 18:56:36.031 [WriteThread-2] INFO c.taos.example.highvolume.WriteTask - started + 18:56:36.032 [WriteThread-2] INFO c.taos.example.highvolume.SQLWriter - maxSQLLength=1048576 + 18:56:36.041 [WriteThread-3] INFO c.taos.example.highvolume.WriteTask - started + 18:56:36.042 [WriteThread-3] INFO c.taos.example.highvolume.SQLWriter - maxSQLLength=1048576 + 18:56:36.093 [WriteThread-4] INFO c.taos.example.highvolume.WriteTask - started + 18:56:36.094 [WriteThread-4] INFO c.taos.example.highvolume.SQLWriter - maxSQLLength=1048576 + 18:56:36.099 [WriteThread-5] INFO c.taos.example.highvolume.WriteTask - started + 18:56:36.100 [WriteThread-5] INFO c.taos.example.highvolume.SQLWriter - maxSQLLength=1048576 + 18:56:36.100 [WriteThread-6] INFO c.taos.example.highvolume.WriteTask - started + 18:56:36.101 [WriteThread-6] INFO c.taos.example.highvolume.SQLWriter - maxSQLLength=1048576 + 18:56:36.103 [WriteThread-7] INFO c.taos.example.highvolume.WriteTask - started + 18:56:36.104 [WriteThread-7] INFO c.taos.example.highvolume.SQLWriter - maxSQLLength=1048576 + 18:56:36.105 [WriteThread-8] INFO c.taos.example.highvolume.WriteTask - started + 18:56:36.107 [WriteThread-8] INFO c.taos.example.highvolume.SQLWriter - maxSQLLength=1048576 + 18:56:36.108 [WriteThread-9] INFO c.taos.example.highvolume.WriteTask - started + 18:56:36.109 [WriteThread-9] INFO c.taos.example.highvolume.SQLWriter - maxSQLLength=1048576 + 18:56:36.156 [WriteThread-10] INFO c.taos.example.highvolume.WriteTask - started + 18:56:36.157 [WriteThread-11] INFO c.taos.example.highvolume.WriteTask - started + 18:56:36.158 [WriteThread-10] INFO c.taos.example.highvolume.SQLWriter - maxSQLLength=1048576 + 18:56:36.158 [ReadThread-0] INFO com.taos.example.highvolume.ReadTask - started + 18:56:36.158 [ReadThread-1] INFO com.taos.example.highvolume.ReadTask - started + 18:56:36.158 [WriteThread-11] INFO c.taos.example.highvolume.SQLWriter - maxSQLLength=1048576 + 18:56:46.369 [main] INFO c.t.e.highvolume.FastWriteExample - count=18554448 speed=1855444 + 18:56:56.946 [main] INFO c.t.e.highvolume.FastWriteExample - count=39059660 speed=2050521 + 18:57:07.322 [main] INFO c.t.e.highvolume.FastWriteExample - count=59403604 speed=2034394 + 18:57:18.032 [main] INFO c.t.e.highvolume.FastWriteExample - count=80262938 speed=2085933 + 18:57:28.432 [main] INFO c.t.e.highvolume.FastWriteExample - count=101139906 speed=2087696 + 18:57:38.921 [main] INFO c.t.e.highvolume.FastWriteExample - count=121807202 speed=2066729 + 18:57:49.375 [main] INFO c.t.e.highvolume.FastWriteExample - count=142952417 speed=2114521 + 18:58:00.689 [main] INFO c.t.e.highvolume.FastWriteExample - count=163650306 speed=2069788 + 18:58:11.646 [main] INFO c.t.e.highvolume.FastWriteExample - count=185019808 speed=2136950 + ``` + +
+ +
+ +
+**Program Inventory**
+
+The Python sample program uses multiple processes and cross-process message queues.
+
+| Function/Class               | Description                                                                                      |
+| ---------------------------- | ------------------------------------------------------------------------------------------------ |
+| main function                | Program entry point; creates child processes and message queues                                 |
+| run_monitor_process function | Creates the database and super table, calculates the write speed, and outputs it to the console |
+| run_read_task function       | Reads data and distributes it to the message queues                                             |
+| MockDataSource class         | Simulates the data source, returning the next 1,000 rows of each table                          |
+| run_write_task function      | Reads as much data as possible from the message queue and writes it in a batch                  |
+| SQLWriter class              | Writes in SQL and creates tables automatically                                                  |
+| StmtWriter class             | Writes in parameter binding mode (not finished yet)                                             |
+
+main function
+
+The `main` function is responsible for creating message queues and forking child processes. There are 3 kinds of child processes:
+
+1. Monitoring process, which initializes the database and calculates the write speed
+2. Reading processes (n), which read data from the data source
+3. Writing processes (m), which write data into TDengine
+
+The `main` function accepts 5 parameters:
+
+1. The number of reading tasks, default value is 1
+2. The number of writing tasks, default value is 1
+3. The number of tables, default value is 1,000
+4. The capacity of each message queue, default value is 1,000,000 bytes
+5. The maximum batch size of a single write, default value is 3,000
+
+```python
+{{#include docs/examples/python/fast_write_example.py:main}}
+```
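+
+As a worked example of how `assign_queues` splits the queues between reading tasks (a quick check you can run standalone; the counts are illustrative), with 2 read tasks and 8 write tasks each read task gets a contiguous slice of 4 queues:
+
+```python
+import math
+
+WRITE_TASK_COUNT, READ_TASK_COUNT = 8, 2
+ratio = WRITE_TASK_COUNT / READ_TASK_COUNT
+for read_task_id in range(READ_TASK_COUNT):
+    lo = math.floor(read_task_id * ratio)
+    hi = math.ceil((read_task_id + 1) * ratio)
+    print(read_task_id, list(range(lo, hi)))  # 0 -> [0, 1, 2, 3], 1 -> [4, 5, 6, 7]
+```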
+ +
+run_monitor_process
+
+The monitoring process initializes the database and monitors the current write speed.
+
+```python
+{{#include docs/examples/python/fast_write_example.py:monitor}}
+```
+ +
+
+run_read_task function
+
+The reading process reads data from other data systems and distributes it to the message queues assigned to it.
+
+```python
+{{#include docs/examples/python/fast_write_example.py:read}}
+```
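+
+The essential rule in the read task is that all rows of one table are always routed to the same queue, and therefore to the same write process. The sample uses an integer table id for this; a minimal sketch of the same idea for sources that only expose table names (illustrative, not part of the sample code) could be:
+
+```python
+import zlib
+
+def queue_index(table_name: str, queue_count: int) -> int:
+    # deterministic hash: all rows of one table always map to the same queue
+    return zlib.crc32(table_name.encode()) % queue_count
+```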
+ +
+
+MockDataSource
+
+Below is the simulated data source. We assume that each row generated by the data source already carries the target table name.
+
+```python
+{{#include docs/examples/python/mockdatasource.py}}
+```
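+
+For reference, a small usage sketch (assuming `mockdatasource.py` is on the Python path; the prefix and counts are arbitrary) shows the row format the mock source produces:
+
+```python
+from mockdatasource import MockDataSource
+
+ds = MockDataSource("tb", 2)  # generates data for tables tb_0 and tb_1
+batch = next(iter(ds))        # [(table_id, [rows...]), ...], 1,000 rows per table
+print(batch[0][1][0])         # e.g. "tb_0,<ts>,8.8,119,0.32,LosAngeles,0"
+```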
+ +
+run_write_task function
+
+The writing process reads as much data as possible from the message queue and writes it in a batch.
+
+```python
+{{#include docs/examples/python/fast_write_example.py:write}}
+```
+ +
+
+The SQLWriter class encapsulates the logic of composing SQL and writing data. Note that the tables have not been created before writing; they are created automatically when a "table does not exist" exception is caught. For other exceptions, the SQL statement that caused the exception is logged for you to debug. This class also checks the SQL length: if the composed SQL gets close to `maxSQLLength`, it is executed immediately. To improve writing efficiency, it's better to increase `maxSQLLength` properly.
+
+SQLWriter
+
+```python
+{{#include docs/examples/python/sql_writer.py}}
+```
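+
+A hypothetical usage sketch (assuming the `test` database and `meters` super table already exist, and that `sql_writer.py` and `fast_write_example.py` together with their dependencies are importable):
+
+```python
+from sql_writer import SQLWriter
+from fast_write_example import get_connection
+
+writer = SQLWriter(get_connection)
+# each line: tableName,ts,current,voltage,phase,location,groupId
+writer.process_lines([
+    "tb_0,1648432611249,10.3,219,0.31,LosAngeles,0",
+    "tb_0,1648432611349,12.6,218,0.33,LosAngeles,0",
+])
+```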
+ +**Steps to Launch** + +
+ +Launch Sample Program in Python + +1. Prerequisities + + - TDengine client driver has been installed + - Python3 has been installed, the the version >= 3.8 + - TDengine Python connector `taospy` has been installed + +2. Install faster-fifo to replace python builtin multiprocessing.Queue + + ``` + pip3 install faster-fifo + ``` + +3. Click the "Copy" in the above sample programs to copy `fast_write_example.py` 、 `sql_writer.py` and `mockdatasource.py`. + +4. Execute the program + + ``` + python3 fast_write_example.py + ``` + + Below is the output of running on a server of 16 cores, 64GB memory and SSD hard disk. + + ``` + root@vm85$ python3 fast_write_example.py 8 8 + 2022-07-14 19:13:45,869 [root] - READ_TASK_COUNT=8, WRITE_TASK_COUNT=8, TABLE_COUNT=1000, QUEUE_SIZE=1000000, MAX_BATCH_SIZE=3000 + 2022-07-14 19:13:48,882 [root] - WriteTask-0 started with pid 718347 + 2022-07-14 19:13:48,883 [root] - WriteTask-1 started with pid 718348 + 2022-07-14 19:13:48,884 [root] - WriteTask-2 started with pid 718349 + 2022-07-14 19:13:48,884 [root] - WriteTask-3 started with pid 718350 + 2022-07-14 19:13:48,885 [root] - WriteTask-4 started with pid 718351 + 2022-07-14 19:13:48,885 [root] - WriteTask-5 started with pid 718352 + 2022-07-14 19:13:48,886 [root] - WriteTask-6 started with pid 718353 + 2022-07-14 19:13:48,886 [root] - WriteTask-7 started with pid 718354 + 2022-07-14 19:13:48,887 [root] - ReadTask-0 started with pid 718355 + 2022-07-14 19:13:48,888 [root] - ReadTask-1 started with pid 718356 + 2022-07-14 19:13:48,889 [root] - ReadTask-2 started with pid 718357 + 2022-07-14 19:13:48,889 [root] - ReadTask-3 started with pid 718358 + 2022-07-14 19:13:48,890 [root] - ReadTask-4 started with pid 718359 + 2022-07-14 19:13:48,891 [root] - ReadTask-5 started with pid 718361 + 2022-07-14 19:13:48,892 [root] - ReadTask-6 started with pid 718364 + 2022-07-14 19:13:48,893 [root] - ReadTask-7 started with pid 718365 + 2022-07-14 19:13:56,042 [DataBaseMonitor] - count=6676310 speed=667631.0 + 2022-07-14 19:14:06,196 [DataBaseMonitor] - count=20004310 speed=1332800.0 + 2022-07-14 19:14:16,366 [DataBaseMonitor] - count=32290310 speed=1228600.0 + 2022-07-14 19:14:26,527 [DataBaseMonitor] - count=44438310 speed=1214800.0 + 2022-07-14 19:14:36,673 [DataBaseMonitor] - count=56608310 speed=1217000.0 + 2022-07-14 19:14:46,834 [DataBaseMonitor] - count=68757310 speed=1214900.0 + 2022-07-14 19:14:57,280 [DataBaseMonitor] - count=80992310 speed=1223500.0 + 2022-07-14 19:15:07,689 [DataBaseMonitor] - count=93805310 speed=1281300.0 + 2022-07-14 19:15:18,020 [DataBaseMonitor] - count=106111310 speed=1230600.0 + 2022-07-14 19:15:28,356 [DataBaseMonitor] - count=118394310 speed=1228300.0 + 2022-07-14 19:15:38,690 [DataBaseMonitor] - count=130742310 speed=1234800.0 + 2022-07-14 19:15:49,000 [DataBaseMonitor] - count=143051310 speed=1230900.0 + 2022-07-14 19:15:59,323 [DataBaseMonitor] - count=155276310 speed=1222500.0 + 2022-07-14 19:16:09,649 [DataBaseMonitor] - count=167603310 speed=1232700.0 + 2022-07-14 19:16:19,995 [DataBaseMonitor] - count=179976310 speed=1237300.0 + ``` + +
+
+:::note
+Don't establish a connection to TDengine in the parent process if you use the Python connector in multi-process mode; otherwise, all the connections in the child processes are blocked forever. This is a known issue.
+
+:::
+
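+
+A minimal sketch of the safe pattern (hypothetical structure, not taken from the sample code):
+
+```python
+from multiprocessing import Process
+import taos
+
+def child():
+    # OK: each child process creates its own connection
+    conn = taos.connect()
+    conn.query("SELECT SERVER_VERSION()")
+    conn.close()
+
+if __name__ == "__main__":
+    # Don't call taos.connect() here in the parent before starting the child processes
+    p = Process(target=child)
+    p.start()
+    p.join()
+```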
+
diff --git a/docs/en/10-deployment/01-deploy.md b/docs/en/10-deployment/01-deploy.md index 2f89dd4f8b25175fc4b6c7b1246a3dbe2e98f3d7..477e618a45218bac466fd5753942b3950c291b99 100644 --- a/docs/en/10-deployment/01-deploy.md +++ b/docs/en/10-deployment/01-deploy.md @@ -114,7 +114,11 @@ The above process can be repeated to add more dnodes in the cluster. Any node that is in the cluster and online can be the firstEp of new nodes. Nodes use the firstEp parameter only when joining a cluster for the first time. After a node has joined the cluster, it stores the latest mnode in its end point list and no longer makes use of firstEp. +<<<<<<< HEAD However, firstEp is used by clients that connect to the cluster. For example, if you run TDengine CLI `taos` without arguments, it connects to the firstEp by default. +======= +However, firstEp is used by clients that connect to the cluster. For example, if you run `TDengine CLI` without arguments, it connects to the firstEp by default. +>>>>>>> 30903ba80fe488b3b8e96db7f599052a05f7c025 Two dnodes that are launched without a firstEp value operate independently of each other. It is not possible to add one dnode to the other dnode and form a cluster. It is also not possible to form two independent clusters into a new cluster. ::: diff --git a/docs/examples/python/fast_write_example.py b/docs/examples/python/fast_write_example.py new file mode 100644 index 0000000000000000000000000000000000000000..c9d606388fdecd85f1468f24cc497ecc5941f035 --- /dev/null +++ b/docs/examples/python/fast_write_example.py @@ -0,0 +1,180 @@ +# install dependencies: +# recommend python >= 3.8 +# pip3 install faster-fifo +# + +import logging +import math +import sys +import time +import os +from multiprocessing import Process +from faster_fifo import Queue +from mockdatasource import MockDataSource +from queue import Empty +from typing import List + +logging.basicConfig(stream=sys.stdout, level=logging.DEBUG, format="%(asctime)s [%(name)s] - %(message)s") + +READ_TASK_COUNT = 1 +WRITE_TASK_COUNT = 1 +TABLE_COUNT = 1000 +QUEUE_SIZE = 1000000 +MAX_BATCH_SIZE = 3000 + +read_processes = [] +write_processes = [] + + +def get_connection(): + """ + If variable TDENGINE_FIRST_EP is provided then it will be used. If not, firstEP in /etc/taos/taos.cfg will be used. 
+ You can also override the default username and password by supply variable TDENGINE_USER and TDENGINE_PASSWORD + """ + import taos + firstEP = os.environ.get("TDENGINE_FIRST_EP") + if firstEP: + host, port = firstEP.split(":") + else: + host, port = None, 0 + user = os.environ.get("TDENGINE_USER", "root") + password = os.environ.get("TDENGINE_PASSWORD", "taosdata") + return taos.connect(host=host, port=int(port), user=user, password=password) + + +# ANCHOR: read + +def run_read_task(task_id: int, task_queues: List[Queue]): + table_count_per_task = TABLE_COUNT // READ_TASK_COUNT + data_source = MockDataSource(f"tb{task_id}", table_count_per_task) + try: + for batch in data_source: + for table_id, rows in batch: + # hash data to different queue + i = table_id % len(task_queues) + # block putting forever when the queue is full + task_queues[i].put_many(rows, block=True, timeout=-1) + except KeyboardInterrupt: + pass + + +# ANCHOR_END: read + +# ANCHOR: write +def run_write_task(task_id: int, queue: Queue): + from sql_writer import SQLWriter + log = logging.getLogger(f"WriteTask-{task_id}") + writer = SQLWriter(get_connection) + lines = None + try: + while True: + try: + # get as many as possible + lines = queue.get_many(block=False, max_messages_to_get=MAX_BATCH_SIZE) + writer.process_lines(lines) + except Empty: + time.sleep(0.01) + except KeyboardInterrupt: + pass + except BaseException as e: + log.debug(f"lines={lines}") + raise e + + +# ANCHOR_END: write + +def set_global_config(): + argc = len(sys.argv) + if argc > 1: + global READ_TASK_COUNT + READ_TASK_COUNT = int(sys.argv[1]) + if argc > 2: + global WRITE_TASK_COUNT + WRITE_TASK_COUNT = int(sys.argv[2]) + if argc > 3: + global TABLE_COUNT + TABLE_COUNT = int(sys.argv[3]) + if argc > 4: + global QUEUE_SIZE + QUEUE_SIZE = int(sys.argv[4]) + if argc > 5: + global MAX_BATCH_SIZE + MAX_BATCH_SIZE = int(sys.argv[5]) + + +# ANCHOR: monitor +def run_monitor_process(): + log = logging.getLogger("DataBaseMonitor") + conn = get_connection() + conn.execute("DROP DATABASE IF EXISTS test") + conn.execute("CREATE DATABASE test") + conn.execute("CREATE STABLE test.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) " + "TAGS (location BINARY(64), groupId INT)") + + def get_count(): + res = conn.query("SELECT count(*) FROM test.meters") + rows = res.fetch_all() + return rows[0][0] if rows else 0 + + last_count = 0 + while True: + time.sleep(10) + count = get_count() + log.info(f"count={count} speed={(count - last_count) / 10}") + last_count = count + + +# ANCHOR_END: monitor +# ANCHOR: main +def main(): + set_global_config() + logging.info(f"READ_TASK_COUNT={READ_TASK_COUNT}, WRITE_TASK_COUNT={WRITE_TASK_COUNT}, " + f"TABLE_COUNT={TABLE_COUNT}, QUEUE_SIZE={QUEUE_SIZE}, MAX_BATCH_SIZE={MAX_BATCH_SIZE}") + + monitor_process = Process(target=run_monitor_process) + monitor_process.start() + time.sleep(3) # waiting for database ready. 
+ + task_queues: List[Queue] = [] + # create task queues + for i in range(WRITE_TASK_COUNT): + queue = Queue(max_size_bytes=QUEUE_SIZE) + task_queues.append(queue) + + # create write processes + for i in range(WRITE_TASK_COUNT): + p = Process(target=run_write_task, args=(i, task_queues[i])) + p.start() + logging.debug(f"WriteTask-{i} started with pid {p.pid}") + write_processes.append(p) + + # create read processes + for i in range(READ_TASK_COUNT): + queues = assign_queues(i, task_queues) + p = Process(target=run_read_task, args=(i, queues)) + p.start() + logging.debug(f"ReadTask-{i} started with pid {p.pid}") + read_processes.append(p) + + try: + monitor_process.join() + except KeyboardInterrupt: + monitor_process.terminate() + [p.terminate() for p in read_processes] + [p.terminate() for p in write_processes] + [q.close() for q in task_queues] + + +def assign_queues(read_task_id, task_queues): + """ + Compute target queues for a specific read task. + """ + ratio = WRITE_TASK_COUNT / READ_TASK_COUNT + from_index = math.floor(read_task_id * ratio) + end_index = math.ceil((read_task_id + 1) * ratio) + return task_queues[from_index:end_index] + + +if __name__ == '__main__': + main() +# ANCHOR_END: main diff --git a/docs/examples/python/mockdatasource.py b/docs/examples/python/mockdatasource.py new file mode 100644 index 0000000000000000000000000000000000000000..852860aec0adc8f9b043c9dcd5deb0bf00239201 --- /dev/null +++ b/docs/examples/python/mockdatasource.py @@ -0,0 +1,49 @@ +import time + + +class MockDataSource: + samples = [ + "8.8,119,0.32,LosAngeles,0", + "10.7,116,0.34,SanDiego,1", + "9.9,111,0.33,Hollywood,2", + "8.9,113,0.329,Compton,3", + "9.4,118,0.141,San Francisco,4" + ] + + def __init__(self, tb_name_prefix, table_count): + self.table_name_prefix = tb_name_prefix + "_" + self.table_count = table_count + self.max_rows = 10000000 + self.current_ts = round(time.time() * 1000) - self.max_rows * 100 + # [(tableId, tableName, values),] + self.data = self._init_data() + + def _init_data(self): + lines = self.samples * (self.table_count // 5 + 1) + data = [] + for i in range(self.table_count): + table_name = self.table_name_prefix + str(i) + data.append((i, table_name, lines[i])) # tableId, row + return data + + def __iter__(self): + self.row = 0 + return self + + def __next__(self): + """ + next 1000 rows for each table. 
+ return: {tableId:[row,...]} + """ + # generate 1000 timestamps + ts = [] + for _ in range(1000): + self.current_ts += 100 + ts.append(str(self.current_ts)) + # add timestamp to each row + # [(tableId, ["tableName,ts,current,voltage,phase,location,groupId"])] + result = [] + for table_id, table_name, values in self.data: + rows = [table_name + ',' + t + ',' + values for t in ts] + result.append((table_id, rows)) + return result diff --git a/docs/examples/python/sql_writer.py b/docs/examples/python/sql_writer.py new file mode 100644 index 0000000000000000000000000000000000000000..758167376b009f21afc701be7d89c1bfbabdeb9f --- /dev/null +++ b/docs/examples/python/sql_writer.py @@ -0,0 +1,90 @@ +import logging +import taos + + +class SQLWriter: + log = logging.getLogger("SQLWriter") + + def __init__(self, get_connection_func): + self._tb_values = {} + self._tb_tags = {} + self._conn = get_connection_func() + self._max_sql_length = self.get_max_sql_length() + self._conn.execute("USE test") + + def get_max_sql_length(self): + rows = self._conn.query("SHOW variables").fetch_all() + for r in rows: + name = r[0] + if name == "maxSQLLength": + return int(r[1]) + return 1024 * 1024 + + def process_lines(self, lines: str): + """ + :param lines: [[tbName,ts,current,voltage,phase,location,groupId]] + """ + for line in lines: + ps = line.split(",") + table_name = ps[0] + value = '(' + ",".join(ps[1:-2]) + ') ' + if table_name in self._tb_values: + self._tb_values[table_name] += value + else: + self._tb_values[table_name] = value + + if table_name not in self._tb_tags: + location = ps[-2] + group_id = ps[-1] + tag_value = f"('{location}',{group_id})" + self._tb_tags[table_name] = tag_value + self.flush() + + def flush(self): + """ + Assemble INSERT statement and execute it. + When the sql length grows close to MAX_SQL_LENGTH, the sql will be executed immediately, and a new INSERT statement will be created. + In case of "Table does not exit" exception, tables in the sql will be created and the sql will be re-executed. 
+ """ + sql = "INSERT INTO " + sql_len = len(sql) + buf = [] + for tb_name, values in self._tb_values.items(): + q = tb_name + " VALUES " + values + if sql_len + len(q) >= self._max_sql_length: + sql += " ".join(buf) + self.execute_sql(sql) + sql = "INSERT INTO " + sql_len = len(sql) + buf = [] + buf.append(q) + sql_len += len(q) + sql += " ".join(buf) + self.execute_sql(sql) + self._tb_values.clear() + + def execute_sql(self, sql): + try: + self._conn.execute(sql) + except taos.Error as e: + error_code = e.errno & 0xffff + # Table does not exit + if error_code == 9731: + self.create_tables() + else: + self.log.error("Execute SQL: %s", sql) + raise e + except BaseException as baseException: + self.log.error("Execute SQL: %s", sql) + raise baseException + + def create_tables(self): + sql = "CREATE TABLE " + for tb in self._tb_values.keys(): + tag_values = self._tb_tags[tb] + sql += "IF NOT EXISTS " + tb + " USING meters TAGS " + tag_values + " " + try: + self._conn.execute(sql) + except BaseException as e: + self.log.error("Execute SQL: %s", sql) + raise e diff --git a/docs/zh/07-develop/03-insert-data/01-sql-writing.mdx b/docs/zh/07-develop/03-insert-data/01-sql-writing.mdx index 214cbdaa96d02e0cd1251eeda97c6a897887cc7e..2920fa35a447861fd5c34a84c1950b22407b214d 100644 --- a/docs/zh/07-develop/03-insert-data/01-sql-writing.mdx +++ b/docs/zh/07-develop/03-insert-data/01-sql-writing.mdx @@ -23,7 +23,7 @@ import PhpStmt from "./_php_stmt.mdx"; ## SQL 写入简介 -应用通过连接器执行 INSERT 语句来插入数据,用户还可以通过 TAOS Shell,手动输入 INSERT 语句插入数据。 +应用通过连接器执行 INSERT 语句来插入数据,用户还可以通过 TDengine CLI,手动输入 INSERT 语句插入数据。 ### 一次写入一条 下面这条 INSERT 就将一条记录写入到表 d1001 中: diff --git a/docs/zh/07-develop/03-insert-data/05-high-volume.md b/docs/zh/07-develop/03-insert-data/05-high-volume.md new file mode 100644 index 0000000000000000000000000000000000000000..b8647b6ad71b2c40d307061b369dd9565dfdf471 --- /dev/null +++ b/docs/zh/07-develop/03-insert-data/05-high-volume.md @@ -0,0 +1,440 @@ +import Tabs from "@theme/Tabs"; +import TabItem from "@theme/TabItem"; + +# 高效写入 + +本节介绍如何高效地向 TDengine 写入数据。 + +## 高效写入原理 {#principle} + +### 客户端程序的角度 {#application-view} + +从客户端程序的角度来说,高效写入数据要考虑以下几个因素: + +1. 单次写入的数据量。一般来讲,每批次写入的数据量越大越高效(但超过一定阈值其优势会消失)。使用 SQL 写入 TDengine 时,尽量在一条 SQL 中拼接更多数据。目前,TDengine 支持的一条 SQL 的最大长度为 1,048,576(1M)个字符。可通过配置客户端参数 maxSQLLength(默认值为 65480)进行修改。 +2. 并发连接数。一般来讲,同时写入数据的并发连接数越多写入越高效(但超过一定阈值反而会下降,取决于服务端处理能力)。 +3. 数据在不同表(或子表)之间的分布,即要写入数据的相邻性。一般来说,每批次只向同一张表(或子表)写入数据比向多张表(或子表)写入数据要更高效; +4. 写入方式。一般来讲: + - 参数绑定写入比 SQL 写入更高效。因参数绑定方式避免了 SQL 解析。(但增加了 C 接口的调用次数,对于连接器也有性能损耗)。 + - SQL 写入不自动建表比自动建表更高效。因自动建表要频繁检查表是否存在 + - SQL 写入比无模式写入更高效。因无模式写入会自动建表且支持动态更改表结构 + +客户端程序要充分且恰当地利用以上几个因素。在单次写入中尽量只向同一张表(或子表)写入数据,每批次写入的数据量经过测试和调优设定为一个最适合当前系统处理能力的数值,并发写入的连接数同样经过测试和调优后设定为一个最适合当前系统处理能力的数值,以实现在当前系统中的最佳写入速度。 + +### 数据源的角度 {#datasource-view} + +客户端程序通常需要从数据源读数据再写入 TDengine。从数据源角度来说,以下几种情况需要在读线程和写线程之间增加队列: + +1. 有多个数据源,单个数据源生成数据的速度远小于单线程写入的速度,但数据量整体比较大。此时队列的作用是把多个数据源的数据汇聚到一起,增加单次写入的数据量。 +2. 单个数据源生成数据的速度远大于单线程写入的速度。此时队列的作用是增加写入的并发度。 +3. 单张表的数据分散在多个数据源。此时队列的作用是将同一张表的数据提前汇聚到一起,提高写入时数据的相邻性。 + +如果写应用的数据源是 Kafka, 写应用本身即 Kafka 的消费者,则可利用 Kafka 的特性实现高效写入。比如: + +1. 将同一张表的数据写到同一个 Topic 的同一个 Partition,增加数据的相邻性 +2. 通过订阅多个 Topic 实现数据汇聚 +3. 通过增加 Consumer 线程数增加写入的并发度 +4. 
通过增加每次 fetch 的最大数据量来增加单次写入的最大数据量
+
+### 服务器配置的角度 {#setting-view}
+
+从服务器配置的角度来说,也有很多优化写入性能的方法。
+
+如果总表数不多(远小于核数乘以 1000),且无论怎么调节客户端程序,taosd 进程的 CPU 使用率都很低,那么很可能是因为表在各个 vgroup 分布不均。比如:数据库总表数是 1000 且 minTablesPerVnode 设置的也是 1000,那么所有的表都会分布在 1 个 vgroup 上。此时如果将 minTablesPerVnode 和 tableIncStepPerVnode 都设置成 100,则可将表分布至 10 个 vgroup(假设 maxVgroupsPerDb 大于等于 10)。
+
+如果总表数比较大(比如大于 500 万),适当增加 maxVgroupsPerDb 也能显著提高建表的速度。maxVgroupsPerDb 默认值为 0,自动配置为 CPU 的核数。如果表的数量巨大,也建议调节 maxTablesPerVnode 参数,以免超过单个 vnode 建表的上限。
+
+更多调优参数,请参考[性能优化](../../../operation/optimize)和[配置参考](../../../reference/config)部分。
+
+## 高效写入示例 {#sample-code}
+
+### 场景设计 {#scenario}
+
+下面的示例程序展示了如何高效写入数据,场景设计如下:
+
+- TDengine 客户端程序从其它数据源不断读入数据,在示例程序中采用生成模拟数据的方式来模拟读取数据源
+- 单个连接向 TDengine 写入的速度无法与读数据的速度相匹配,因此客户端程序启动多个线程,每个线程都建立了与 TDengine 的连接,每个线程都有一个独占的固定大小的消息队列
+- 客户端程序将接收到的数据根据所属的表名(或子表名)HASH 到不同的线程,即写入该线程所对应的消息队列,以此确保属于某个表(或子表)的数据一定会被一个固定的线程处理
+- 各个子线程在将所关联的消息队列中的数据读空后或者读取数据量达到一个预定的阈值后将该批数据写入 TDengine,并继续处理后面接收到的数据
+
+![TDengine 高效写入示例场景的线程模型](highvolume.webp)
+
+### 示例代码 {#code}
+
+这一部分是针对以上场景的示例代码。对于其它场景高效写入原理相同,不过代码需要适当修改。
+
+本示例代码假设源数据属于同一张超级表(meters)的不同子表。程序在开始写入数据之前已经在 test 库创建了这个超级表。对于子表,将根据收到的数据,由应用程序自动创建。如果实际场景是多个超级表,只需修改写任务自动建表的代码。
+
+
+
+
+**程序清单**
+
+| 类名 | 功能说明 |
+| ---------------- | --------------------------------------------------------------------------- |
+| FastWriteExample | 主程序 |
+| ReadTask | 从模拟源中读取数据,将表名经过 hash 后得到 Queue 的 index,写入对应的 Queue |
+| WriteTask | 从 Queue 中获取数据,组成一个 Batch,写入 TDengine |
+| MockDataSource | 模拟生成一定数量 meters 子表的数据 |
+| SQLWriter | WriteTask 依赖这个类完成 SQL 拼接、自动建表、SQL 写入、SQL 长度检查 |
+| StmtWriter | 实现参数绑定方式批量写入(暂未完成) |
+| DataBaseMonitor | 统计写入速度,并每隔 10 秒把当前写入速度打印到控制台 |
+
+
+以下是各类的完整代码和更详细的功能说明。
+
+FastWriteExample
+主程序负责:
+
+1. 创建消息队列
+2. 启动写线程
+3. 启动读线程
+4. 每隔 10 秒统计一次写入速度
+
+主程序默认暴露了 4 个参数,每次启动程序都可调节,用于测试和调优:
+
+1. 读线程个数。默认为 1。
+2. 写线程个数。默认为 3。
+3. 模拟生成的总表数。默认为 1000。将会平分给各个读线程。如果总表数较大,建表需要花费较长时间,开始统计的写入速度可能较慢。
+4. 每批最多写入记录数量。默认为 3000。
+
+队列容量(taskQueueCapacity)也是与性能有关的参数,可通过修改程序调节。一般来讲,队列容量越大,入队被阻塞的概率越小,队列的吞吐量越大,但是内存占用也会越大。示例程序默认值已经设置得足够大。
+
+```java
+{{#include docs/examples/java/src/main/java/com/taos/example/highvolume/FastWriteExample.java}}
+```
+ +
+ReadTask
+
+读任务负责从数据源读数据。每个读任务都关联了一个模拟数据源。每个模拟数据源可生成一定数量表的数据。不同的模拟数据源生成不同表的数据。
+
+读任务采用阻塞的方式写消息队列。也就是说,一旦队列满了,写操作就会阻塞。
+
+```java
+{{#include docs/examples/java/src/main/java/com/taos/example/highvolume/ReadTask.java}}
+```
+ +
+WriteTask + +```java +{{#include docs/examples/java/src/main/java/com/taos/example/highvolume/WriteTask.java}} +``` + +
+ +
+ +MockDataSource + +```java +{{#include docs/examples/java/src/main/java/com/taos/example/highvolume/MockDataSource.java}} +``` + +
+ +
+
+SQLWriter
+
+SQLWriter 类封装了拼 SQL 和写数据的逻辑。注意,所有的表都没有提前创建,而是在 catch 到表不存在异常的时候,再以超级表为模板批量建表,然后重新执行 INSERT 语句。对于其它异常,这里简单地记录当时执行的 SQL 语句到日志中,你也可以记录更多线索到日志,以便排查错误和故障恢复。
+
+```java
+{{#include docs/examples/java/src/main/java/com/taos/example/highvolume/SQLWriter.java}}
+```
+ +
+ +DataBaseMonitor + +```java +{{#include docs/examples/java/src/main/java/com/taos/example/highvolume/DataBaseMonitor.java}} +``` + +
+ +**执行步骤** + +
+执行 Java 示例程序 + +执行程序前需配置环境变量 `TDENGINE_JDBC_URL`。如果 TDengine Server 部署在本机,且用户名、密码和端口都是默认值,那么可配置: + +``` +TDENGINE_JDBC_URL="jdbc:TAOS://localhost:6030?user=root&password=taosdata" +``` + +**本地集成开发环境执行示例程序** + +1. clone TDengine 仓库 + ``` + git clone git@github.com:taosdata/TDengine.git --depth 1 + ``` +2. 用集成开发环境打开 `docs/examples/java` 目录。 +3. 在开发环境中配置环境变量 `TDENGINE_JDBC_URL`。如果已配置了全局的环境变量 `TDENGINE_JDBC_URL` 可跳过这一步。 +4. 运行类 `com.taos.example.highvolume.FastWriteExample`。 + +**远程服务器上执行示例程序** + +若要在服务器上执行示例程序,可按照下面的步骤操作: + +1. 打包示例代码。在目录 TDengine/docs/examples/java 下执行: + ``` + mvn package + ``` +2. 远程服务器上创建 examples 目录: + ``` + mkdir -p examples/java + ``` +3. 复制依赖到服务器指定目录: + - 复制依赖包,只用复制一次 + ``` + scp -r .\target\lib @:~/examples/java + ``` + - 复制本程序的 jar 包,每次更新代码都需要复制 + ``` + scp -r .\target\javaexample-1.0.jar @:~/examples/java + ``` +4. 配置环境变量。 + 编辑 `~/.bash_profile` 或 `~/.bashrc` 添加如下内容例如: + + ``` + export TDENGINE_JDBC_URL="jdbc:TAOS://localhost:6030?user=root&password=taosdata" + ``` + + 以上使用的是本地部署 TDengine Server 时默认的 JDBC URL。你需要根据自己的实际情况更改。 + +5. 用 java 命令启动示例程序,命令模板: + + ``` + java -classpath lib/*:javaexample-1.0.jar com.taos.example.highvolume.FastWriteExample + ``` + +6. 结束测试程序。测试程序不会自动结束,在获取到当前配置下稳定的写入速度后,按 CTRL + C 结束程序。 + 下面是一次实际运行的日志输出,机器配置 16核 + 64G + 固态硬盘。 + + ``` + root@vm85$ java -classpath lib/*:javaexample-1.0.jar com.taos.example.highvolume.FastWriteExample 2 12 + 18:56:35.896 [main] INFO c.t.e.highvolume.FastWriteExample - readTaskCount=2, writeTaskCount=12 tableCount=1000 maxBatchSize=3000 + 18:56:36.011 [WriteThread-0] INFO c.taos.example.highvolume.WriteTask - started + 18:56:36.015 [WriteThread-0] INFO c.taos.example.highvolume.SQLWriter - maxSQLLength=1048576 + 18:56:36.021 [WriteThread-1] INFO c.taos.example.highvolume.WriteTask - started + 18:56:36.022 [WriteThread-1] INFO c.taos.example.highvolume.SQLWriter - maxSQLLength=1048576 + 18:56:36.031 [WriteThread-2] INFO c.taos.example.highvolume.WriteTask - started + 18:56:36.032 [WriteThread-2] INFO c.taos.example.highvolume.SQLWriter - maxSQLLength=1048576 + 18:56:36.041 [WriteThread-3] INFO c.taos.example.highvolume.WriteTask - started + 18:56:36.042 [WriteThread-3] INFO c.taos.example.highvolume.SQLWriter - maxSQLLength=1048576 + 18:56:36.093 [WriteThread-4] INFO c.taos.example.highvolume.WriteTask - started + 18:56:36.094 [WriteThread-4] INFO c.taos.example.highvolume.SQLWriter - maxSQLLength=1048576 + 18:56:36.099 [WriteThread-5] INFO c.taos.example.highvolume.WriteTask - started + 18:56:36.100 [WriteThread-5] INFO c.taos.example.highvolume.SQLWriter - maxSQLLength=1048576 + 18:56:36.100 [WriteThread-6] INFO c.taos.example.highvolume.WriteTask - started + 18:56:36.101 [WriteThread-6] INFO c.taos.example.highvolume.SQLWriter - maxSQLLength=1048576 + 18:56:36.103 [WriteThread-7] INFO c.taos.example.highvolume.WriteTask - started + 18:56:36.104 [WriteThread-7] INFO c.taos.example.highvolume.SQLWriter - maxSQLLength=1048576 + 18:56:36.105 [WriteThread-8] INFO c.taos.example.highvolume.WriteTask - started + 18:56:36.107 [WriteThread-8] INFO c.taos.example.highvolume.SQLWriter - maxSQLLength=1048576 + 18:56:36.108 [WriteThread-9] INFO c.taos.example.highvolume.WriteTask - started + 18:56:36.109 [WriteThread-9] INFO c.taos.example.highvolume.SQLWriter - maxSQLLength=1048576 + 18:56:36.156 [WriteThread-10] INFO c.taos.example.highvolume.WriteTask - started + 18:56:36.157 [WriteThread-11] INFO c.taos.example.highvolume.WriteTask - started + 18:56:36.158 [WriteThread-10] INFO c.taos.example.highvolume.SQLWriter - 
maxSQLLength=1048576 + 18:56:36.158 [ReadThread-0] INFO com.taos.example.highvolume.ReadTask - started + 18:56:36.158 [ReadThread-1] INFO com.taos.example.highvolume.ReadTask - started + 18:56:36.158 [WriteThread-11] INFO c.taos.example.highvolume.SQLWriter - maxSQLLength=1048576 + 18:56:46.369 [main] INFO c.t.e.highvolume.FastWriteExample - count=18554448 speed=1855444 + 18:56:56.946 [main] INFO c.t.e.highvolume.FastWriteExample - count=39059660 speed=2050521 + 18:57:07.322 [main] INFO c.t.e.highvolume.FastWriteExample - count=59403604 speed=2034394 + 18:57:18.032 [main] INFO c.t.e.highvolume.FastWriteExample - count=80262938 speed=2085933 + 18:57:28.432 [main] INFO c.t.e.highvolume.FastWriteExample - count=101139906 speed=2087696 + 18:57:38.921 [main] INFO c.t.e.highvolume.FastWriteExample - count=121807202 speed=2066729 + 18:57:49.375 [main] INFO c.t.e.highvolume.FastWriteExample - count=142952417 speed=2114521 + 18:58:00.689 [main] INFO c.t.e.highvolume.FastWriteExample - count=163650306 speed=2069788 + 18:58:11.646 [main] INFO c.t.e.highvolume.FastWriteExample - count=185019808 speed=2136950 + ``` + +
+ +
+ + +**程序清单** + +Python 示例程序中采用了多进程的架构,并使用了跨进程的消息队列。 + +| 函数或类 | 功能说明 | +| ------------------------ | -------------------------------------------------------------------- | +| main 函数 | 程序入口, 创建各个子进程和消息队列 | +| run_monitor_process 函数 | 创建数据库,超级表,统计写入速度并定时打印到控制台 | +| run_read_task 函数 | 读进程主要逻辑,负责从其它数据系统读数据,并分发数据到为之分配的队列 | +| MockDataSource 类 | 模拟数据源, 实现迭代器接口,每次批量返回每张表的接下来 1000 条数据 | +| run_write_task 函数 | 写进程主要逻辑。每次从队列中取出尽量多的数据,并批量写入 | +| SQLWriter类 | SQL 写入和自动建表 | +| StmtWriter 类 | 实现参数绑定方式批量写入(暂未完成) | + + +
+main 函数 + +main 函数负责创建消息队列和启动子进程,子进程有 3 类: + +1. 1 个监控进程,负责数据库初始化和统计写入速度 +2. n 个读进程,负责从其它数据系统读数据 +3. m 个写进程,负责写数据库 + +main 函数可以接收 5 个启动参数,依次是: + +1. 读任务(进程)数, 默认为 1 +2. 写任务(进程)数, 默认为 1 +3. 模拟生成的总表数,默认为 1000 +4. 队列大小(单位字节),默认为 1000000 +5. 每批最多写入记录数量, 默认为 3000 + +```python +{{#include docs/examples/python/fast_write_example.py:main}} +``` + +
+ +
+run_monitor_process + +监控进程负责初始化数据库,并监控当前的写入速度。 + +```python +{{#include docs/examples/python/fast_write_example.py:monitor}} +``` + +
+ +
+ +run_read_task 函数 + +读进程,负责从其它数据系统读数据,并分发数据到为之分配的队列。 + +```python +{{#include docs/examples/python/fast_write_example.py:read}} +``` + +
+ +
+ +MockDataSource + +以下是模拟数据源的实现,我们假设数据源生成的每一条数据都带有目标表名信息。实际中你可能需要一定的规则确定目标表名。 + +```python +{{#include docs/examples/python/mockdatasource.py}} +``` + +
+ +
+run_write_task 函数 + +写进程每次从队列中取出尽量多的数据,并批量写入。 + +```python +{{#include docs/examples/python/fast_write_example.py:write}} +``` + +
+ +
+
+SQLWriter 类封装了拼 SQL 和写数据的逻辑。所有的表都没有提前创建,而是在发生表不存在错误的时候,再以超级表为模板批量建表,然后重新执行 INSERT 语句。对于其它错误会记录当时执行的 SQL,以便排查错误和故障恢复。这个类也对 SQL 是否超过最大长度限制做了检查,如果接近 SQL 最大长度限制(maxSQLLength),将会立即执行 SQL。为了提高写入效率,建议将 maxSQLLength 适当调大。
+
+SQLWriter
+
+```python
+{{#include docs/examples/python/sql_writer.py}}
+```
+ +**执行步骤** + +
+ +执行 Python 示例程序 + +1. 前提条件 + + - 已安装 TDengine 客户端驱动 + - 已安装 Python3, 推荐版本 >= 3.8 + - 已安装 taospy + +2. 安装 faster-fifo 代替 python 内置的 multiprocessing.Queue + + ``` + pip3 install faster-fifo + ``` + +3. 点击上面的“查看源码”链接复制 `fast_write_example.py` 、 `sql_writer.py` 和 `mockdatasource.py` 三个文件。 + +4. 执行示例程序 + + ``` + python3 fast_write_example.py + ``` + + 下面是一次实际运行的输出, 机器配置 16核 + 64G + 固态硬盘。 + + ``` + root@vm85$ python3 fast_write_example.py 8 8 + 2022-07-14 19:13:45,869 [root] - READ_TASK_COUNT=8, WRITE_TASK_COUNT=8, TABLE_COUNT=1000, QUEUE_SIZE=1000000, MAX_BATCH_SIZE=3000 + 2022-07-14 19:13:48,882 [root] - WriteTask-0 started with pid 718347 + 2022-07-14 19:13:48,883 [root] - WriteTask-1 started with pid 718348 + 2022-07-14 19:13:48,884 [root] - WriteTask-2 started with pid 718349 + 2022-07-14 19:13:48,884 [root] - WriteTask-3 started with pid 718350 + 2022-07-14 19:13:48,885 [root] - WriteTask-4 started with pid 718351 + 2022-07-14 19:13:48,885 [root] - WriteTask-5 started with pid 718352 + 2022-07-14 19:13:48,886 [root] - WriteTask-6 started with pid 718353 + 2022-07-14 19:13:48,886 [root] - WriteTask-7 started with pid 718354 + 2022-07-14 19:13:48,887 [root] - ReadTask-0 started with pid 718355 + 2022-07-14 19:13:48,888 [root] - ReadTask-1 started with pid 718356 + 2022-07-14 19:13:48,889 [root] - ReadTask-2 started with pid 718357 + 2022-07-14 19:13:48,889 [root] - ReadTask-3 started with pid 718358 + 2022-07-14 19:13:48,890 [root] - ReadTask-4 started with pid 718359 + 2022-07-14 19:13:48,891 [root] - ReadTask-5 started with pid 718361 + 2022-07-14 19:13:48,892 [root] - ReadTask-6 started with pid 718364 + 2022-07-14 19:13:48,893 [root] - ReadTask-7 started with pid 718365 + 2022-07-14 19:13:56,042 [DataBaseMonitor] - count=6676310 speed=667631.0 + 2022-07-14 19:14:06,196 [DataBaseMonitor] - count=20004310 speed=1332800.0 + 2022-07-14 19:14:16,366 [DataBaseMonitor] - count=32290310 speed=1228600.0 + 2022-07-14 19:14:26,527 [DataBaseMonitor] - count=44438310 speed=1214800.0 + 2022-07-14 19:14:36,673 [DataBaseMonitor] - count=56608310 speed=1217000.0 + 2022-07-14 19:14:46,834 [DataBaseMonitor] - count=68757310 speed=1214900.0 + 2022-07-14 19:14:57,280 [DataBaseMonitor] - count=80992310 speed=1223500.0 + 2022-07-14 19:15:07,689 [DataBaseMonitor] - count=93805310 speed=1281300.0 + 2022-07-14 19:15:18,020 [DataBaseMonitor] - count=106111310 speed=1230600.0 + 2022-07-14 19:15:28,356 [DataBaseMonitor] - count=118394310 speed=1228300.0 + 2022-07-14 19:15:38,690 [DataBaseMonitor] - count=130742310 speed=1234800.0 + 2022-07-14 19:15:49,000 [DataBaseMonitor] - count=143051310 speed=1230900.0 + 2022-07-14 19:15:59,323 [DataBaseMonitor] - count=155276310 speed=1222500.0 + 2022-07-14 19:16:09,649 [DataBaseMonitor] - count=167603310 speed=1232700.0 + 2022-07-14 19:16:19,995 [DataBaseMonitor] - count=179976310 speed=1237300.0 + ``` + +
+ +:::note +使用 Python 连接器多进程连接 TDengine 的时候,有一个限制:不能在父进程中建立连接,所有连接只能在子进程中创建。 +如果在父进程中创建连接,子进程再创建连接就会一直阻塞。这是个已知问题。 + +::: + +
+
+ + diff --git a/docs/zh/07-develop/04-query-data/index.mdx b/docs/zh/07-develop/04-query-data/index.mdx index c083c30c2c26f8ecff96a36f3f4151e103ea1052..92cb1906d9530238b156856b0b6b0dcc9719111f 100644 --- a/docs/zh/07-develop/04-query-data/index.mdx +++ b/docs/zh/07-develop/04-query-data/index.mdx @@ -52,7 +52,7 @@ Query OK, 2 row(s) in set (0.001100s) ### 示例一 -在 TAOS Shell,查找加利福尼亚州所有智能电表采集的电压平均值,并按照 location 分组。 +在 TDengine CLI,查找加利福尼亚州所有智能电表采集的电压平均值,并按照 location 分组。 ``` taos> SELECT AVG(voltage), location FROM meters GROUP BY location; @@ -65,7 +65,7 @@ Query OK, 2 rows in database (0.005995s) ### 示例二 -在 TAOS shell, 查找 groupId 为 2 的所有智能电表的记录条数,电流的最大值。 +在 TDengine CLI, 查找 groupId 为 2 的所有智能电表的记录条数,电流的最大值。 ``` taos> SELECT count(*), max(current) FROM meters where groupId = 2; diff --git a/docs/zh/10-deployment/01-deploy.md b/docs/zh/10-deployment/01-deploy.md index 03b4ce30f980cd77e9845076ce9bb35c4474f948..c5f63cc636f024d9bd6739ea7c88f039382b90f3 100644 --- a/docs/zh/10-deployment/01-deploy.md +++ b/docs/zh/10-deployment/01-deploy.md @@ -71,7 +71,11 @@ serverPort 6030 ## 启动集群 +<<<<<<< HEAD 按照《立即开始》里的步骤,启动第一个数据节点,例如 h1.taosdata.com,然后执行 taos,启动 TDengine CLI,在其中执行命令 “SHOW DNODES”,如下所示: +======= +按照《立即开始》里的步骤,启动第一个数据节点,例如 h1.taosdata.com,然后执行 taos,启动 TDengine CLI,从 shell 里执行命令“SHOW DNODES”,如下所示: +>>>>>>> 30903ba80fe488b3b8e96db7f599052a05f7c025 ``` taos> show dnodes; diff --git a/docs/zh/14-reference/11-docker/index.md b/docs/zh/14-reference/11-docker/index.md index cf79a9b4be9119999d22bb52387b5217bcb1dbcf..1655817635d83de0e08ad563b90edd17ed15c968 100644 --- a/docs/zh/14-reference/11-docker/index.md +++ b/docs/zh/14-reference/11-docker/index.md @@ -32,7 +32,13 @@ taos> show databases; Query OK, 2 rows in database (0.033802s) ``` +<<<<<<< HEAD 因为运行在容器中的 TDengine 服务端使用容器的 hostname 建立连接,使用 TDengine CLI 或者各种连接器(例如 JDBC-JNI)从容器外访问容器内的 TDengine 比较复杂,所以上述方式是访问容器中 TDengine 服务的最简单的方法,适用于一些简单场景。如果在一些复杂场景下想要从容器化使用 TDengine CLI 或者各种连接器访问容器中的 TDengine 服务,请参考下一节。 +======= + +因为运行在容器中的 TDengine 服务端使用容器的 hostname 建立连接,使用 TDengine CLI 或者各种连接器(例如 JDBC-JNI)从容器外访问容器内的 TDengine 比较复杂,所以上述方式是访问容器中 TDengine 服务的最简单的方法,适用于一些简单场景。如果在一些复杂场景下想要从容器化使用 TDengine CLI 或者各种连接器访问容器中的 TDengine 服务,请参考下一节。 + +>>>>>>> 30903ba80fe488b3b8e96db7f599052a05f7c025 ## 在 host 网络上启动 TDengine @@ -354,7 +360,11 @@ test-docker_td-2_1 /tini -- /usr/bin/entrypoi ... Up test-docker_td-3_1 /tini -- /usr/bin/entrypoi ... Up ``` +<<<<<<< HEAD 4. 用 TDengine 查看 dnodes +======= +4. 
用 TDengine CLI 查看 dnodes +>>>>>>> 30903ba80fe488b3b8e96db7f599052a05f7c025 ```shell diff --git a/examples/JDBC/JDBCDemo/README-jdbc-windows.md b/examples/JDBC/JDBCDemo/README-jdbc-windows.md index 17c5c8df00ab8727d1adfe493d3fbbd32891a676..5a781f40f730218286edb9f6a7f184ee79e7a5fc 100644 --- a/examples/JDBC/JDBCDemo/README-jdbc-windows.md +++ b/examples/JDBC/JDBCDemo/README-jdbc-windows.md @@ -129,7 +129,7 @@ https://www.taosdata.com/cn/all-downloads/ 192.168.236.136 td01 ``` -配置完成后,在命令行内使用taos shell连接server端 +配置完成后,在命令行内使用TDengine CLI连接server端 ```shell C:\TDengine>taos -h td01 diff --git a/examples/nodejs/README-win.md b/examples/nodejs/README-win.md index 75fec69413af2bb49498118ec7235c9947e2f89e..e496be2f87e3ff0fcc01359f23888734669b0c22 100644 --- a/examples/nodejs/README-win.md +++ b/examples/nodejs/README-win.md @@ -35,7 +35,7 @@ Python 2.7.18 下载地址:https://www.taosdata.com/cn/all-downloads/,选择一个合适的windows-client下载(client应该尽量与server端的版本保持一致) -使用client的taos shell连接server +使用client的TDengine CLI连接server ```shell >taos -h node5 diff --git a/packaging/cfg/taos.cfg b/packaging/cfg/taos.cfg index aae2e7c856ac7ce4747d798acf5852d6cdf21535..87f465fdb93ddbff8973430b11ecadc13878069d 100644 --- a/packaging/cfg/taos.cfg +++ b/packaging/cfg/taos.cfg @@ -38,7 +38,7 @@ # The interval of dnode reporting status to mnode # statusInterval 1 -# The interval for taos shell to send heartbeat to mnode +# The interval for TDengine CLI to send heartbeat to mnode # shellActivityTimer 3 # The minimum sliding window time, milli-second diff --git a/packaging/docker/README.md b/packaging/docker/README.md index e41182f471050af6b4d47b696eb237e319b2dd80..cb27d3bca69ff3b9f6919cb7a47ac076008b29c1 100644 --- a/packaging/docker/README.md +++ b/packaging/docker/README.md @@ -47,7 +47,7 @@ taos> show databases; Query OK, 1 row(s) in set (0.002843s) ``` -Since TDengine use container hostname to establish connections, it's a bit more complex to use taos shell and native connectors(such as JDBC-JNI) with TDengine container instance. This is the recommended way to expose ports and use TDengine with docker in simple cases. If you want to use taos shell or taosc/connectors smoothly outside the `tdengine` container, see next use cases that match you need. +Since TDengine use container hostname to establish connections, it's a bit more complex to use TDengine CLI and native connectors(such as JDBC-JNI) with TDengine container instance. This is the recommended way to expose ports and use TDengine with docker in simple cases. If you want to use TDengine CLI or taosc/connectors smoothly outside the `tdengine` container, see next use cases that match you need. ### Start with host network @@ -87,7 +87,7 @@ docker run -d \ This command starts a docker container with TDengine server running and maps the container's TCP ports from 6030 to 6049 to the host's ports from 6030 to 6049 with TCP protocol and UDP ports range 6030-6039 to the host's UDP ports 6030-6039. If the host is already running TDengine server and occupying the same port(s), you need to map the container's port to a different unused port segment. (Please see TDengine 2.0 Port Description for details). In order to support TDengine clients accessing TDengine server services, both TCP and UDP ports need to be exposed by default(unless `rpcForceTcp` is set to `1`). 
-If you want to use taos shell or native connectors([JDBC-JNI](https://www.taosdata.com/cn/documentation/connector/java), or [driver-go](https://github.com/taosdata/driver-go)), you need to make sure the `TAOS_FQDN` is resolvable at `/etc/hosts` or with custom DNS service. +If you want to use TDengine CLI or native connectors([JDBC-JNI](https://www.taosdata.com/cn/documentation/connector/java), or [driver-go](https://github.com/taosdata/driver-go)), you need to make sure the `TAOS_FQDN` is resolvable at `/etc/hosts` or with custom DNS service. If you set the `TAOS_FQDN` to host's hostname, it will works as using `hosts` network like previous use case. Otherwise, like in `-e TAOS_FQDN=tdengine`, you can add the hostname record `tdengine` into `/etc/hosts` (use `127.0.0.1` here in host path, if use TDengine client/application in other hosts, you should set the right ip to the host eg. `192.168.10.1`(check the real ip in host with `hostname -i` or `ip route list default`) to make the TDengine endpoint resolvable): @@ -391,7 +391,7 @@ test_td-1_1 /usr/bin/entrypoint.sh taosd Up 6030/tcp, 6031/tcp, test_td-2_1 /usr/bin/entrypoint.sh taosd Up 6030/tcp, 6031/tcp, 6032/tcp, 6033/tcp, 6034/tcp, 6035/tcp, 6036/tcp, 6037/tcp, 6038/tcp, 6039/tcp, 6040/tcp, 6041/tcp, 6042/tcp ``` -Check dnodes with taos shell: +Check dnodes with TDengine CLI: ```bash $ docker-compose exec td-1 taos -s "show dnodes" diff --git a/tests/script/tsim/query/crash_sql.sim b/tests/script/tsim/query/crash_sql.sim index 169f2e7272bfb5e2beb413e8a210e2ddf54d744d..79a9165e6602b1e8b1931e0f3ad9bf7d0168450f 100644 --- a/tests/script/tsim/query/crash_sql.sim +++ b/tests/script/tsim/query/crash_sql.sim @@ -76,7 +76,7 @@ sql insert into ct4 values ( '2022-05-21 01:01:01.000', NULL, NULL, NULL, NULL, print ================ start query ====================== -print ================ SQL used to cause taosd or taos shell crash +print ================ SQL used to cause taosd or TDengine CLI crash sql_error select sum(c1) ,count(c1) from ct4 group by c1 having sum(c10) between 0 and 1 ; #system sh/exec.sh -n dnode1 -s stop -x SIGINT