Commit 6f9a5ae4 authored by: Haojun Liao

Merge branch 'develop' into feature/query

##########################################################################################
# Customize file classifications. #
# Results from files under any classifier will be excluded from LGTM #
# statistics. #
##########################################################################################
##########################################################################################
# Use the `path_classifiers` block to define changes to the default classification of #
# files. #
##########################################################################################
path_classifiers:
# docs:
# Identify the top-level file called `generate_javadoc.py` as documentation-related.
test:
# Override LGTM's default classification of test files by excluding all files.
- exclude: /
# Classify all files in the top-level directories tests/ and testsuites/ as test code.
- tests
# - testsuites
# Classify all files with suffix `.test` as test code.
# Note: use only forward slash / as a path separator.
# Use ** to indicate an arbitrary parent path.
# Use * to indicate any sequence of characters excluding /.
# Always enclose the expression in double quotes if it includes *.
# - "**/*.test"
# Refine the classifications above by excluding files in test/util/.
# - exclude: test/util
# The default behavior is to tag all files created during the
# build as `generated`. Results are hidden for generated code. You can tag
# further files as being generated by adding them to the `generated` section.
generated:
# Exclude all `*.c` files under the `ui/` directory from classification as
# generated code.
# - exclude: ui/**/*.c
# By default, all files not checked into the repository are considered to be
# 'generated'.
# The default behavior is to tag library code as `library`. Results are hidden
# for library code. You can tag further files as being library code by adding them
# to the `library` section.
library:
- exclude: deps/
# The default behavior is to tag template files as `template`. Results are hidden
# for template files. You can tag further files as being template files by adding
# them to the `template` section.
template:
#- exclude: path/to/template/code/**/*.c
# Define your own category, for example: 'some_custom_category'.
some_custom_category:
# Classify all files in the top-level directory tools/ (or the top-level file
# called tools).
# - tools
#########################################################################################
# Use the `queries` block to change the default display of query results. #
#########################################################################################
# queries:
# Start by hiding the results of all queries.
# - exclude: "*"
# Then include all queries tagged 'security' and 'correctness', and with a severity of
# 'error'.
# - include:
# tags:
# - "security"
# - "correctness"
# severity: "error"
# Specifically hide the results of two queries.
# - exclude: cpp/use-of-goto
# - exclude: java/equals-on-unrelated-types
# Refine by including the `java/command-line-injection` query.
# - include: java/command-line-injection
#########################################################################################
# Define changes to the default code extraction process. #
# Each block configures the extraction of a single language, and modifies actions in a #
# named step. Every named step includes automatic default actions, #
# except for the 'prepare' step. The steps are performed in the following sequence: #
# prepare #
# after_prepare #
# configure (C/C++ only) #
# python_setup (Python only) #
# before_index #
# index #
##########################################################################################
#########################################################################################
# Environment variables available to the steps: #
#########################################################################################
# LGTM_SRC
# The root of the source tree.
# LGTM_WORKSPACE
# An existing (initially empty) folder outside the source tree.
# Used for temporary download and setup commands.
#########################################################################################
# Use the extraction block to define changes to the default code extraction process #
# for one or more languages. The settings for each language are defined in a child #
# block, with one or more steps. #
#########################################################################################
extraction:
# Define settings for C/C++ analysis
#####################################
cpp:
# The `prepare` step exists for customization on LGTM.com only.
prepare:
# # The `packages` section is valid for LGTM.com only. It names Ubuntu packages to
# # be installed.
packages:
- cmake
# Add an `after_prepare` step if you need to run commands after the prepare step.
# Each command should be listed on a separate line.
# This step is useful for C/C++ analysis where you want to prepare the environment
# for the `configure` step without changing the default behavior for that step.
# after_prepare:
#- export GNU_MAKE=make
#- export GIT=true
# The `configure` step generates build configuration files which the `index` step
# then uses to build the codebase.
configure:
command:
- mkdir build
- cd build
- cmake ..
# - ./prepare_deps
# Optional step. You should add a `before_index` step if you need to run commands
# before the `index` step.
# before_index:
# - export BOOST_DIR=$LGTM_SRC/boost
# - export GTEST_DIR=$LGTM_SRC/googletest
# - export HUNSPELL_DIR=$LGTM_SRC/hunspell
# - export CRYPTOPP_DIR=$LGTM_SRC/cryptopp
# The `index` step builds the code and extracts information during the build
# process.
index:
# Override the autobuild process by specifying a list of custom build commands
# to use instead.
build_command:
- cd build
- make
# - $GNU_MAKE -j2 -s
# Specify that all project or solution files should be used for extraction.
# Default: false.
# all_solutions: true
# Specify a list of one or more project or solution files for extraction.
# Default: LGTM chooses the file closest to the root of the repository (this may
# fail if there are multiple candidates).
# solution:
# - myProject.sln
# Specify MSBuild settings
# msbuild:
# Specify a list of additional arguments to MSBuild. Default: empty.
# arguments: /p:Platform=x64 /p:Configuration=Release
# Specify the MSBuild configuration to use, for example, debug or release.
# Default: read from the solution file or files.
# configuration:
# Specify the platform to target, for example: x86, x64, or Any CPU.
# Default: read from the solution file or files.
# platform:
# Specify the MSBuild target. Default: rebuild.
# target:
# Specify whether or not to perform a NuGet restore for extraction. Default: true.
# nuget_restore: false
# Specify a version of Microsoft Visual Studio to use for MSBuild or any custom
# build commands (build_command). For example:
# 10 for Visual Studio 2010
# 12 for Visual Studio 2012
# 14 for Visual Studio 2015
# 15 for Visual Studio 2017
# Default: read from project files.
# vstools_version: 10
# Define settings for C# analysis
##################################
# csharp:
# The `prepare` step exists for customization on LGTM.com only.
# prepare:
# packages:
# - example_package
# Add an `after_prepare` step if you need to run commands after the `prepare` step.
# Each command should be listed on a separate line.
# after_prepare:
# - export PATH=$LGTM_WORKSPACE/tools:$PATH
# The `index` step builds the code and extracts information during the build
# process.
#index:
# Specify that all project or solution files should be used for extraction.
# Default: false.
# all_solutions: true
# Specify a list of one or more project or solution files for extraction.
# Default: LGTM chooses the file closest to the root of the repository (this may
# fail if there are multiple candidates).
# solution:
# - myProject.sln
# Override the autobuild process by specifying a list of custom build commands
# to use instead.
# build_command:
# - ./example-compile-all.sh
# By default, LGTM analyzes the code by building it. You can override this,
# and tell LGTM not to build the code. Beware that this can lead
# to less accurate results.
# buildless: true
# Specify .NET Core settings.
# dotnet:
# Specify additional arguments to `dotnet build`.
# Default: empty.
# arguments: "example_arg"
# Specify the version of .NET Core SDK to use.
# Default: The version installed on the build machine.
# version: 2.1
# Specify MSBuild settings.
# msbuild:
# Specify a list of additional arguments to MSBuild. Default: empty.
# arguments: /P:WarningLevel=2
# Specify the MSBuild configuration to use, for example, debug or release.
# Default: read from the solution file or files.
# configuration: release
# Specify the platform to target, for example: x86, x64, or Any CPU.
# Default: read from the solution file or files.
# platform: x86
# Specify the MSBuild target. Default: rebuild.
# target: notest
# Specify whether or not to perform a NuGet restore for extraction. Default: true.
# nuget_restore: false
# Specify a version of Microsoft Visual Studio to use for MSBuild or any custom
# build commands (build_command). For example:
# 10 for Visual Studio 2010
# 12 for Visual Studio 2012
# 14 for Visual Studio 2015
# 15 for Visual Studio 2017
# Default: read from project files
# vstools_version: 10
# Specify additional options for the extractor,
# for example --fast to perform a faster extraction that produces a smaller
# database.
# extractor: "--fast"
# Define settings for Go analysis
##################################
# go:
# The `prepare` step exists for customization on LGTM.com only.
# prepare:
# packages:
# - example_package
# Add an `after_prepare` step if you need to run commands after the `prepare` step.
# Each command should be listed on a separate line.
# after_prepare:
# - export PATH=$LGTM_WORKSPACE/tools:$PATH
# The `index` step builds the code and extracts information during the build
# process.
# index:
# Override the autobuild process by specifying a list of custom build commands
# to use instead.
# build_command:
# - ./compile-all.sh
# Define settings for Java analysis
####################################
# java:
# The `prepare` step exists for customization on LGTM.com only.
# prepare:
# packages:
# - example_package
# Add an `after_prepare` step if you need to run commands after the prepare step.
# Each command should be listed on a separate line.
# after_prepare:
# - export PATH=$LGTM_WORKSPACE/tools:$PATH
# The `index` step extracts information from the files in the codebase.
# index:
# Specify Gradle settings.
# gradle:
# Specify the required Gradle version.
# Default: determined automatically.
# version: 4.4
# Override the autobuild process by specifying a list of custom build commands
# to use instead.
# build_command: ./compile-all.sh
# Specify the Java version required to build the project.
# java_version: 11
# Specify whether to extract Java .properties files
# Default: false
# properties_files: true
# Specify Maven settings.
# maven:
# Specify the path (absolute or relative) of a Maven settings file to use.
# Default: Maven uses a settings file in the default location, if it exists.
# settings_file: /opt/share/settings.xml
# Specify the path of a Maven toolchains file.
# Default: Maven uses a toolchains file in the default location, if it exists.
# toolchains_file: /opt/share/toolchains.xml
# Specify the required Maven version.
# Default: the Maven version is determined automatically, where feasible.
# version: 3.5.2
# Specify how XML files should be extracted:
# all = extract all XML files.
# default = only extract XML files named `AndroidManifest.xml`, `pom.xml`, and `web.xml`.
# disabled = do not extract any XML files.
# xml_mode: all
# Define settings for JavaScript analysis
##########################################
# javascript:
# The `prepare` step exists for customization on LGTM.com only.
# prepare:
# packages:
# - example_package
# Add an `after_prepare` step if you need to run commands after the prepare step.
# Each command should be listed on a separate line.
# after_prepare:
# - export PATH=$LGTM_WORKSPACE/tools:$PATH
# The `index` step extracts information from the files in the codebase.
# index:
# Specify a list of files and folders to extract.
# Default: The project root directory.
# include:
# - src/js
# Specify a list of files and folders to exclude from extraction.
# exclude:
# - thirdparty/lib
# You can add additional file types for LGTM to extract, by mapping file
# extensions (including the leading dot) to file types. The usual
# include/exclude patterns apply, so, for example, `.jsm` files under
# `thirdparty/lib` will not be extracted.
# filetypes:
# ".jsm": "js"
# ".tmpl": "html"
# Specify a list of glob patterns to include/exclude files from extraction; this
# is applied on top of the include/exclude paths from above; patterns are
# processed in the same way as for path classifiers above.
# Default: include all files with known extensions (such as .js, .ts and .html),
# but exclude files ending in `-min.js` or `.min.js` and folders named `node_modules`
# or `bower_components`
# filters:
# exclude any *.ts files anywhere.
# - exclude: "**/*.ts"
# but include *.ts files under src/js/typescript.
# - include: "src/js/typescript/**/*.ts"
# Specify how TypeScript files should be extracted:
# none = exclude all TypeScript files.
# basic = extract syntactic information from TypeScript files.
# full = extract syntactic and type information from TypeScript files.
# Default: full.
# typescript: basic
# By default, LGTM doesn't extract any XML files. You can override this by
# using the `xml_mode` property and setting it to `all`.
# xml_mode: all
# Define settings for Python analysis
######################################
# python:
# # The `prepare` step exists for customization on LGTM.com only.
# # prepare:
# # # The `packages` section is valid for LGTM.com only. It names packages to
# # # be installed.
# # packages: libpng-dev
# # This step is useful for Python analysis where you want to prepare the
# # environment for the `python_setup` step without changing the default behavior
# # for that step.
# after_prepare:
# - export PATH=$LGTM_WORKSPACE/tools:$PATH
# # This sets up the Python interpreter and virtual environment, ready for the
# # `index` step to extract the codebase.
# python_setup:
# # Specify packages that should NOT be installed despite being mentioned in the
# # requirements.txt file.
# # Default: no package marked for exclusion.
# exclude_requirements:
# - pywin32
# # Specify a list of pip packages to install.
# # If any of these packages cannot be installed, the extraction will fail.
# requirements:
# - Pillow
# # Specify a list of requirements text files to use to set up the environment,
# # or false for none. Default: any requirements.txt, test-requirements.txt,
# # and similarly named files identified in the codebase are used.
# requirements_files:
# - required-packages.txt
# # Specify a setup.py file to use to set up the environment, or false for none.
# # Default: any setup.py files identified in the codebase are used in preference
# # to any requirements text files.
# setup_py: new-setup.py
# # Override the version of the Python interpreter used for setup and extraction
# # Default: Python 3.
# version: 2
# # Optional step. You should add a `before_index` step if you need to run commands
# # before the `index` step.
# before_index:
# - antlr4 -Dlanguage=Python3 Grammar.g4
# # The `index` step extracts information from the files in the codebase.
# index:
# # Specify a list of files and folders to exclude from extraction.
# # Default: Git submodules and Subversion externals.
# exclude:
# - legacy-implementation
# - thirdparty/libs
# filters:
# - exclude: "**/documentation/examples/snippets/*.py"
# - include: "**/documentation/examples/test_application/*"
# include:
# - example/to/include
@@ -83,12 +83,18 @@ sudo dnf install -y maven
## Get the source codes
- github: First of all, you may clone the source codes from github:
```bash
git clone https://github.com/taosdata/TDengine.git
cd TDengine
```
The connectors for go & grafana have been moved to separate repositories,
so you should run this command in the TDengine directory to install them:
```bash
git submodule update --init --recursive
```
## Build TDengine
### On Linux platform
@@ -11,7 +11,7 @@ One of TDengine's modules is a time-series database. Beyond that, to reduce development
* __Full-stack time-series data processing engine__: integrates database, message queue, cache, and stream computing into one product, so applications no longer need to integrate Kafka/Redis/HBase/Spark/HDFS and the like, greatly reducing the complexity and cost of application development and maintenance.
* __Powerful analysis capabilities__: whether the data is from ten years ago or one second ago, it can be queried by simply specifying a time range. Data can be aggregated along the time axis or across multiple devices, and ad-hoc queries can be run at any time via Shell, Python, R, or Matlab.
* __Seamless integration with third-party tools__: integrates with Telegraf, Grafana, EMQ, Prometheus, Matlab, R, and more without a single line of code. OPC, Hadoop, Spark, and others will be supported later, and BI tools will also connect seamlessly.
* __Zero operations cost, zero learning cost__: installing and clustering a system is quick and simple; there is no need to split databases or tables, and backup is real-time. With SQL-like syntax, support for RESTful interfaces and for Python/Java/C/C++/C#/Go/Node.js, and similarity to MySQL, there is zero learning cost.
With TDengine, the total cost of ownership of a typical IoT, connected-vehicle, or industrial-internet big-data platform can be greatly reduced. Note, however, that because TDengine makes full use of the characteristics of IoT time-series data, it cannot be used to process general-purpose data from web crawlers, microblogs, WeChat, e-commerce, ERP, CRM, and so on.
@@ -30,13 +30,13 @@ The TDengine software consists of three parts: server, client, and alarm module; the current 2.0 version
- TDengine-alert-2.0.0-Linux-x64.tar.gz (8.1M)
For the time being, TDengine only supports installation on Linux systems that use [`systemd`](https://en.wikipedia.org/wiki/Systemd) for service management; support for other Linux systems is under development. Use the `which systemctl` command to check whether `systemd` is present on the system:
```cmd
which systemctl
```
If `systemd` is not present on the system, please consider [installing from source](#通过源码安装) instead.
For the detailed installation procedure, please see <a href="https://www.taosdata.com/blog/2019/08/09/566.html">Install and Uninstall TDengine via Different Packages</a>.
@@ -16,13 +16,13 @@ Three different packages are provided, please pick up the one you like.
<li><a id='tdengine-deb' style='color:var(--b2)'>TDengine DEB package (1.7M)</a></li>
<li><a id='tdengine-tar' style='color:var(--b2)'>TDengine Tarball (3.0M)</a></li>
</ul>
For the time being, TDengine only supports installation on Linux systems using [`systemd`](https://en.wikipedia.org/wiki/Systemd) as the service manager. To check whether your system has the *systemd* package, use the _which systemctl_ command.
```cmd
which systemctl
```
If the `systemd` package is not found, please [install from source code](#Install-from-Source).
### Running TDengine
@@ -29,23 +29,9 @@ Query OK, 2 row(s) in set (0.001100s)
For the detailed query syntax, please see <a href="https://www.taosdata.com/cn/documentation20/taos-sql/">TAOS SQL</a>.
## Multi-Table Aggregation Queries
In IoT scenarios, there are usually many data collection points of the same type. TDengine uses the concept of a super table (STable) to describe one type of data collection point, and an ordinary table to describe one specific data collection point. TDengine also uses tags to describe the static attributes of data collection points; a specific data collection point has specific tag values. By specifying filter conditions on tags, TDengine provides an efficient way to run aggregation queries over the sub-tables of a super table (all data collection points of one type). Aggregation functions and most other operations on ordinary tables also apply to super tables, with exactly the same syntax.
**Example 1**: in the TAOS shell, find the average voltage collected by all smart meters in Beijing, grouped by location
```mysql
taos> SELECT AVG(voltage) FROM meters GROUP BY location;
avg(voltage) | location |
@@ -55,6 +41,18 @@ taos> SELECT AVG(voltage) FROM meters GROUP BY location;
Query OK, 2 row(s) in set (0.002136s)
```
**Example 2**: in the TAOS shell, find the number of records and the maximum current over the past 24 hours for all smart meters whose groupId is 2
```mysql
taos> SELECT count(*), max(current) FROM meters where groupId = 2 and ts > now - 24h;
count(*) | max(current) |
==================================
5 | 13.4 |
Query OK, 1 row(s) in set (0.002136s)
```
TDengine only allows aggregation queries across tables that belong to the same super table; aggregation queries across different super tables are not supported. In the <a href="https://www.taosdata.com/cn/documentation20/taos-sql/">TAOS SQL</a> chapter, each query operation notes whether super tables are supported.
## Downsampling Queries and Interpolation
In IoT scenarios, the collected data often needs to be aggregated by time period through downsampling. TDengine provides a simple keyword, interval, that makes queries over time windows extremely easy. For example, to sum the current collected by smart meter d1001 every 10 seconds:
@@ -66,9 +64,9 @@ taos> SELECT sum(current) FROM d1001 INTERVAL(10s);
2018-10-03 14:38:10.000 | 24.900000572 |
Query OK, 2 row(s) in set (0.000883s)
```
Downsampling also applies to super tables; for example, to sum the current collected by all smart meters in Beijing every second:
```mysql
taos> SELECT SUM(current) FROM meters where location like "Beijing%" INTERVAL(1s);
ts | sum(current) |
======================================================
2018-10-03 14:38:04.000 | 10.199999809 |
@@ -124,7 +124,8 @@ TDengine's default timestamp precision is milliseconds, but it can be changed via the configuration parameter enableMic
Notes (a short example follows this list):
1) The first column of a table must be of type TIMESTAMP, and the system automatically makes it the primary key;
2) The maximum length of a table name is 193;
3) Each row of a table cannot exceed 16k characters;
4) A sub-table name may only contain letters, digits, and underscores, and may not start with a digit;
5) When using the binary or nchar data types, the maximum number of bytes must be specified; for example, binary(20) means at most 20 bytes;
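To make the rules concrete, here is a minimal sketch of a conforming table definition; the table and column names are hypothetical, chosen to echo the smart-meter example used elsewhere in these docs:
```mysql
-- Rule 1: the first column is a TIMESTAMP and becomes the primary key.
-- Rule 4: the (sub-)table name uses only letters, digits, and underscores.
-- Rule 5: binary(20) declares a maximum length of 20 bytes.
CREATE TABLE sensor_01 (ts TIMESTAMP, current FLOAT, voltage INT, devname BINARY(20));
```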
- **Drop a table**
@@ -65,24 +65,24 @@ TDengine's design assumes that no single hardware or software system is reliable and that any
The logical structure of TDengine's distributed architecture is shown below:
<center> <img src="../assets/structure.png"> </center>
<center> Figure 1: TDengine architecture diagram </center>
A complete TDengine system runs on one or more physical nodes. Logically, it consists of data nodes (dnode), the TDengine application driver (taosc), and applications (app). A system contains one or more data nodes, which form a cluster. Applications interact with the TDengine cluster through taosc's API. Each logical unit is briefly introduced below.
**Physical node (pnode):** a pnode is an independently running computer with its own compute, storage, and network capabilities. It can be a physical machine, a virtual machine, or a Docker container with an operating system installed. A physical node is identified by its configured FQDN (Fully Qualified Domain Name). TDengine relies entirely on FQDNs for network communication; if you are not familiar with FQDNs, please read the blog post [One Article to Explain TDengine's FQDN](https://www.taosdata.com/blog/2020/09/11/1824.html).
**Data node (dnode):** a dnode is a running instance of taosd, the TDengine server-side executable, on a physical node; a working system must have at least one data node. A dnode contains zero or more logical virtual nodes (vnode) and at most one logical management node (mnode). A dnode is uniquely identified in the system by its End Point (EP), the combination of the FQDN of its physical node and the network port it is configured to use. By configuring different ports, one physical node (a physical machine, virtual machine, or container) can run multiple instances, i.e. host multiple data nodes.
**Virtual node (vnode):** to better support data sharding and load balancing, and to prevent data hot spots and skew, data nodes are virtualized into multiple virtual nodes (vnode; V2, V3, V4, etc. in the figure). Each vnode is a relatively independent unit of work and the basic unit of time-series data storage, with its own threads, memory space, and persistent storage path. A vnode contains a certain number of tables (data collection points). When a new table is created, the system checks whether a new vnode needs to be created. The number of vnodes that can be created on a data node depends on the hardware resources of its physical node. A vnode belongs to exactly one DB, but one DB can have multiple vnodes. Besides the time-series data it stores, a vnode also holds the schemas and tag values of its tables. A virtual node is uniquely identified in the system by the EP of its data node together with its VGroup ID, and is created and managed by the management node.
**Management node (mnode):** a virtual logical unit (M in the figure) responsible for monitoring and maintaining the running state of all data nodes and for load balancing among them. It also stores and manages metadata (users, databases, tables, static tags, etc.), so it is also called the Meta Node. Multiple mnodes (at most 5) can be configured in a TDengine cluster, and they automatically form a virtual management node group (M0, M1, M2 in the figure). mnodes are managed with a master/slave mechanism and synchronize data in a strongly consistent way; any data update can only be performed on the master. The mnode cluster is created automatically by the system without manual intervention. Each dnode hosts at most one mnode, uniquely identified by the EP of its data node. Every dnode automatically learns, through internal message exchange, the EPs of all dnodes in the cluster that host mnodes.
**Virtual node group (VGroup):** vnodes on different data nodes can form a virtual node group to ensure high reliability. The group is managed in a master/slave fashion: write operations can only be performed on the master vnode, and the system replicates the data asynchronously to the slave vnodes, so copies of the data exist on multiple physical nodes. The number of virtual nodes in a vgroup is the number of data replicas. If a DB has N replicas, the system must have at least N data nodes. The replica count is specified with the parameter replica when the DB is created, with a default of 1. Using TDengine's multi-replica feature, the same high data reliability can be achieved without expensive storage devices such as disk arrays. Virtual node groups are created and managed by the management node, which assigns each group a system-unique ID, the VGroup ID. Two virtual nodes with the same vnode group ID belong to the same group and back up each other's data. The number of virtual nodes in a group can be changed dynamically; a group of one means no replication. The VGroup ID never changes; even if a virtual node group is deleted, its ID is not reused.
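For instance, the replica count is fixed per database at creation time; a minimal sketch (the database name `power` is hypothetical):
```mysql
-- Every vgroup of this database keeps 3 copies of each vnode,
-- so the cluster must contain at least 3 data nodes.
CREATE DATABASE power REPLICA 3;
```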
**TAOSC:** taosc is the driver that TDengine provides to applications. It handles the interaction between applications and the cluster, provides the native C/C++ interface, and is embedded in the JDBC, C#, Python, Go, and Node.js connectors. Applications interact with the whole cluster through taosc instead of connecting directly to the data nodes. This module obtains and caches metadata, forwards insert, query, and other requests to the correct data nodes, and performs the final aggregation, sorting, and filtering before returning results to the application. For the JDBC and C/C++/C#/Python/Go/Node.js interfaces, this module runs on the physical node where the application runs. Meanwhile, to support the fully distributed RESTful interface, taosc has a running instance on every dnode of the TDengine cluster.
### Communication Between Nodes
**Communication method:** communication among TDengine's data nodes, and between the application driver and the data nodes, goes over TCP/UDP. Considering the IoT scenario, where data-write packets are usually small, TDengine uses UDP in addition to TCP, because UDP is more efficient and is not limited by the number of connections. TDengine implements its own timeout, retransmission, and acknowledgment mechanisms to make UDP transfer reliable. Packets under 15K are transferred over UDP; packets over 15K, and all query-type operations, automatically use TCP. In addition, depending on the configuration and the packet, TDengine automatically compresses/decompresses and signs/authenticates the data. Data replication between data nodes uses TCP only.
**FQDN configuration:** a data node has one or more FQDNs, which can be specified with the parameter "fqdn" in the system configuration file taos.cfg; if it is not specified, the system automatically uses the computer's hostname as its FQDN. If a node has no configured FQDN, you can set its fqdn parameter directly to its IP address; this is not recommended, however, because IP addresses can change, and once that happens the cluster stops working. The EP (End Point) of a data node is its FQDN + Port. Using FQDNs requires that DNS works, or that the hosts files are configured on the nodes and on the machines where the applications run.
**Port configuration:** the external port of a data node is determined by the TDengine configuration parameter serverPort; the port for internal cluster communication is serverPort+5. Data replication between data nodes in the cluster occupies one more TCP port, serverPort+10. To process UDP data efficiently with multiple threads, each internal and external UDP connection occupies 5 consecutive ports. A data node therefore uses the port range serverPort to serverPort+10, 11 TCP/UDP ports in total. Make sure the firewall keeps these ports open. Each data node can be configured with a different serverPort.
@@ -153,6 +153,7 @@ Besides sharding by vnode, TDengine also partitions time-series data by time period.
When a new data node is added to the cluster, new compute and storage resources come with it, and the system automatically starts the load-balancing process as well.
Load balancing requires no manual intervention and no application restart; applications connect to the new nodes automatically, completely transparently.
**Note: load balancing is controlled by the parameter balance, which turns automatic load balancing on or off.**
## Data Writing and Replication Process
If a database has N replicas, a virtual node group has N virtual nodes, of which only one is the master and the rest are slaves. When an application writes a new record, only the master vnode can accept the write request. If a slave vnode receives the write request, the system notifies taosc to redirect.
@@ -192,7 +193,8 @@ The master vnode follows the write process below:
In theory, as long as replication is asynchronous, loss-free writes cannot be 100% guaranteed. But this window is extremely small: the master and the slave would have to fail at the same time, and right after the write was acknowledged to the application.
Note: remote disaster recovery and zero-downtime IDC migration are supported only by the enterprise edition.
**Note: this feature is not yet available.**
### Master/Slave Election
Each vnode maintains a data version number (Version) that is persisted together with the in-memory data. Every data update, whether to collected time-series data or to metadata, increments this version number by one.
@@ -259,6 +261,7 @@ dataDir /mnt/disk6/taos 2
The mounted disks can also be non-local network disks, as long as the system can access them.
Note: tiered storage is supported only by the enterprise edition.
**Note: this feature is not yet available.**
## Data Queries
TDengine provides a wide variety of query-processing features for tables and super tables. Besides regular aggregation queries, it also provides window queries and statistical aggregation over time-series data. Query processing in TDengine is completed cooperatively by the client, the vnodes, and the mnodes.
@@ -289,11 +292,18 @@ select count(*) from d1001 interval(1h) fill(prev);
This counts, per hour, the records collected by device d1001; if a given hour has no data, the statistics of the previous hour are returned instead. TDengine provides forward interpolation (prev), linear interpolation (linear), NULL filling (NULL), and specific-value filling (value).
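The other fill modes follow the same pattern; for example, a sketch that fills empty windows with a fixed value instead of the previous window's result:
```mysql
-- Hours without data report the constant 0 rather than
-- repeating the previous hour's count.
select count(*) from d1001 interval(1h) fill(value, 0);
```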
### Multi-Table Aggregation Queries
TDengine creates a separate table for each data collection point, but in practice the data of different collection points often needs to be aggregated. To perform aggregation efficiently, TDengine introduces the concept of the super table (STable). A super table represents one specific type of data collection point. It is a collection of tables whose schemas are exactly the same, but where each table carries its own static tags; there can be multiple tags, and tags can be added, deleted, or modified at any time. By specifying filter conditions on tags, an application can aggregate or run statistics over all or a subset of the tables under an STable, which greatly simplifies application development. The process is shown in the figure below:
<center> <img src="../assets/multi_tables.png"> </center>
<center> Figure 5: Multi-table aggregation query diagram </center>
1. The application sends a query to the system;
2. taosc sends the super table's name to the Meta Node (management node);
3. the management node sends the list of vnodes owned by the super table back to taosc;
4. taosc sends the computation request, together with the tag filter conditions, to the data nodes hosting those vnodes;
5. each vnode first looks up, in memory, the set of its local tables that satisfy the tag filters, then scans the stored time-series data, completes the corresponding aggregation, and returns the result to taosc;
6. taosc performs the final aggregation over the results returned by the data nodes and hands it back to the application.
Because TDengine stores tag data and time-series data separately inside a vnode, filtering the tag data in memory first yields the set of tables that take part in the aggregation, which greatly reduces the volume of data to scan and thus greatly speeds up the aggregation. Meanwhile, because the data is spread across multiple vnodes/dnodes, the aggregation runs concurrently in multiple vnodes, speeding it up further. Aggregation functions and most other operations on ordinary tables also work on super tables, with exactly the same syntax; see TAOS SQL for the details.
### Precomputation
To improve query-processing performance, and exploiting the fact that IoT data is immutable, TDengine records statistics about the data stored in each data block in the block header: the maximum, the minimum, and the sum. We call these precomputed units. If a query involves all the data of an entire data block, the precomputed results are used directly and the block's contents are not read at all. Since the precomputed units are far smaller than the data blocks stored on disk, using them greatly reduces read IO pressure and speeds up queries that are bottlenecked on disk IO. The precomputation mechanism is comparable to PostgreSQL's BRIN (block range index).
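For instance, a query of the following shape (the time range is hypothetical) can be answered largely from the precomputed units: every data block that lies entirely inside the time range contributes its max/min/sum without being read:
```mysql
-- max/min/sum match the statistics kept in each block header,
-- so fully covered blocks contribute without being scanned.
SELECT MAX(current), MIN(current), SUM(current) FROM d1001
WHERE ts >= '2018-10-01 00:00:00' AND ts <= '2018-10-31 23:59:59';
```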
@@ -8,7 +8,7 @@ TDengine cluster management is extremely simple; apart from adding and removing nodes, which require manual intervention
## Preparation
**Step 0:** plan the FQDNs of all physical nodes in the cluster. Add each planned FQDN to the /etc/hostname of its physical node, and edit /etc/hosts on every physical node so that the IP-to-FQDN mappings of all cluster nodes are present. (If DNS is deployed, ask the network administrator to add the entries on the DNS server instead.)
**Step 1:** if any physical node intended for the cluster holds earlier test data, had a 1.x version installed, or had another version of TDengine installed, remove it first and wipe all data; for the detailed steps please see the blog post [Install and Uninstall TDengine via Different Packages](https://www.taosdata.com/blog/2019/08/09/566.html).
**Note 1:** because FQDN information is written into files, if the FQDN was never configured or was changed while TDengine was running, make sure the data is disposable or backed up, and then clean up the old data (rm -rf /var/lib/taos/);
@@ -16,9 +16,9 @@ TDengine cluster management is extremely simple; apart from adding and removing nodes, which require manual intervention
**Step 2:** it is recommended to turn off the firewall on all physical nodes, or at least to open TCP and UDP ports 6030-6042. It is **strongly recommended** to turn the firewall off first and configure the ports only after the cluster has been set up;
**Step 3:** install TDengine on all physical nodes, making sure the versions are identical, **but do not start taosd yet**. During installation, when prompted whether to join an existing TDengine cluster, simply press enter on the first physical node to create a new cluster; on each subsequent physical node, enter the FQDN:port (default 6030) of any online physical node already in the cluster;
**Step 4:** check the network settings of all data nodes and of the physical nodes where the applications run:
1. Run `hostname -f` on every physical node and confirm that all hostnames are distinct (nodes that host only the application driver need not be checked);
2. Run `ping host` on every physical node, where host is the hostname of another physical node, and check that the other physical nodes can be reached; if not, check the network settings, the /etc/hosts file (on Windows the default path is C:\Windows\system32\drivers\etc\hosts), or the DNS configuration. If the nodes cannot ping each other, the cluster cannot be formed;
@@ -22,7 +22,7 @@ INSERT INTO d1001 VALUES (1538548685000, 10.3, 219, 0.31) (1538548695000, 12.6,
**Tips:**
- To improve write efficiency, write in batches: the more records per batch, the higher the insert efficiency (see the sketch after these tips). But a single record cannot exceed 16K, and the total length of one SQL statement cannot exceed 64K (configurable via the parameter maxSQLLength, up to 1M).
- TDengine supports writing from multiple threads concurrently. To push write speed further, a client should write from 20 or more threads at the same time. Beyond a certain number of threads, however, throughput stops improving and may even drop, because frequent thread switching brings extra overhead.
- For the same table, if a newly inserted record's timestamp already exists, the new record is simply discarded; that is, timestamps must be unique within a table. If an application generates records automatically, it is quite likely to generate duplicate timestamps, in which case the number of records successfully inserted will be smaller than the number the application submitted.
- The timestamp of written data must be greater than the current time minus the configuration parameter keep; with keep set to 3650 days, data older than 3650 days cannot be written. The timestamp also cannot be greater than the current time plus the configuration parameter days; with days set to 2, data more than 2 days in the future cannot be written.
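As a sketch of the batching advice in the first tip, one INSERT statement can carry many records, even for several tables (the values are illustrative, and the whole statement must stay within maxSQLLength):
```mysql
-- Two records for d1001 and one for d1002 in a single statement.
INSERT INTO d1001 VALUES (1538548685000, 10.3, 219, 0.31) (1538548695000, 12.6, 218, 0.33)
            d1002 VALUES (1538548696800, 12.3, 221, 0.31);
```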
Subproject commit 8c58c512b6acda8bcdfa48fdc7140227b5221766
@@ -468,8 +468,24 @@ void dnodeUpdateMnodeEpSetForPeer(SRpcEpSet *pEpSet) {
  if (!mnodeIsRunning()) {
    if (strcmp(pEpSet->fqdn[i], tsLocalFqdn) == 0 && pEpSet->port[i] == tsServerPort) {
      dInfo("mnode index:%d %s:%u should work as mnode", i, pEpSet->fqdn[i], pEpSet->port[i]);
      bool find = false;
      for (int i = 0; i < tsDMnodeInfos.nodeNum; ++i) {
        if (tsDMnodeInfos.nodeInfos[i].nodeId == dnodeGetDnodeId()) {
          dInfo("localEp found in mnode infos");
          find = true;
          break;
        }
      }

      if (!find) {
        dInfo("localEp not found in mnode infos, will set into mnode infos");
        tstrncpy(tsDMnodeInfos.nodeInfos[tsDMnodeInfos.nodeNum].nodeEp, tsLocalEp, TSDB_EP_LEN);
        tsDMnodeInfos.nodeInfos[tsDMnodeInfos.nodeNum].nodeId = dnodeGetDnodeId();
        tsDMnodeInfos.nodeNum++;
      }

      dnodeStartMnode();
    }
  }
}
@@ -146,19 +146,16 @@ void dnodeProcessModuleStatus(uint32_t moduleStatus) {
  }
}

bool dnodeStartMnode() {
  if (tsModuleStatus & (1 << TSDB_MOD_MNODE)) {
    dDebug("mnode module is already started, module status:%d", tsModuleStatus);
    return false;
  }

  uint32_t moduleStatus = tsModuleStatus | (1 << TSDB_MOD_MNODE);
  dInfo("start mnode module, module status:%d, new status:%d", tsModuleStatus, moduleStatus);
  dnodeProcessModuleStatus(moduleStatus);
  sdbUpdateSync();
  return true;
}
@@ -187,6 +187,7 @@ void dnodeSendRpcReadRsp(void *pVnode, SReadMsg *pRead, int32_t code) {
}

void dnodeDispatchNonRspMsg(void *pVnode, SReadMsg *pRead, int32_t code) {
  rpcFreeCont(pRead->rpcMsg.pCont);
  vnodeRelease(pVnode);
  return;
}
@@ -43,7 +43,7 @@ void dnodeGetMnodeEpSetForPeer(void *epSet);
void    dnodeGetMnodeEpSetForShell(void *epSet);
void *  dnodeGetMnodeInfos();
int32_t dnodeGetDnodeId();
bool    dnodeStartMnode();
void    dnodeAddClientRspHandle(uint8_t msgType, void (*fp)(SRpcMsg *rpcMsg));
void    dnodeSendMsgToDnode(SRpcEpSet *epSet, SRpcMsg *rpcMsg);
@@ -91,7 +91,6 @@ typedef struct {
} SSdbWriteWorkerPool;

extern void * tsMnodeTmr;
static SSdbObject tsSdbObj = {0};
static taos_qset  tsSdbWriteQset;
static taos_qall  tsSdbWriteQall;
@@ -298,16 +297,12 @@ static void sdbConfirmForward(void *ahandle, void *param, int32_t code) {
  taosFreeQitem(pOper);
}

void sdbUpdateSync() {
  if (!mnodeIsRunning()) {
    mDebug("mnode not start yet, update sync info later");
    return;
  }

  mDebug("update sync info in sdb");

  SSyncCfg syncCfg = {0};
@@ -195,7 +195,7 @@ static void rpcSendMsgToPeer(SRpcConn *pConn, void *data, int dataLen);
static void rpcSendReqHead(SRpcConn *pConn);
static void *rpcProcessMsgFromPeer(SRecvInfo *pRecv);
static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead, SRpcReqContext *pContext);
static void rpcProcessConnError(void *param, void *id);
static void rpcProcessRetryTimer(void *, void *);
static void rpcProcessIdleTimer(void *param, void *tmrId);
@@ -323,7 +323,7 @@ void *rpcMallocCont(int contLen) {
    tError("failed to malloc msg, size:%d", size);
    return NULL;
  } else {
    tTrace("malloc mem: %p", start);
  }

  return start + sizeof(SRpcReqContext) + sizeof(SRpcHead);
@@ -333,7 +333,7 @@ void rpcFreeCont(void *cont) {
  if (cont) {
    char *temp = ((char *)cont) - sizeof(SRpcHead) - sizeof(SRpcReqContext);
    free(temp);
    tTrace("free mem: %p", temp);
  }
}
@@ -553,7 +553,7 @@ static void rpcFreeMsg(void *msg) {
  if ( msg ) {
    char *temp = (char *)msg - sizeof(SRpcReqContext);
    free(temp);
    tTrace("free mem: %p", temp);
  }
}
@@ -819,9 +819,18 @@ static int rpcProcessReqHead(SRpcConn *pConn, SRpcHead *pHead) {
      return TSDB_CODE_RPC_LAST_SESSION_NOT_FINISHED;
    }

    if (rpcContLenFromMsg(pHead->msgLen) <= 0) {
      tDebug("%s, message body is empty, ignore", pConn->info);
      return TSDB_CODE_RPC_APP_ERROR;
    }

    pConn->inTranId = pHead->tranId;
    pConn->inType = pHead->msgType;

    // start the progress timer to monitor the response from server app
    if (pConn->connType != RPC_CONN_TCPS)
      pConn->pTimer = taosTmrStart(rpcProcessProgressTimer, tsProgressTimer, pConn, pConn->pRpc->tmrCtrl);

    return 0;
}
@@ -881,17 +890,32 @@ static int rpcProcessRspHead(SRpcConn *pConn, SRpcHead *pHead) {
  pConn->outType = 0;
  pConn->pReqMsg = NULL;
  pConn->reqMsgLen = 0;

  SRpcReqContext *pContext = pConn->pContext;

  if (pHead->code == TSDB_CODE_RPC_REDIRECT) {
    if (rpcContLenFromMsg(pHead->msgLen) < sizeof(SRpcEpSet)) {
      // if EpSet is not included in the msg, treat it as NOT_READY
      pHead->code = TSDB_CODE_RPC_NOT_READY;
    } else {
      pContext->redirect++;
      if (pContext->redirect > TSDB_MAX_REPLICA) {
        pHead->code = TSDB_CODE_RPC_NETWORK_UNAVAIL;
        tWarn("%s, too many redirects, quit", pConn->info);
      }
    }
  }

  return TSDB_CODE_SUCCESS;
}
static SRpcConn *rpcProcessMsgHead(SRpcInfo *pRpc, SRecvInfo *pRecv, SRpcReqContext **ppContext) {
  int32_t sid;
  SRpcConn *pConn = NULL;

  SRpcHead *pHead = (SRpcHead *)pRecv->msg;
  sid = htonl(pHead->destId);
  *ppContext = NULL;

  if (pHead->msgType >= TSDB_MSG_TYPE_MAX || pHead->msgType <= 0) {
    tDebug("%s sid:%d, invalid message type:%d", pRpc->label, sid, pHead->msgType);
@@ -945,6 +969,7 @@ static SRpcConn *rpcProcessMsgHead(SRpcInfo *pRpc, SRecvInfo *pRecv) {
      pConn->pIdleTimer = taosTmrStart(rpcProcessIdleTimer, tsRpcTimer*2, pConn, pRpc->tmrCtrl);
    } else {
      terrno = rpcProcessRspHead(pConn, pHead);
      *ppContext = pConn->pContext;
    }
  }
@@ -1009,7 +1034,8 @@ static void *rpcProcessMsgFromPeer(SRecvInfo *pRecv) {
  }

  terrno = 0;
  SRpcReqContext *pContext;
  pConn = rpcProcessMsgHead(pRpc, pRecv, &pContext);

  if (pHead->msgType >= 1 && pHead->msgType < TSDB_MSG_TYPE_MAX) {
    tDebug("%s %p %p, %s received from 0x%x:%hu, parse code:0x%x len:%d sig:0x%08x:0x%08x:%d code:0x%x", pRpc->label,
@@ -1029,7 +1055,7 @@ static void *rpcProcessMsgFromPeer(SRecvInfo *pRecv) {
      tDebug("%s %p %p, %s is sent with error code:0x%x", pRpc->label, pConn, (void *)pHead->ahandle, taosMsg[pHead->msgType+1], code);
    }
  } else { // msg is passed to app only parsing is ok
    rpcProcessIncomingMsg(pConn, pHead, pContext);
  }
}
@@ -1060,7 +1086,7 @@ static void rpcNotifyClient(SRpcReqContext *pContext, SRpcMsg *pMsg) {
  rpcFreeCont(pContext->pCont);
}

static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead, SRpcReqContext *pContext) {
  SRpcInfo *pRpc = pConn->pRpc;
  SRpcMsg   rpcMsg;
@@ -1070,29 +1096,18 @@ static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead) {
  rpcMsg.pCont = pHead->content;
  rpcMsg.msgType = pHead->msgType;
  rpcMsg.code = pHead->code;

  if ( rpcIsReq(pHead->msgType) ) {
    rpcMsg.ahandle = pConn->ahandle;
    rpcMsg.handle = pConn;
    rpcAddRef(pRpc);  // add the refCount for requests

    // notify the server app
    (*(pRpc->cfp))(&rpcMsg, NULL);
  } else {
    // it's a response
    rpcMsg.handle = pContext;
    rpcMsg.ahandle = pContext->ahandle;

    // for UDP, port may be changed by server, the port in epSet shall be used for cache
    if (pHead->code != TSDB_CODE_RPC_TOO_SLOW) {
@@ -1101,19 +1116,6 @@ static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead) {
      rpcCloseConn(pConn);
    }

    if (pHead->code == TSDB_CODE_RPC_REDIRECT) {
      pContext->numOfTry = 0;
      SRpcEpSet *pEpSet = (SRpcEpSet*)pHead->content;
@@ -1445,7 +1447,7 @@ static SRpcHead *rpcDecompressRpcMsg(SRpcHead *pHead) {
      pNewHead->msgLen = rpcMsgLenFromCont(origLen);
      rpcFreeMsg(pHead);  // free the compressed message buffer
      pHead = pNewHead;
      tTrace("decomp malloc mem: %p", temp);
    } else {
      tError("failed to allocate memory to decompress msg, contLen:%d", contLen);
    }
@@ -62,7 +62,7 @@ typedef struct {
  char        label[TSDB_LABEL_LEN];
  int         numOfThreads;
  void *      shandle;
  SThreadObj **pThreadObj;
  pthread_t   thread;
} SServerObj;
@@ -90,7 +90,7 @@ void *taosInitTcpServer(uint32_t ip, uint16_t port, char *label, int numOfThread
  tstrncpy(pServerObj->label, label, sizeof(pServerObj->label));
  pServerObj->numOfThreads = numOfThreads;

  pServerObj->pThreadObj = (SThreadObj **)calloc(sizeof(SThreadObj *), numOfThreads);
  if (pServerObj->pThreadObj == NULL) {
    tError("TCP:%s no enough memory", label);
    terrno = TAOS_SYSTEM_ERROR(errno);
@@ -104,19 +104,28 @@ void *taosInitTcpServer(uint32_t ip, uint16_t port, char *label, int numOfThread
  pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE);

  // initialize parameters in case it may encounter error later
  for (int i = 0; i < numOfThreads; ++i) {
    pThreadObj = (SThreadObj *)calloc(sizeof(SThreadObj), 1);
    if (pThreadObj == NULL) {
      tError("TCP:%s no enough memory", label);
      terrno = TAOS_SYSTEM_ERROR(errno);
      for (int j=0; j<i; ++j) free(pServerObj->pThreadObj[j]);
      free(pServerObj->pThreadObj);
      free(pServerObj);
      return NULL;
    }

    pServerObj->pThreadObj[i] = pThreadObj;
    pThreadObj->pollFd = -1;
    taosResetPthread(&pThreadObj->thread);
    pThreadObj->processData = fp;
    tstrncpy(pThreadObj->label, label, sizeof(pThreadObj->label));
    pThreadObj->shandle = shandle;
  }

  // initialize mutex, thread, fd which may fail
  for (int i = 0; i < numOfThreads; ++i) {
    pThreadObj = pServerObj->pThreadObj[i];
    code = pthread_mutex_init(&(pThreadObj->mutex), NULL);
    if (code < 0) {
      tError("%s failed to init TCP process data mutex(%s)", label, strerror(errno));
@@ -137,7 +146,6 @@ void *taosInitTcpServer(uint32_t ip, uint16_t port, char *label, int numOfThread
    }

    pThreadObj->threadId = i;
  }

  pServerObj->fd = taosOpenTcpServerSocket(pServerObj->ip, pServerObj->port);
@@ -166,6 +174,11 @@ static void taosStopTcpThread(SThreadObj* pThreadObj) {
  pThreadObj->stop = true;
  eventfd_t fd = -1;

  if (taosComparePthread(pThreadObj->thread, pthread_self())) {
    pthread_detach(pthread_self());
    return;
  }

  if (taosCheckPthreadValid(pThreadObj->thread) && pThreadObj->pollFd >= 0) {
    // signal the thread to stop, try graceful method first,
    // and use pthread_cancel when failed
@@ -183,15 +196,11 @@ }
    }
  }

  if (taosCheckPthreadValid(pThreadObj->thread) && pThreadObj->pollFd >= 0) {
    pthread_join(pThreadObj->thread, NULL);
  }

  if (fd != -1) taosCloseSocket(fd);
}
void taosStopTcpServer(void *handle) {
@@ -199,7 +208,14 @@ void taosStopTcpServer(void *handle) {
  if (pServerObj == NULL) return;
  if(pServerObj->fd >=0) shutdown(pServerObj->fd, SHUT_RD);

  if (taosCheckPthreadValid(pServerObj->thread)) {
    if (taosComparePthread(pServerObj->thread, pthread_self())) {
      pthread_detach(pthread_self());
    } else {
      pthread_join(pServerObj->thread, NULL);
    }
  }

  tDebug("%s TCP server is stopped", pServerObj->label);
}
@@ -210,9 +226,8 @@ void taosCleanUpTcpServer(void *handle) {
  if (pServerObj == NULL) return;

  for (int i = 0; i < pServerObj->numOfThreads; ++i) {
    pThreadObj = pServerObj->pThreadObj[i];
    taosStopTcpThread(pThreadObj);
  }

  tDebug("%s TCP server is cleaned up", pServerObj->label);
@@ -249,7 +264,7 @@ static void *taosAcceptTcpConnection(void *arg) {
   taosSetSockOpt(connFd, SOL_SOCKET, SO_RCVTIMEO, &to, sizeof(to));
   // pick up the thread to handle this connection
-  pThreadObj = pServerObj->pThreadObj + threadId;
+  pThreadObj = pServerObj->pThreadObj[threadId];
   SFdObj *pFdObj = taosMallocFdObj(pThreadObj, connFd);
   if (pFdObj) {
@@ -327,10 +342,8 @@ void taosCleanUpTcpClient(void *chandle) {
   SThreadObj *pThreadObj = chandle;
   if (pThreadObj == NULL) return;
+  tDebug ("%s TCP client will be cleaned up", pThreadObj->label);
   taosStopTcpThread(pThreadObj);
-  tDebug ("%s TCP client is cleaned up", pThreadObj->label);
-  taosTFree(pThreadObj);
 }
 void *taosOpenTcpClientConnection(void *shandle, void *thandle, uint32_t ip, uint16_t port) {
@@ -365,7 +378,7 @@ void *taosOpenTcpClientConnection(void *shandle, void *thandle, uint32_t ip, uin
 void taosCloseTcpConnection(void *chandle) {
   SFdObj *pFdObj = chandle;
-  if (pFdObj == NULL) return;
+  if (pFdObj == NULL || pFdObj->signature != pFdObj) return;
   SThreadObj *pThreadObj = pFdObj->pThreadObj;
   tDebug("%s %p TCP connection will be closed, FD:%p", pThreadObj->label, pFdObj->thandle, pFdObj);
@@ -378,7 +391,7 @@ void taosCloseTcpConnection(void *chandle) {
 int taosSendTcpData(uint32_t ip, uint16_t port, void *data, int len, void *chandle) {
   SFdObj *pFdObj = chandle;
-  if (chandle == NULL) return -1;
+  if (pFdObj == NULL || pFdObj->signature != pFdObj) return -1;
   return taosWriteMsg(pFdObj->fd, data, len);
 }
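The signature != pFdObj checks added to taosCloseTcpConnection and taosSendTcpData are a self-pointing sentinel: while an SFdObj is alive, its signature field stores the object's own address; freeing clears the field, so a stale handle fails the comparison instead of being dereferenced. A small sketch of the idea, using a hypothetical struct rather than the real SFdObj layout:

#include <stdlib.h>

typedef struct SObj {
  struct SObj *signature;  /* points to the object itself while alive */
  int          fd;
} SObj;

SObj *objAlloc(void) {
  SObj *p = calloc(1, sizeof(SObj));
  if (p != NULL) p->signature = p;                /* mark as valid */
  return p;
}

void objFree(SObj *p) {
  if (p == NULL || p->signature != p) return;     /* already freed or bogus */
  p->signature = NULL;                            /* invalidate before freeing */
  free(p);
}

int objUse(SObj *p) {
  if (p == NULL || p->signature != p) return -1;  /* reject stale handles */
  return p->fd;
}

Note this is a best-effort guard: once the allocator reuses the memory, the sentinel may be overwritten, so it complements rather than replaces disciplined ownership.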
@@ -425,7 +438,7 @@ static int taosReadTcpData(SFdObj *pFdObj, SRecvInfo *pInfo) {
     tError("%s %p TCP malloc(size:%d) fail", pThreadObj->label, pFdObj->thandle, msgLen);
     return -1;
   } else {
-    tDebug("TCP malloc mem: %p", buffer);
+    tTrace("TCP malloc mem: %p", buffer);
   }
   msg = buffer + tsRpcOverhead;
@@ -503,8 +516,22 @@ static void *taosProcessTcpData(void *param) {
       pFdObj->thandle = (*(pThreadObj->processData))(&recvInfo);
       if (pFdObj->thandle == NULL) taosFreeFdObj(pFdObj);
     }
+    if (pThreadObj->stop) break;
   }
+  if (pThreadObj->pollFd >=0) taosCloseSocket(pThreadObj->pollFd);
+  while (pThreadObj->pHead) {
+    SFdObj *pFdObj = pThreadObj->pHead;
+    pThreadObj->pHead = pFdObj->next;
+    taosFreeFdObj(pFdObj);
+  }
+  pthread_mutex_destroy(&(pThreadObj->mutex));
+  tDebug("%s TCP thread exits ...", pThreadObj->label);
+  taosTFree(pThreadObj);
   return NULL;
 }
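This enlarged hunk moves teardown into the poll thread itself: once the loop observes stop, the thread closes its poll descriptor, drains the remaining SFdObj list, destroys its mutex, and finally frees its own SThreadObj, so the cleanup paths can no longer free state the worker is still touching. A condensed sketch of that "thread owns its cleanup" pattern, with simplified stand-in types rather than the real structs:

#include <pthread.h>
#include <stdbool.h>
#include <stdlib.h>
#include <unistd.h>

typedef struct SFd { struct SFd *next; int fd; } SFd;

typedef struct {
  volatile bool   stop;
  int             pollFd;
  SFd            *pHead;
  pthread_mutex_t mutex;
} SWorker;

/* Sketch: the worker frees its own state only after its event loop
 * has exited, so no other thread can race the teardown. */
static void *workerLoop(void *param) {
  SWorker *w = param;
  while (!w->stop) {
    /* ... wait on pollFd and dispatch events ... */
  }
  if (w->pollFd >= 0) close(w->pollFd);
  while (w->pHead != NULL) {           /* drain remaining connections */
    SFd *p = w->pHead;
    w->pHead = p->next;
    if (p->fd >= 0) close(p->fd);
    free(p);
  }
  pthread_mutex_destroy(&w->mutex);
  free(w);                             /* last action: release ourselves */
  return NULL;
}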
...
@@ -214,7 +214,7 @@ static void *taosRecvUdpData(void *param) {
     tError("%s failed to allocate memory, size:%" PRId64, pConn->label, (int64_t)dataLen);
     continue;
   } else {
-    tDebug("UDP malloc mem: %p", tmsg);
+    tTrace("UDP malloc mem: %p", tmsg);
   }
   tmsg += tsRpcOverhead;  // overhead for SRpcReqContext
...
@@ -219,7 +219,10 @@ static void *taosProcessTcpData(void *param) {
        continue;
      }
    }
  }
+    if (pThread->stop) break;
  }
  uDebug("%p TCP epoll thread exits", pThread);
@@ -321,5 +324,5 @@ static void taosStopPoolThread(SThreadObj *pThread) {
   }
   pthread_join(thread, NULL);
-  taosClose(fd);
+  if (fd >= 0) taosClose(fd);
 }
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.taosdata.example.calcite</groupId>
<artifactId>calciteDemo</artifactId>
<version>1.0-SNAPSHOT</version>
<dependencies>
<!-- slf4j -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<version>1.7.25</version>
<scope>compile</scope>
</dependency>
<!-- calcite -->
<dependency>
<groupId>org.apache.calcite</groupId>
<artifactId>calcite-core</artifactId>
<version>1.23.0</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-dbcp2</artifactId>
<version>2.7.0</version>
</dependency>
<dependency>
<groupId>org.apache.calcite.avatica</groupId>
<artifactId>avatica-core</artifactId>
<version>1.17.0</version>
</dependency>
<!-- mysql -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.47</version>
</dependency>
<!-- tdengine -->
<dependency>
<groupId>com.taosdata.jdbc</groupId>
<artifactId>taos-jdbcdriver</artifactId>
<version>2.0.7</version>
</dependency>
</dependencies>
</project>
\ No newline at end of file
package com.taosdata.example.calcite;
import org.apache.calcite.adapter.jdbc.JdbcSchema;
import org.apache.calcite.jdbc.CalciteConnection;
import org.apache.calcite.schema.Schema;
import org.apache.calcite.schema.SchemaPlus;
import org.apache.calcite.sql.parser.SqlParseException;
import org.apache.commons.dbcp2.BasicDataSource;
import java.sql.*;
import java.util.Properties;
public class CalciteDemo {
private static String url_taos = "jdbc:TAOS://192.168.236.135:6030/test";
private static String url_mysql = "jdbc:mysql://master:3306/test?useSSL=false&useUnicode=true&characterEncoding=UTF-8";
public static void main(String[] args) throws SqlParseException, ClassNotFoundException, SQLException {
Class.forName("org.apache.calcite.jdbc.Driver");
Properties info = new Properties();
info.setProperty("caseSensitive", "false");
Connection connection = DriverManager.getConnection("jdbc:calcite:", info);
CalciteConnection calciteConnection = connection.unwrap(CalciteConnection.class);
SchemaPlus rootSchema = calciteConnection.getRootSchema();
        // here "hdb" is the name of the database created in TDengine
Schema schema = mysqlTest(rootSchema);
// Schema schema = tdengineTest(rootSchema);
        // create a new schema that is automatically mapped to the original hdb database
rootSchema.add("test", schema);
Statement stmt = calciteConnection.createStatement();
        // query a table in schema "test"; the table name is the one defined in TDengine
ResultSet rs = stmt.executeQuery("select * from test.t");
ResultSetMetaData metaData = rs.getMetaData();
while (rs.next()) {
for (int i = 1; i <= metaData.getColumnCount(); i++) {
System.out.println(metaData.getColumnLabel(i) + " : " + rs.getString(i));
}
}
}
private static Schema tdengineTest(SchemaPlus rootSchema) throws ClassNotFoundException {
Class.forName("com.taosdata.jdbc.TSDBDriver");
BasicDataSource dataSource = new BasicDataSource();
dataSource.setUrl(url_taos);
dataSource.setUsername("root");
dataSource.setPassword("taosdata");
return JdbcSchema.create(rootSchema, "test", dataSource, "hdb", null);
}
private static Schema mysqlTest(SchemaPlus rootSchema) throws ClassNotFoundException {
Class.forName("com.mysql.jdbc.Driver");
BasicDataSource dataSource = new BasicDataSource();
dataSource.setUrl(url_mysql);
dataSource.setUsername("root");
dataSource.setPassword("123456");
//Schema schema = JdbcSchema.create(rootSchema, "test", dataSource, "hdb", null);
return JdbcSchema.create(rootSchema, "test", dataSource, "test", null);
}
}
log4j.rootLogger=info,stdout
#console
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern= [%d{yyyy-MM-dd HH:mm:ss a}]:%p %l%m%n
\ No newline at end of file
###################################################################
# Copyright (c) 2016 by TAOS Technologies, Inc.
# All rights reserved.
#
# This file is proprietary and confidential to TAOS Technologies.
# No part of this file may be reproduced, stored, transmitted,
# disclosed or used in any form or by any means other than as
# expressly provided by the written permission from Jianhui Tao
#
###################################################################
# -*- coding: utf-8 -*-
import threading
import taos
import json
import time
import random
# query sql
query_sql = [
# first supertable
"select count(*) from test.meters where c1 > 50;",
"select count(*) from test.meters where c2 >= 50 and c2 < 100;",
"select count(*) from test.meters where c3 != 5;",
"select count(*) from test.meters where t3 > 2;",
"select count(*) from test.meters where ts <> '2020-05-13 10:00:00.002';",
"select count(*) from test.meters where t7 like 'fi%';",
"select count(*) from test.meters where t7 like '_econd';",
"select count(*) from test.meters interval(1n) order by ts desc;",
"select first(*) from test.meters;",
"select last(*) from test.meters;",
"select last_row(*) from test.meters;",
"select twa(c1) from test.t1 where ts > 1500000001000 and ts < 1500000101000" ,
"select avg(c1) from test.meters;",
"select bottom(c1, 2) from test.t1;",
"select diff(c1) from test.t1;",
"select leastsquares(c1, 1, 1) from test.t1 ;",
"select max(c1) from test.meters;",
"select min(c1) from test.meters;",
"select c1 + c2 * c3 + c1 / c5 + c4 + c2 from test.t1;",
"select percentile(c1, 50) from test.t1;",
"select spread(c1) from test.t1 ;",
"select stddev(c1) from test.t1;",
"select sum(c1) from test.meters;",
"select top(c1, 2) from test.meters;"
"select twa(c6) from test.t1 where ts > 1500000001000 and ts < 1500000101000" ,
"select avg(c6) from test.meters;",
"select bottom(c6, 2) from test.t1;",
"select diff(c6) from test.t1;",
"select leastsquares(c6, 1, 1) from test.t1 ;",
"select max(c6) from test.meters;",
"select min(c6) from test.meters;",
"select c6 + c2 * c3 + c6 / c5 + c4 + c2 from test.t1;",
"select percentile(c6, 50) from test.t1;",
"select spread(c6) from test.t1 ;",
"select stddev(c6) from test.t1;",
"select sum(c6) from test.meters;",
"select top(c6, 2) from test.meters;",
# second supertable
"select count(*) from test.meters1 where c1 > 50;",
"select count(*) from test.meters1 where c2 >= 50 and c2 < 100;",
"select count(*) from test.meters1 where c3 != 5;",
"select count(*) from test.meters1 where t3 > 2;",
"select count(*) from test.meters1 where ts <> '2020-05-13 10:00:00.002';",
"select count(*) from test.meters1 where t7 like 'fi%';",
"select count(*) from test.meters1 where t7 like '_econd';",
"select count(*) from test.meters1 interval(1n) order by ts desc;",
"select first(*) from test.meters1;",
"select last(*) from test.meters1;",
"select last_row(*) from test.meters1;",
"select twa(c1) from test.m1 where ts > 1500000001000 and ts < 1500000101000" ,
"select avg(c1) from test.meters1;",
"select bottom(c1, 2) from test.m1;",
"select diff(c1) from test.m1;",
"select leastsquares(c1, 1, 1) from test.m1 ;",
"select max(c1) from test.meters1;",
"select min(c1) from test.meters1;",
"select c1 + c2 * c3 + c1 / c5 + c3 + c2 from test.m1;",
"select percentile(c1, 50) from test.m1;",
"select spread(c1) from test.m1 ;",
"select stddev(c1) from test.m1;",
"select sum(c1) from test.meters1;",
"select top(c1, 2) from test.meters1;",
"select twa(c6) from test.m1 where ts > 1500000001000 and ts < 1500000101000" ,
"select avg(c6) from test.meters1;",
"select bottom(c6, 2) from test.m1;",
"select diff(c6) from test.m1;",
"select leastsquares(c6, 1, 1) from test.m1 ;",
"select max(c6) from test.meters1;",
"select min(c6) from test.meters1;",
"select c6 + c2 * c3 + c6 / c5 + c3 + c2 from test.m1;",
"select percentile(c6, 50) from test.m1;",
"select spread(c6) from test.m1 ;",
"select stddev(c6) from test.m1;",
"select sum(c6) from test.meters1;",
"select top(c6, 2) from test.meters1;"
]
class ConcurrentInquiry:
def initConnection(self):
self.numOfThreads = 50
self.ts=1500000001000
def query_thread(self,threadID):
host = "10.211.55.14"
user = "root"
password = "taosdata"
conn = taos.connect(
host,
user,
password,
)
cl = conn.cursor()
print("Thread %d: starting" % threadID)
while True:
ran_query_sql = list(query_sql)  # copy so threads do not shuffle the shared list
random.shuffle(ran_query_sql)
for i in ran_query_sql:
print("Thread %d : %s"% (threadID,i))
try:
cl.execute(i)
cl.fetchall()
except Exception as e:
print(
"Failure thread%d, sql: %s,exception: %s" %
(threadID, str(i),str(e)))
print("Thread %d: finishing" % threadID)
def run(self):
threads = []
for i in range(self.numOfThreads):
thread = threading.Thread(target=self.query_thread, args=(i,))
threads.append(thread)
thread.start()
q = ConcurrentInquiry()
q.initConnection()
q.run()
@@ -119,7 +119,7 @@ endi
 system_content curl -H 'Authorization: Taosd /KfeAzX/f9na8qdtNZmtONryp201ma04bEl8LcvLUd7a8qdtNZmtONryp201ma04' -d ' used1' 127.0.0.1:7111/rest/sql
 print 17-> $system_content
-if $system_content != @{"status":"error","code":534,"desc":"Syntax errr in SQL"}@ then
+if $system_content != @{"status":"error","code":534,"desc":"Syntax error in SQL"}@ then
   return -1
 endi
...
@@ -65,7 +65,7 @@ endi
 print ============== step4
 sql drop dnode $hostname2
-sleep 16000
+sleep 10000
 sql show mnodes
 $dnode1Role = $data2_1
...