提交 59acafab 编写于 作者: H Haojun Liao

Merge remote-tracking branch 'origin/feature/query' into feature/query

......@@ -45,7 +45,7 @@ IF (TD_LINUX_64)
ADD_DEFINITIONS(-D_M_X64)
ADD_DEFINITIONS(-D_TD_LINUX_64)
MESSAGE(STATUS "linux64 is defined")
SET(COMMON_FLAGS "-std=gnu99 -Wall -Werror -fPIC -g3 -gdwarf-2 -msse4.2 -D_FILE_OFFSET_BITS=64 -D_LARGE_FILE")
SET(COMMON_FLAGS "-std=gnu99 -Wall -Werror -fPIC -gdwarf-2 -msse4.2 -D_FILE_OFFSET_BITS=64 -D_LARGE_FILE")
ADD_DEFINITIONS(-DUSE_LIBICONV)
ENDIF ()
......@@ -53,7 +53,7 @@ IF (TD_LINUX_32)
ADD_DEFINITIONS(-D_TD_LINUX_32)
ADD_DEFINITIONS(-DUSE_LIBICONV)
MESSAGE(STATUS "linux32 is defined")
SET(COMMON_FLAGS "-std=gnu99 -Wall -Werror -fPIC -g -fsigned-char -munaligned-access -fpack-struct=8 -D_FILE_OFFSET_BITS=64 -D_LARGE_FILE")
SET(COMMON_FLAGS "-std=gnu99 -Wall -Werror -fPIC -fsigned-char -munaligned-access -fpack-struct=8 -D_FILE_OFFSET_BITS=64 -D_LARGE_FILE")
ENDIF ()
IF (TD_ARM_64)
......@@ -62,7 +62,7 @@ IF (TD_ARM_64)
ADD_DEFINITIONS(-D_TD_ARM_)
ADD_DEFINITIONS(-DUSE_LIBICONV)
MESSAGE(STATUS "arm64 is defined")
SET(COMMON_FLAGS "-std=gnu99 -Wall -Werror -fPIC -g -fsigned-char -fpack-struct=8 -D_FILE_OFFSET_BITS=64 -D_LARGE_FILE")
SET(COMMON_FLAGS "-std=gnu99 -Wall -Werror -fPIC -fsigned-char -fpack-struct=8 -D_FILE_OFFSET_BITS=64 -D_LARGE_FILE")
ENDIF ()
IF (TD_ARM_32)
......@@ -70,21 +70,21 @@ IF (TD_ARM_32)
ADD_DEFINITIONS(-D_TD_ARM_)
ADD_DEFINITIONS(-DUSE_LIBICONV)
MESSAGE(STATUS "arm32 is defined")
SET(COMMON_FLAGS "-std=gnu99 -Wall -Werror -fPIC -g -fsigned-char -fpack-struct=8 -D_FILE_OFFSET_BITS=64 -D_LARGE_FILE -Wno-pointer-to-int-cast -Wno-int-to-pointer-cast -Wno-incompatible-pointer-types ")
SET(COMMON_FLAGS "-std=gnu99 -Wall -Werror -fPIC -fsigned-char -fpack-struct=8 -D_FILE_OFFSET_BITS=64 -D_LARGE_FILE -Wno-pointer-to-int-cast -Wno-int-to-pointer-cast -Wno-incompatible-pointer-types ")
ENDIF ()
IF (TD_MIPS_64)
ADD_DEFINITIONS(-D_TD_MIPS_64_)
ADD_DEFINITIONS(-DUSE_LIBICONV)
MESSAGE(STATUS "mips64 is defined")
SET(COMMON_FLAGS "-std=gnu99 -Wall -Werror -fPIC -g3 -gdwarf-2 -msse4.2 -D_FILE_OFFSET_BITS=64 -D_LARGE_FILE")
SET(COMMON_FLAGS "-std=gnu99 -Wall -Werror -fPIC -gdwarf-2 -msse4.2 -D_FILE_OFFSET_BITS=64 -D_LARGE_FILE")
ENDIF ()
IF (TD_MIPS_32)
ADD_DEFINITIONS(-D_TD_MIPS_32_)
ADD_DEFINITIONS(-DUSE_LIBICONV)
MESSAGE(STATUS "mips32 is defined")
SET(COMMON_FLAGS "-std=gnu99 -Wall -Werror -fPIC -g3 -gdwarf-2 -msse4.2 -D_FILE_OFFSET_BITS=64 -D_LARGE_FILE")
SET(COMMON_FLAGS "-std=gnu99 -Wall -Werror -fPIC -gdwarf-2 -msse4.2 -D_FILE_OFFSET_BITS=64 -D_LARGE_FILE")
ENDIF ()
IF (TD_APLHINE)
......@@ -105,8 +105,8 @@ IF (TD_LINUX)
MESSAGE(STATUS "set ningsi macro to true")
ENDIF ()
SET(DEBUG_FLAGS "-O0 -DDEBUG")
SET(RELEASE_FLAGS "-O0 -Wno-unused-variable -Wunused-but-set-variable")
SET(DEBUG_FLAGS "-O0 -g3 -DDEBUG")
SET(RELEASE_FLAGS "-O3 -Wno-error")
IF (${COVER} MATCHES "true")
MESSAGE(STATUS "Test coverage mode, add extra flags")
......@@ -125,9 +125,9 @@ IF (TD_DARWIN_64)
ADD_DEFINITIONS(-D_REENTRANT -D__USE_POSIX -D_LIBC_REENTRANT)
ADD_DEFINITIONS(-DUSE_LIBICONV)
MESSAGE(STATUS "darwin64 is defined")
SET(COMMON_FLAGS "-std=gnu99 -Wall -Werror -Wno-missing-braces -fPIC -g -msse4.2 -D_FILE_OFFSET_BITS=64 -D_LARGE_FILE")
SET(DEBUG_FLAGS "-O0 -DDEBUG")
SET(RELEASE_FLAGS "-O0")
SET(COMMON_FLAGS "-std=gnu99 -Wall -Werror -Wno-missing-braces -fPIC -msse4.2 -D_FILE_OFFSET_BITS=64 -D_LARGE_FILE")
SET(DEBUG_FLAGS "-O0 -g3 -DDEBUG")
SET(RELEASE_FLAGS "-Og")
ENDIF ()
IF (TD_WINDOWS)
......@@ -140,7 +140,7 @@ IF (TD_WINDOWS)
IF (NOT TD_GODLL)
SET(COMMON_FLAGS "/nologo /WX /wd4018 /wd2220 /Oi /Oy- /Gm- /EHsc /MT /GS /Gy /fp:precise /Zc:wchar_t /Zc:forScope /Gd /errorReport:prompt /analyze-")
SET(DEBUG_FLAGS "/Zi /W3 /GL")
SET(RELEASE_FLAGS "/W0 /GL")
SET(RELEASE_FLAGS "/W0 /O3 /GL")
ENDIF ()
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/deps/pthread)
......
......@@ -41,8 +41,10 @@ SET(CMAKE_C_FLAGS_RELEASE "${CMAKE_C_FLAGS_RELEASE} ${COMMON_FLAGS} ${RELEASE_FL
# SET(CMAKE_CXX_FLAGS_RELEASE "${CMAKE_CXX_FLAGS_RELEASE} ${COMMON_CXX_FLAGS} ${RELEASE_FLAGS}")
IF (${CMAKE_BUILD_TYPE} MATCHES "Debug")
SET(CMAKE_BUILD_TYPE "Debug")
MESSAGE(STATUS "Build Debug Version")
ELSEIF (${CMAKE_BUILD_TYPE} MATCHES "Release")
SET(CMAKE_BUILD_TYPE "Release")
MESSAGE(STATUS "Build Release Version")
ELSE ()
IF (TD_WINDOWS)
......
......@@ -4,7 +4,7 @@ PROJECT(TDengine)
IF (DEFINED VERNUMBER)
SET(TD_VER_NUMBER ${VERNUMBER})
ELSE ()
SET(TD_VER_NUMBER "2.0.9.0")
SET(TD_VER_NUMBER "2.0.10.0")
ENDIF ()
IF (DEFINED VERCOMPATIBLE)
......
......@@ -59,5 +59,3 @@ TDengine的模块之一是时序数据库。但除此之外,为减少研发的
|要求运维学习成本可控| | | √ |同上。|
|要求市场有大量人才储备| √ | | |TDengine作为新一代产品,目前人才市场里面有经验的人员还有限。但是学习成本低,我们作为厂家也提供运维的培训和辅助服务。|
## TDengine 性能指标介绍和验证方法
......@@ -16,21 +16,23 @@ CPU支持X64/ARM64/MIPS64/Alpha64,后续会支持ARM32、RISC-V等CPU架构。
目前TDengine服务器可以运行在以下平台上:
| | **CentOS** **6/7/8** | **Ubuntu** **16/18/20** | **Other Linux** | **Win64/32** | **macOS** | **统信****UOS** | **银河****/****中标麒麟** | **凝思** **V60/V80** |
| -------------- | --------------------- | ------------------------ | --------------- | ------------ | --------- | --------------- | ------------------------- | --------------------- |
| X64 | ● | ● | | ○/○ | ○ | ○ | ● | ● |
| 树莓派ARM32 | | ● | ● | | | | | |
| 龙芯MIPS64 | | | ● | | | | | |
| 鲲鹏 ARM64 | | ○ | ○ | | | | ● | |
| 申威 Alpha64 | | | ○ | | | ● | | |
| 飞腾ARM64 | | | ○优麒麟 | | | | | |
| 海光X64 | ● | ● | ● | | | ○ | ● | ● |
| 瑞芯微ARM64/32 | | | ○ | | | | | |
| 全志ARM64/32 | | | ○ | | | | | |
| 炬力ARM64/32 | | | ○ | | | | | |
| TI ARM32 | | | ○ | | | | | |
| | **CentOS** **6/7/8** | **Ubuntu** **16/18/20** | **Other Linux** | **统信****UOS** | **银河****/****中标麒麟** | **凝思** **V60/V80** |
| -------------- | --------------------- | ------------------------ | --------------- | --------------- | ------------------------- | --------------------- |
| X64 | ● | ● | | ○ | ● | ● |
| 树莓派ARM32 | | ● | ● | | | |
| 龙芯MIPS64 | | | ● | | | |
| 鲲鹏 ARM64 | | ○ | ○ | | ● | |
| 申威 Alpha64 | | | ○ | ● | | |
| 飞腾ARM64 | | ○优麒麟 | | | | |
| 海光X64 | ● | ● | ● | ○ | ● | ● |
| 瑞芯微ARM64/32 | | | ○ | | | |
| 全志ARM64/32 | | | ○ | | | |
| 炬力ARM64/32 | | | ○ | | | |
| TI ARM32 | | | ○ | | | |
其中 ● 表示经过官方测试验证, ○ 表示非官方测试验证。
### 通过源码安装
......@@ -142,7 +144,7 @@ taos -h 192.168.0.1 -s "use db; show tables;"
TDengine终端可以通过`source`命令来运行SQL命令脚本.
```
```mysql
taos> source <filename>;
```
......
......@@ -8,14 +8,14 @@ TDengine采用关系型数据模型,需要建库、建表。因此对于一个
不同类型的数据采集点往往具有不同的数据特征,包括数据采集频率的高低,数据保留时间的长短,副本的数目,数据块的大小,是否允许更新数据等等。为让各种场景下TDengine都能最大效率的工作,TDengine建议将不同数据特征的表创建在不同的库里,因为每个库可以配置不同的存储策略。创建一个库时,除SQL标准的选项外,应用还可以指定保留时长、副本数、内存块个数、时间精度、文件块里最大最小记录条数、是否压缩、一个数据文件覆盖的天数等多种参数。比如:
```cmd
```mysql
CREATE DATABASE power KEEP 365 DAYS 10 BLOCKS 4 UPDATE 1;
```
上述语句将创建一个名为power的库,这个库的数据将保留365天(超过365天将被自动删除),每10天一个数据文件,内存块数为4,允许更新数据。详细的语法及参数请见<a href="https://www.taosdata.com/cn/documentation20/taos-sql/">TAOS SQL</a>
创建库之后,需要使用SQL命令USE将当前库切换过来,例如:
```cmd
```mysql
USE power;
```
......@@ -28,7 +28,7 @@ USE power;
## 创建超级表
一个物联网系统,往往存在多种类型的设备,比如对于电网,存在智能电表、变压器、母线、开关等等。为便于多表之间的聚合,使用TDengine, 需要对每个类型的数据采集点创建一超级表。以表一中的智能电表为例,可以使用如下的SQL命令创建超级表:
```cmd
```mysql
CREATE TABLE meters (ts timestamp, current float, voltage int, phase float) TAGS (location binary(64), groupdId int);
```
与创建普通表一样,创建表时,需要提供表名(示例中为meters),表结构Schema,即数据列的定义。第一列必须为时间戳(示例中为ts),其他列为采集的物理量(示例中为current, voltage, phase),数据类型可以为整型、浮点型、字符串等。除此之外,还需要提供标签的schema (示例中为location, groupId),标签的数据类型可以为整型、浮点型、字符串等。采集点的静态属性往往可以作为标签,比如采集点的地理位置、设备型号、设备组ID、管理员ID等等。标签的schema可以事后增加、删除、修改。具体定义以及细节请见 <a href="https://www.taosdata.com/cn/documentation20/taos-sql/">TAOS SQL </a>一节。
......
......@@ -60,7 +60,7 @@ create table avg_vol as select avg(voltage) from meters interval(1m) sliding(30s
会自动创建一个名为 `avg_vol` 的新表,然后每隔30秒,TDengine会增量执行 `as` 后面的 SQL 语句,
并将查询结果写入这个表中,用户程序后续只要从 `avg_vol` 中查询数据即可。 例如:
```shell
```mysql
taos> select * from avg_vol;
ts | avg_voltage_ |
===================================================
......@@ -72,14 +72,13 @@ taos> select * from avg_vol;
需要注意,查询时间窗口的最小值是10毫秒,没有时间窗口范围的上限。
此外,TDengine还支持用户指定连续查询的起止时间。
如果不输入开始时间,连续查询将从第一条原始数据所在的时间窗口开始;
如果没有输入结束时间,连续查询将永久运行;
如果用户指定了结束时间,连续查询在系统时间达到指定的时间以后停止运行。
比如使用下面的SQL创建的连续查询将运行一小时,之后会自动停止。
```sql
```mysql
create table avg_vol as select avg(voltage) from meters where ts > now and ts <= now + 1h interval(1m) sliding(30s);
```
......
......@@ -6,16 +6,18 @@ TDengine提供了丰富的应用程序开发接口,其中包括C/C++、C# 、J
目前TDengine的连接器可支持的平台广泛,目前包括:X64/X86/ARM64/ARM32/MIPS/Alpha等硬件平台,以及Linux/Win64/Win32等开发环境。对照矩阵如下:
| | **CPU** | **X64 64bit** | **X86 32bit** | **ARM64** | **ARM32** | **MIPS ** **龙芯** | **Alpha ** **申威** | **X64 ** **海光** | | |
| ---------------------------- | --------- | --------------- | --------------- | --------- | --------- | ------------------- | -------------------- | ------------------ | --------- | --------- |
| | **OS** | **Linux** | **Win64** | **Win32** | **Win32** | **Linux** | **Linux** | **Linux** | **Linux** | **Linux** |
| **连** **接** **器** | **C/C++** | ● | ● | ● | ○ | ● | ● | ● | ● | ● |
| **JDBC** | ● | ● | ● | ○ | ● | ● | ● | ● | ● | |
| **Python** | ● | ● | ● | ○ | ● | ● | ● | -- | ● | |
| **Go** | ● | ● | ● | ○ | ● | ● | ○ | -- | -- | |
| **NodeJs** | ● | ● | ○ | ○ | ● | ● | ○ | -- | -- | |
| **C#** | ○ | ● | ● | ○ | ○ | ○ | ○ | -- | -- | |
| **RESTful** | ● | ● | ● | ● | ● | ● | ● | ● | ● | |
| **CPU** | **X64 64bit** | **X64 64bit** | **X64 64bit** | **X86 32bit** | **ARM64** | **ARM32** | **MIPS 龙芯** | **Alpha 申威** | **X64 海光** |
| ----------- | --------------- | --------------- | --------------- | --------------- | --------- | --------- | --------------- | ---------------- | -------------- |
| **OS** | **Linux** | **Win64** | **Win32** | **Win32** | **Linux** | **Linux** | **Linux** | **Linux** | **Linux** |
| **C/C++** | ● | ● | ● | ○ | ● | ● | ● | ● | ● |
| **JDBC** | ● | ● | ● | ○ | ● | ● | ● | ● | ● |
| **Python** | ● | ● | ● | ○ | ● | ● | ● | -- | ● |
| **Go** | ● | ● | ● | ○ | ● | ● | ○ | -- | -- |
| **NodeJs** | ● | ● | ○ | ○ | ● | ● | ○ | -- | -- |
| **C#** | ○ | ● | ● | ○ | ○ | ○ | ○ | -- | -- |
| **RESTful** | ● | ● | ● | ● | ● | ● | ● | ● | ● |
其中 ● 表示经过官方测试验证, ○ 表示非官方测试验证。
注意:所有执行 SQL 语句的 API,例如 C/C++ Connector 中的 `tao_query``taos_query_a``taos_subscribe` 等,以及其它语言中与它们对应的API,每次都只能执行一条 SQL 语句,如果实际参数中包含了多条语句,它们的行为是未定义的。
......@@ -300,7 +302,7 @@ TDengine提供时间驱动的实时流式计算API。可以每隔一指定的时
### 安装准备
* 已安装TDengine, 如果客户端在Windows上,需要安装Windows 版本的TDengine客户端 [(Windows TDengine 客户端安装)][4]
* 已安装python 2.7 or >= 3.4
* 已安装pip
* 已安装pip 或 pip3
### Python客户端安装
......@@ -312,7 +314,7 @@ TDengine提供时间驱动的实时流式计算API。可以每隔一指定的时
`pip install src/connector/python/linux/python3/`
`pip3 install src/connector/python/linux/python3/`
#### Windows
在已安装Windows TDengine 客户端的情况下, 将文件"C:\TDengine\driver\taos.dll" 拷贝到 "C:\windows\system32" 目录下, 然后进入Windwos <em>cmd</em> 命令行界面
......@@ -472,13 +474,13 @@ HTTP请求的BODY里就是一个完整的SQL语句,SQL语句中的数据表应
使用curl通过自定义身份认证方式来发起一个HTTP Request,语法如下:
```
```bash
curl -H 'Authorization: Basic <TOKEN>' -d '<SQL>' <ip>:<PORT>/rest/sql
```
或者
```
```bash
curl -u username:password -d '<SQL>' <ip>:<PORT>/rest/sql
```
......@@ -488,7 +490,7 @@ curl -u username:password -d '<SQL>' <ip>:<PORT>/rest/sql
返回值为JSON格式,如下:
```
```json
{
"status": "succ",
"head": ["Time Stamp","current", ],
......@@ -511,7 +513,7 @@ curl -u username:password -d '<SQL>' <ip>:<PORT>/rest/sql
HTTP请求中需要带有授权码`<TOKEN>`,用于身份识别。授权码通常由管理员提供,可简单的通过发送`HTTP GET`请求来获取授权码,操作如下:
```
```bash
curl http://<ip>:6041/rest/login/<username>/<password>
```
......@@ -525,13 +527,13 @@ curl http://<ip>:6041/rest/login/<username>/<password>
获取授权码示例:
```
```bash
curl http://192.168.0.1:6041/rest/login/root/taosdata
```
返回值:
```
```json
{
"status": "succ",
"code": 0,
......@@ -543,12 +545,12 @@ curl http://192.168.0.1:6041/rest/login/root/taosdata
- 在demo库里查询表d1001的所有记录:
```
```bash
curl -H 'Authorization: Basic cm9vdDp0YW9zZGF0YQ==' -d 'select * from demo.d1001' 192.168.0.1:6041/rest/sql
```
返回值:
```
```json
{
"status": "succ",
"head": ["Time Stamp","current","voltage","phase"],
......@@ -562,12 +564,12 @@ curl -H 'Authorization: Basic cm9vdDp0YW9zZGF0YQ==' -d 'select * from demo.d1001
- 创建库demo:
```
```bash
curl -H 'Authorization: Basic cm9vdDp0YW9zZGF0YQ==' -d 'create database demo' 192.168.0.1:6041/rest/sql
```
返回值:
```
```json
{
"status": "succ",
"head": ["affected_rows"],
......@@ -582,13 +584,13 @@ curl -H 'Authorization: Basic cm9vdDp0YW9zZGF0YQ==' -d 'create database demo' 19
HTTP请求URL采用`sqlt`时,返回结果集的时间戳将采用Unix时间戳格式表示,例如
```
```bash
curl -H 'Authorization: Basic cm9vdDp0YW9zZGF0YQ==' -d 'select * from demo.d1001' 192.168.0.1:6041/rest/sqlt
```
返回值:
```
```json
{
"status": "succ",
"head": ["column1","column2","column3"],
......@@ -603,13 +605,13 @@ curl -H 'Authorization: Basic cm9vdDp0YW9zZGF0YQ==' -d 'select * from demo.d1001
#### 结果集采用UTC时间字符串
HTTP请求URL采用`sqlutc`时,返回结果集的时间戳将采用UTC时间字符串表示,例如
```
```bash
curl -H 'Authorization: Basic cm9vdDp0YW9zZGF0YQ==' -d 'select * from demo.t1' 192.168.0.1:6041/rest/sqlutc
```
返回值:
```
```json
{
"status": "succ",
"head": ["column1","column2","column3"],
......@@ -724,7 +726,7 @@ TDengine 同时也提供了node.js 的连接器。用户可以通过[npm](https:
首先,通过[npm](https://www.npmjs.com/)安装node.js 连接器.
```cmd
```bash
npm install td2.0-connector
```
我们建议用户使用npm 安装node.js连接器。如果您没有安装npm, 可以将*src/connector/nodejs/*拷贝到您的nodejs 项目目录下
......
......@@ -37,7 +37,7 @@ INSERT INTO d1001 VALUES (1538548685000, 10.3, 219, 0.31) (1538548695000, 12.6,
- 对应的TDengine版本。因为用到了TDengine的客户端动态链接库,因此需要安装好和服务端相同版本的TDengine程序;比如服务端版本是TDengine 2.0.0, 则在bailongma所在的linux服务器(可以与TDengine在同一台服务器,或者不同服务器)
Bailongma项目中有一个文件夹blm_prometheus,存放了prometheus的写入API程序。编译过程如下:
```
```bash
cd blm_prometheus
go build
```
......@@ -79,7 +79,7 @@ blm_prometheus对prometheus提供服务的端口号。
### 启动示例
通过以下命令启动一个blm_prometheus的API服务
```
```bash
./blm_prometheus -port 8088
```
假设blm_prometheus所在服务器的IP地址为"10.1.2.3",则在prometheus的配置文件中<remote_write>部分增加url为
......@@ -107,7 +107,7 @@ prometheus产生的数据格式如下:
}
```
其中,apiserver_request_latencies_bucket为prometheus采集的时序数据的名称,后面{}中的为该时序数据的标签。blm_prometheus会以时序数据的名称在TDengine中自动创建一个超级表,并将{}中的标签转换成TDengine的tag值,Timestamp作为时间戳,value作为该时序数据的值。因此在TDengine的客户端中,可以通过以下指令查到这个数据是否成功写入。
```
```mysql
use prometheus;
select * from apiserver_request_latencies_bucket;
```
......@@ -124,7 +124,7 @@ select * from apiserver_request_latencies_bucket;
Bailongma项目中有一个文件夹blm_telegraf,存放了Telegraf的写入API程序。编译过程如下:
```
```bash
cd blm_telegraf
go build
```
......@@ -175,7 +175,7 @@ blm_telegraf对telegraf提供服务的端口号。
### 启动示例
通过以下命令启动一个blm_telegraf的API服务
```
```bash
./blm_telegraf -host 127.0.0.1 -port 8089
```
......@@ -213,7 +213,7 @@ telegraf产生的数据格式如下:
其中,name字段为telegraf采集的时序数据的名称,tags字段为该时序数据的标签。blm_telegraf会以时序数据的名称在TDengine中自动创建一个超级表,并将tags字段中的标签转换成TDengine的tag值,Timestamp作为时间戳,fields字段中的值作为该时序数据的值。因此在TDengine的客户端中,可以通过以下指令查到这个数据是否成功写入。
```
```mysql
use telegraf;
select * from cpu;
```
......
name: tdengine
base: core18
version: '2.0.9.0'
version: '2.0.10.0'
icon: snap/gui/t-dengine.svg
summary: an open-source big data platform designed and optimized for IoT.
description: |
......@@ -72,7 +72,7 @@ parts:
- usr/bin/taosd
- usr/bin/taos
- usr/bin/taosdemo
- usr/lib/libtaos.so.2.0.9.0
- usr/lib/libtaos.so.2.0.10.0
- usr/lib/libtaos.so.1
- usr/lib/libtaos.so
......
......@@ -569,7 +569,7 @@ static int32_t tscRebuildDDLForSubTable(SSqlObj *pSql, const char *tableName, ch
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
char fullName[TSDB_TABLE_FNAME_LEN] = {0};
char fullName[TSDB_TABLE_FNAME_LEN * 2] = {0};
extractDBName(pTableMetaInfo->name, fullName);
extractTableName(pMeta->sTableId, param->sTableName);
snprintf(fullName + strlen(fullName), TSDB_TABLE_FNAME_LEN - strlen(fullName), ".%s", param->sTableName);
......
......@@ -243,7 +243,7 @@ int tscSendMsgToServer(SSqlObj *pSql) {
STscObj* pObj = pSql->pTscObj;
SSqlCmd* pCmd = &pSql->cmd;
char *pMsg = rpcMallocCont(pCmd->payloadLen);
char *pMsg = rpcMallocCont(sizeof(SMsgVersion) + pCmd->payloadLen);
if (NULL == pMsg) {
tscError("%p msg:%s malloc failed", pSql, taosMsg[pSql->cmd.msgType]);
return TSDB_CODE_TSC_OUT_OF_MEMORY;
......@@ -254,12 +254,13 @@ int tscSendMsgToServer(SSqlObj *pSql) {
tscDumpMgmtEpSet(pSql);
}
memcpy(pMsg, pSql->cmd.payload, pSql->cmd.payloadLen);
tstrncpy(pMsg, version, sizeof(SMsgVersion));
memcpy(pMsg + sizeof(SMsgVersion), pSql->cmd.payload, pSql->cmd.payloadLen);
SRpcMsg rpcMsg = {
.msgType = pSql->cmd.msgType,
.pCont = pMsg,
.contLen = pSql->cmd.payloadLen,
.contLen = pSql->cmd.payloadLen + sizeof(SMsgVersion),
.ahandle = (void*)pSql->self,
.handle = NULL,
.code = 0
......
......@@ -407,7 +407,7 @@ void tscResetSqlCmdObj(SSqlCmd* pCmd, bool removeFromCache) {
pCmd->autoCreated = 0;
for(int32_t i = 0; i < pCmd->numOfTables; ++i) {
if (pCmd->pTableMetaList[i] != NULL) {
if (pCmd->pTableMetaList && pCmd->pTableMetaList[i]) {
taosCacheRelease(tscMetaCache, (void**)&(pCmd->pTableMetaList[i]), false);
}
}
......
......@@ -705,7 +705,7 @@ int32_t tVariantDump(tVariant *pVariant, char *payload, int16_t type, bool inclu
*((int32_t *)payload) = TSDB_DATA_FLOAT_NULL;
return 0;
} else {
double value;
double value = -1;
int32_t ret;
ret = convertToDouble(pVariant->pz, pVariant->nLen, &value);
if ((errno == ERANGE && (float)value == -1) || (ret != 0)) {
......
......@@ -97,7 +97,7 @@ void *cqOpen(void *ahandle, const SCqCfg *pCfg) {
pthread_mutex_init(&pContext->mutex, NULL);
cInfo("vgId:%d, CQ is opened", pContext->vgId);
cDebug("vgId:%d, CQ is opened", pContext->vgId);
return pContext;
}
......@@ -131,7 +131,7 @@ void cqClose(void *handle) {
taosTmrCleanUp(pContext->tmrCtrl);
pContext->tmrCtrl = NULL;
cInfo("vgId:%d, CQ is closed", pContext->vgId);
cDebug("vgId:%d, CQ is closed", pContext->vgId);
free(pContext);
}
......@@ -142,7 +142,7 @@ void cqStart(void *handle) {
SCqContext *pContext = handle;
if (pContext->dbConn || pContext->master) return;
cInfo("vgId:%d, start all CQs", pContext->vgId);
cDebug("vgId:%d, start all CQs", pContext->vgId);
pthread_mutex_lock(&pContext->mutex);
pContext->master = 1;
......@@ -298,7 +298,7 @@ static void cqCreateStream(SCqContext *pContext, SCqObj *pObj) {
if (pObj->pStream) {
tscSetStreamDestTable(pObj->pStream, pObj->dstTable);
pContext->num++;
cInfo("vgId:%d, id:%d CQ:%s is openned", pContext->vgId, pObj->tid, pObj->sqlStr);
cDebug("vgId:%d, id:%d CQ:%s is opened", pContext->vgId, pObj->tid, pObj->sqlStr);
} else {
cError("vgId:%d, id:%d CQ:%s, failed to open", pContext->vgId, pObj->tid, pObj->sqlStr);
}
......
......@@ -124,8 +124,6 @@ void dnodeDispatchToMReadQueue(SRpcMsg *pMsg) {
SMnodeMsg *pRead = mnodeCreateMsg(pMsg);
taosWriteQitem(tsMReadQueue, TAOS_QTYPE_RPC, pRead);
}
rpcFreeCont(pMsg->pCont);
}
static void dnodeFreeMReadMsg(SMnodeMsg *pRead) {
......
......@@ -125,8 +125,6 @@ void dnodeDispatchToMWriteQueue(SRpcMsg *pMsg) {
taosMsg[pWrite->rpcMsg.msgType], tsMWriteQueue);
taosWriteQitem(tsMWriteQueue, TAOS_QTYPE_RPC, pWrite);
}
rpcFreeCont(pMsg->pCont);
}
static void dnodeFreeMWriteMsg(SMnodeMsg *pWrite) {
......
......@@ -147,6 +147,7 @@ int32_t dnodeInitSystem() {
dnodeSetRunStatus(TSDB_RUN_STATUS_RUNING);
dnodeReportStep("TDengine", "initialized successfully", 1);
dInfo("TDengine is initialized successfully");
return 0;
......
......@@ -127,7 +127,20 @@ static void dnodeProcessMsgFromShell(SRpcMsg *pMsg, SRpcEpSet *pEpSet) {
} else {}
if ( dnodeProcessShellMsgFp[pMsg->msgType] ) {
SMsgVersion *pMsgVersion = pMsg->pCont;
if (taosCheckVersion(pMsgVersion->clientVersion, version, 3) != TSDB_CODE_SUCCESS) {
rpcMsg.code = TSDB_CODE_TSC_INVALID_VERSION;
rpcSendResponse(&rpcMsg);
rpcFreeCont(pMsg->pCont);
return; // todo change the error code
}
pMsg->pCont += sizeof(*pMsgVersion);
pMsg->contLen -= sizeof(*pMsgVersion);
(*dnodeProcessShellMsgFp[pMsg->msgType])(pMsg);
//pMsg->contLen += sizeof(*pMsgVersion);
rpcFreeCont(pMsg->pCont - sizeof(*pMsgVersion));
} else {
dError("RPC %p, shell msg:%s is not processed", pMsg->handle, taosMsg[pMsg->msgType]);
rpcMsg.code = TSDB_CODE_DND_MSG_NOT_PROCESSED;
......@@ -231,4 +244,4 @@ SStatisInfo dnodeGetStatisInfo() {
}
return info;
}
\ No newline at end of file
}
......@@ -57,12 +57,13 @@ int32_t dnodeStepInit(SStep *pSteps, int32_t stepSize) {
int32_t code = (*pStep->initFp)();
if (code != 0) {
dDebug("step:%s will init", pStep->name);
dDebug("step:%s will cleanup", pStep->name);
taosStepCleanupImp(pSteps, step);
return code;
}
dInfo("step:%s is initialized", pStep->name);
dnodeReportStep(pStep->name, "Initialization complete", step + 1 >= stepSize);
dnodeReportStep(pStep->name, "Initialization complete", 0);
}
return 0;
......
......@@ -77,8 +77,6 @@ void dnodeDispatchToVReadQueue(SRpcMsg *pMsg) {
SRpcMsg rpcRsp = {.handle = pMsg->handle, .code = TSDB_CODE_VND_INVALID_VGROUP_ID};
rpcSendResponse(&rpcRsp);
}
rpcFreeCont(pMsg->pCont);
}
void *dnodeAllocVQueryQueue(void *pVnode) {
......
......@@ -102,7 +102,6 @@ void dnodeDispatchToVWriteQueue(SRpcMsg *pRpcMsg) {
}
vnodeRelease(pVnode);
rpcFreeCont(pRpcMsg->pCont);
}
void *dnodeAllocVWriteQueue(void *pVnode) {
......
......@@ -33,6 +33,8 @@ typedef struct {
extern void * tsDnodeTmr;
static void * tsStatusTimer = NULL;
static uint32_t tsRebootTime = 0;
static int32_t tsOpenVnodes = 0;
static int32_t tsTotalVnodes = 0;
static void dnodeSendStatusMsg(void *handle, void *tmrId);
static void dnodeProcessStatusRsp(SRpcMsg *pMsg);
......@@ -84,21 +86,27 @@ static int32_t dnodeGetVnodeList(int32_t vnodeList[], int32_t *numOfVnodes) {
static void *dnodeOpenVnode(void *param) {
SOpenVnodeThread *pThread = param;
char stepDesc[TSDB_STEP_DESC_LEN] = {0};
dDebug("thread:%d, start to open %d vnodes", pThread->threadIndex, pThread->vnodeNum);
for (int32_t v = 0; v < pThread->vnodeNum; ++v) {
int32_t vgId = pThread->vnodeList[v];
snprintf(stepDesc, TSDB_STEP_DESC_LEN, "vgId:%d, start to restore, %d of %d have been opened", vgId, tsOpenVnodes, tsTotalVnodes);
dnodeReportStep("open-vnodes", stepDesc, 0);
if (vnodeOpen(vgId) < 0) {
dError("vgId:%d, failed to open vnode by thread:%d", vgId, pThread->threadIndex);
pThread->failed++;
} else {
dDebug("vgId:%d, is openned by thread:%d", vgId, pThread->threadIndex);
dDebug("vgId:%d, is opened by thread:%d", vgId, pThread->threadIndex);
pThread->opened++;
}
atomic_add_fetch_32(&tsOpenVnodes, 1);
}
dDebug("thread:%d, total vnodes:%d, openned:%d failed:%d", pThread->threadIndex, pThread->vnodeNum, pThread->opened,
dDebug("thread:%d, total vnodes:%d, opened:%d failed:%d", pThread->threadIndex, pThread->vnodeNum, pThread->opened,
pThread->failed);
return NULL;
}
......@@ -107,6 +115,7 @@ int32_t dnodeInitVnodes() {
int32_t vnodeList[TSDB_MAX_VNODES] = {0};
int32_t numOfVnodes = 0;
int32_t status = dnodeGetVnodeList(vnodeList, &numOfVnodes);
tsTotalVnodes = numOfVnodes;
if (status != TSDB_CODE_SUCCESS) {
dInfo("get dnode list failed");
......@@ -127,7 +136,7 @@ int32_t dnodeInitVnodes() {
pThread->vnodeList[pThread->vnodeNum++] = vnodeList[v];
}
dDebug("start %d threads to open %d vnodes", threadNum, numOfVnodes);
dInfo("start %d threads to open %d vnodes", threadNum, numOfVnodes);
for (int32_t t = 0; t < threadNum; ++t) {
SOpenVnodeThread *pThread = &threads[t];
......@@ -156,7 +165,7 @@ int32_t dnodeInitVnodes() {
}
free(threads);
dInfo("there are total vnodes:%d, openned:%d", numOfVnodes, openVnodes);
dInfo("there are total vnodes:%d, opened:%d", numOfVnodes, openVnodes);
if (failedVnodes != 0) {
dError("there are total vnodes:%d, failed:%d", numOfVnodes, failedVnodes);
......
......@@ -266,6 +266,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_SYN_INVALID_CONFIG, 0, 0x0900, "Invalid Sy
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_NOT_ENABLED, 0, 0x0901, "Sync module not enabled")
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_INVALID_VERSION, 0, 0x0902, "Invalid Sync version")
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_CONFIRM_EXPIRED, 0, 0x0903, "Sync confirm expired")
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_TOO_MANY_FWDINFO, 0, 0x0904, "Too many sync fwd infos")
// wal
TAOS_DEFINE_ERROR(TSDB_CODE_WAL_APP_ERROR, 0, 0x1000, "Unexpected generic error in wal")
......
......@@ -198,6 +198,11 @@ typedef struct {
int32_t numOfVnodes;
} SMsgDesc;
typedef struct SMsgVersion {
char clientVersion[TSDB_VERSION_LEN];
uint32_t crc;
} SMsgVersion;
typedef struct SMsgHead {
int32_t contLen;
int32_t vgId;
......
......@@ -121,7 +121,6 @@ extern char *syncRole[];
//global configurable parameters
extern int32_t tsMaxSyncNum;
extern int32_t tsSyncTcpThreads;
extern int32_t tsMaxWatchFiles;
extern int32_t tsSyncTimer;
extern int32_t tsMaxFwdInfo;
extern int32_t sDebugFlag;
......
......@@ -104,7 +104,7 @@ static int32_t mnodeDnodeActionInsert(SSdbRow *pRow) {
dnodeUpdateEp(pDnode->dnodeId, pDnode->dnodeEp, pDnode->dnodeFqdn, &pDnode->dnodePort);
mnodeUpdateDnodeEps();
mInfo("dnode:%d, fqdn:%s ep:%s port:%d, do insert action", pDnode->dnodeId, pDnode->dnodeFqdn, pDnode->dnodeEp, pDnode->dnodePort);
mInfo("dnode:%d, fqdn:%s ep:%s port:%d is created", pDnode->dnodeId, pDnode->dnodeFqdn, pDnode->dnodeEp, pDnode->dnodePort);
return TSDB_CODE_SUCCESS;
}
......
......@@ -91,6 +91,7 @@ int32_t mnodeStartSystem() {
return -1;
}
dnodeReportStep("mnode-grant", "start to set grant infomation", 0);
grantReset(TSDB_GRANT_ALL, 0);
tsMgmtIsRunning = true;
......
......@@ -72,7 +72,7 @@ static int32_t mnodeMnodeActionInsert(SSdbRow *pRow) {
pDnode->isMgmt = true;
mnodeDecDnodeRef(pDnode);
mInfo("mnode:%d, fqdn:%s ep:%s port:%u, do insert action", pMnode->mnodeId, pDnode->dnodeFqdn, pDnode->dnodeEp,
mInfo("mnode:%d, fqdn:%s ep:%s port:%u is created", pMnode->mnodeId, pDnode->dnodeFqdn, pDnode->dnodeEp,
pDnode->dnodePort);
return TSDB_CODE_SUCCESS;
}
......@@ -202,13 +202,13 @@ void mnodeCancelGetNextMnode(void *pIter) {
void mnodeUpdateMnodeEpSet(SMInfos *pMinfos) {
bool set = false;
SMInfos mInfos = {0};
mInfo("vgId:1, update mnodes epSet, numOfMnodes:%d pMinfos:%p", mnodeGetMnodesNum(), pMinfos);
if (pMinfos != NULL) {
mInfo("vgId:1, update mnodes epSet, numOfMinfos:%d", pMinfos->mnodeNum);
set = true;
mInfos = *pMinfos;
}
else {
} else {
mInfo("vgId:1, update mnodes epSet, numOfMnodes:%d", mnodeGetMnodesNum());
int32_t index = 0;
void * pIter = NULL;
while (1) {
......
......@@ -183,18 +183,23 @@ static int32_t sdbInitWal() {
return -1;
}
sdbInfo("vgId:1, open wal for restore");
sdbInfo("vgId:1, open sdb wal for restore");
int32_t code = walRestore(tsSdbMgmt.wal, NULL, sdbProcessWrite);
if (code != TSDB_CODE_SUCCESS) {
sdbError("vgId:1, failed to open wal for restore since %s", tstrerror(code));
return -1;
}
sdbInfo("vgId:1, sdb wal load success");
return 0;
}
static void sdbRestoreTables() {
int32_t totalRows = 0;
int32_t numOfTables = 0;
sdbInfo("vgId:1, sdb start to check for integrity");
for (int32_t tableId = 0; tableId < SDB_TABLE_MAX; ++tableId) {
SSdbTable *pTable = sdbGetTableFromId(tableId);
if (pTable == NULL) continue;
......@@ -204,7 +209,7 @@ static void sdbRestoreTables() {
totalRows += pTable->numOfRows;
numOfTables++;
sdbDebug("vgId:1, sdb:%s is restored, rows:%" PRId64, pTable->name, pTable->numOfRows);
sdbInfo("vgId:1, sdb:%s is checked, rows:%" PRId64, pTable->name, pTable->numOfRows);
}
sdbInfo("vgId:1, sdb is restored, mver:%" PRIu64 " rows:%d tables:%d", tsSdbMgmt.version, totalRows, numOfTables);
......@@ -628,6 +633,12 @@ static int32_t sdbProcessWrite(void *wparam, void *hparam, int32_t qtype, void *
SSdbTable *pTable = sdbGetTableFromId(tableId);
assert(pTable != NULL);
if (!mnodeIsRunning() && tsSdbMgmt.version % 100000 == 0) {
char stepDesc[TSDB_STEP_DESC_LEN] = {0};
snprintf(stepDesc, TSDB_STEP_DESC_LEN, "%" PRIu64 " rows have been restored", tsSdbMgmt.version);
dnodeReportStep("mnode-sdb", stepDesc, 0);
}
if (qtype == TAOS_QTYPE_QUERY) return sdbPerformDeleteAction(pHead, pTable);
pthread_mutex_lock(&tsSdbMgmt.mutex);
......
......@@ -49,12 +49,14 @@
#define CREATE_CTABLE_RETRY_TIMES 10
#define CREATE_CTABLE_RETRY_SEC 14
int64_t tsCTableRid = -1;
static void * tsChildTableSdb;
int64_t tsSTableRid = -1;
static void * tsSuperTableSdb;
static int32_t tsChildTableUpdateSize;
static int32_t tsSuperTableUpdateSize;
int64_t tsCTableRid = -1;
static void * tsChildTableSdb;
int64_t tsSTableRid = -1;
static void * tsSuperTableSdb;
static SHashObj *tsSTableUidHash;
static int32_t tsChildTableUpdateSize;
static int32_t tsSuperTableUpdateSize;
static void * mnodeGetChildTable(char *tableId);
static void * mnodeGetSuperTable(char *tableId);
static void * mnodeGetSuperTableByUid(uint64_t uid);
......@@ -289,6 +291,7 @@ static int32_t mnodeChildTableActionDecode(SSdbRow *pRow) {
}
static int32_t mnodeChildTableActionRestored() {
#if 0
void *pIter = NULL;
SCTableObj *pTable = NULL;
......@@ -345,6 +348,7 @@ static int32_t mnodeChildTableActionRestored() {
}
mnodeCancelGetNextChildTable(pIter);
#endif
return 0;
}
......@@ -447,6 +451,7 @@ static int32_t mnodeSuperTableActionInsert(SSdbRow *pRow) {
}
mnodeDecDbRef(pDb);
taosHashPut(tsSTableUidHash, &pStable->uid, sizeof(int64_t), &pStable, sizeof(int64_t));
return TSDB_CODE_SUCCESS;
}
......@@ -459,6 +464,7 @@ static int32_t mnodeSuperTableActionDelete(SSdbRow *pRow) {
}
mnodeDecDbRef(pDb);
taosHashRemove(tsSTableUidHash, &pStable->uid, sizeof(int64_t));
return TSDB_CODE_SUCCESS;
}
......@@ -570,6 +576,7 @@ static int32_t mnodeInitSuperTables() {
.fpRestored = mnodeSuperTableActionRestored
};
tsSTableUidHash = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_ENTRY_LOCK);
tsSTableRid = sdbOpenTable(&desc);
tsSuperTableSdb = sdbGetTableByRid(tsSTableRid);
if (tsSuperTableSdb == NULL) {
......@@ -584,6 +591,9 @@ static int32_t mnodeInitSuperTables() {
static void mnodeCleanupSuperTables() {
sdbCloseTable(tsSTableRid);
tsSuperTableSdb = NULL;
taosHashCleanup(tsSTableUidHash);
tsSTableUidHash = NULL;
}
int32_t mnodeInitTables() {
......@@ -633,20 +643,12 @@ static void *mnodeGetSuperTable(char *tableId) {
}
static void *mnodeGetSuperTableByUid(uint64_t uid) {
SSTableObj *pStable = NULL;
void *pIter = NULL;
SSTableObj **ppStable = taosHashGet(tsSTableUidHash, &uid, sizeof(int64_t));
if (ppStable == NULL || *ppStable == NULL) return NULL;
while (1) {
pIter = mnodeGetNextSuperTable(pIter, &pStable);
if (pStable == NULL) break;
if (pStable->uid == uid) {
mnodeCancelGetNextSuperTable(pIter);
return pStable;
}
mnodeDecTableRef(pStable);
}
return NULL;
SSTableObj *pStable = *ppStable;
mnodeIncTableRef(pStable);
return pStable;
}
void *mnodeGetTable(char *tableId) {
......
......@@ -313,7 +313,7 @@ void *rpcOpen(const SRpcInit *pInit) {
return NULL;
}
tDebug("%s rpc is openned, threads:%d sessions:%d", pRpc->label, pRpc->numOfThreads, pInit->sessions);
tDebug("%s rpc is opened, threads:%d sessions:%d", pRpc->label, pRpc->numOfThreads, pInit->sessions);
return pRpc;
}
......
......@@ -32,8 +32,7 @@
// global configurable
int32_t tsMaxSyncNum = 2;
int32_t tsSyncTcpThreads = 2;
int32_t tsMaxWatchFiles = 500;
int32_t tsMaxFwdInfo = 200;
int32_t tsMaxFwdInfo = 512;
int32_t tsSyncTimer = 1;
// module global, not configurable
......@@ -60,7 +59,7 @@ static void syncRemoveConfirmedFwdInfo(SSyncNode *pNode);
static void syncMonitorFwdInfos(void *param, void *tmrId);
static void syncMonitorNodeRole(void *param, void *tmrId);
static void syncProcessFwdAck(SSyncNode *pNode, SFwdInfo *pFwdInfo, int32_t code);
static void syncSaveFwdInfo(SSyncNode *pNode, uint64_t version, void *mhandle);
static int32_t syncSaveFwdInfo(SSyncNode *pNode, uint64_t version, void *mhandle);
static void syncRestartPeer(SSyncPeer *pPeer);
static int32_t syncForwardToPeerImpl(SSyncNode *pNode, void *data, void *mhandle, int32_t qtyp);
static SSyncPeer *syncAddPeer(SSyncNode *pNode, const SNodeInfo *pInfo);
......@@ -892,15 +891,24 @@ static void syncProcessFwdResponse(char *cont, SSyncPeer *pPeer) {
sTrace("%s, forward-rsp is received, code:%x hver:%" PRIu64, pPeer->id, pFwdRsp->code, pFwdRsp->version);
SFwdInfo *pFirst = pSyncFwds->fwdInfo + pSyncFwds->first;
bool found = false;
if (pFirst->version <= pFwdRsp->version && pSyncFwds->fwds > 0) {
// find the forwardInfo from first
for (int32_t i = 0; i < pSyncFwds->fwds; ++i) {
pFwdInfo = pSyncFwds->fwdInfo + (i + pSyncFwds->first) % tsMaxFwdInfo;
if (pFwdRsp->version == pFwdInfo->version) break;
if (pFwdRsp->version == pFwdInfo->version) {
found = true;
syncProcessFwdAck(pNode, pFwdInfo, pFwdRsp->code);
syncRemoveConfirmedFwdInfo(pNode);
break;
}
}
}
if (!found) {
sTrace("%s, forward-rsp not found first:%d fwds:%d, code:%x hver:%" PRIu64, pPeer->id, pSyncFwds->first,
pSyncFwds->fwds, pFwdRsp->code, pFwdRsp->version);
syncProcessFwdAck(pNode, pFwdInfo, pFwdRsp->code);
syncRemoveConfirmedFwdInfo(pNode);
}
}
......@@ -1180,13 +1188,15 @@ static void syncProcessBrokenLink(void *param) {
taosReleaseRef(tsSyncRefId, pNode->rid);
}
static void syncSaveFwdInfo(SSyncNode *pNode, uint64_t version, void *mhandle) {
static int32_t syncSaveFwdInfo(SSyncNode *pNode, uint64_t version, void *mhandle) {
SSyncFwds *pSyncFwds = pNode->pSyncFwds;
int64_t time = taosGetTimestampMs();
if (pSyncFwds->fwds >= tsMaxFwdInfo) {
pSyncFwds->first = (pSyncFwds->first + 1) % tsMaxFwdInfo;
pSyncFwds->fwds--;
// pSyncFwds->first = (pSyncFwds->first + 1) % tsMaxFwdInfo;
// pSyncFwds->fwds--;
sError("vgId:%d, failed to save fwd info, hver:%" PRIu64 " fwds:%d", pNode->vgId, version, pSyncFwds->fwds);
return TSDB_CODE_SYN_TOO_MANY_FWDINFO;
}
if (pSyncFwds->fwds > 0) {
......@@ -1201,6 +1211,8 @@ static void syncSaveFwdInfo(SSyncNode *pNode, uint64_t version, void *mhandle) {
pSyncFwds->fwds++;
sTrace("vgId:%d, fwd info is saved, hver:%" PRIu64 " fwds:%d ", pNode->vgId, version, pSyncFwds->fwds);
return 0;
}
static void syncRemoveConfirmedFwdInfo(SSyncNode *pNode) {
......@@ -1214,8 +1226,7 @@ static void syncRemoveConfirmedFwdInfo(SSyncNode *pNode) {
pSyncFwds->first = (pSyncFwds->first + 1) % tsMaxFwdInfo;
pSyncFwds->fwds--;
if (pSyncFwds->fwds == 0) pSyncFwds->first = pSyncFwds->last;
// sDebug("vgId:%d, fwd info is removed, hver:%d, fwds:%d",
// pNode->vgId, pFwdInfo->version, pSyncFwds->fwds);
sTrace("vgId:%d, fwd info is removed, hver:%" PRIu64 " fwds:%d", pNode->vgId, pFwdInfo->version, pSyncFwds->fwds);
memset(pFwdInfo, 0, sizeof(SFwdInfo));
}
}
......@@ -1341,8 +1352,8 @@ static int32_t syncForwardToPeerImpl(SSyncNode *pNode, void *data, void *mhandle
if (pPeer->role != TAOS_SYNC_ROLE_SLAVE && pPeer->sstatus != TAOS_SYNC_STATUS_CACHE) continue;
if (pNode->quorum > 1 && code == 0) {
syncSaveFwdInfo(pNode, pWalHead->version, mhandle);
code = 1;
code = syncSaveFwdInfo(pNode, pWalHead->version, mhandle);
if (code >= 0) code = 1;
}
int32_t retLen = taosWriteMsg(pPeer->peerFd, pSyncHead, fwdLen);
......
......@@ -61,7 +61,7 @@ taos_queue taosOpenQueue() {
pthread_mutex_init(&queue->mutex, NULL);
uTrace("queue:%p is openned", queue);
uTrace("queue:%p is opened", queue);
return queue;
}
......@@ -230,7 +230,7 @@ taos_qset taosOpenQset() {
pthread_mutex_init(&qset->mutex, NULL);
tsem_init(&qset->sem, 0, 0);
uTrace("qset:%p is openned", qset);
uTrace("qset:%p is opened", qset);
return qset;
}
......
worker_processes 1;
user root;
error_log logs/error.log;
events {
worker_connections 1024;
}
http {
lua_package_path '$prefix/lua/?.lua;$prefix/rest/?.lua;/blah/?.lua;;';
lua_package_cpath "$prefix/so/?.so;;";
lua_code_cache off;
server {
listen 7000;
server_name restapi;
charset utf-8;
lua_need_request_body on;
location ~ ^/api/([-_a-zA-Z0-9/]+) {
content_by_lua_file rest/$1.lua;
}
}
}
# Ignore everything in this directory
*
# Except this file
!.gitignore
local driver = require "luaconnector51"
local cjson = require "cjson"
ngx.say("start time:"..os.time())
local config = {
host = "127.0.0.1",
port = 6030,
database = "",
user = "root",
password = "taosdata",
max_packet_size = 1024 * 1024
}
local conn
local res = driver.connect(config)
if res.code ~=0 then
ngx.say("connect--- failed: "..res.error)
return
else
conn = res.conn
ngx.say("connect--- pass.")
end
local res = driver.query(conn,"drop database if exists nginx")
if res.code ~=0 then
ngx.say("drop db--- failed: "..res.error)
else
ngx.say("drop db--- pass.")
end
res = driver.query(conn,"create database nginx")
if res.code ~=0 then
ngx.say("create db--- failed: "..res.error)
else
ngx.say("create db--- pass.")
end
res = driver.query(conn,"use nginx")
if res.code ~=0 then
ngx.say("select db--- failed: "..res.error)
else
ngx.say("select db--- pass.")
end
res = driver.query(conn,"create table m1 (ts timestamp, speed int,owner binary(20))")
if res.code ~=0 then
ngx.say("create table---failed: "..res.error)
else
ngx.say("create table--- pass.")
end
res = driver.query(conn,"insert into m1 values ('2019-09-01 00:00:00.001',0,'robotspace'), ('2019-09-01 00:00:00.002',1,'Hilink'),('2019-09-01 00:00:00.003',2,'Harmony')")
if res.code ~=0 then
ngx.say("insert records failed: "..res.error)
return
else
if(res.affected == 3) then
ngx.say("insert records--- pass")
else
ngx.say("insert records---failed: expect 3 affected records, actually affected "..res.affected)
end
end
res = driver.query(conn,"select * from m1")
if res.code ~=0 then
ngx.say("select failed: "..res.error)
return
else
ngx.say(cjson.encode(res))
if (#(res.item) == 3) then
ngx.say("select--- pass")
else
ngx.say("select--- failed: expect 3 affected records, actually received "..#(res.item))
end
end
driver.close(conn)
ngx.say("end time:"..os.time())
--ngx.log(ngx.ERR,"in test file.")
......@@ -2,13 +2,13 @@
It's a Lua implementation for [TDengine](https://github.com/taosdata/TDengine), an open-sourced big data platform designed and optimized for the Internet of Things (IoT), Connected Cars, Industrial IoT, and IT Infrastructure and Application Monitoring. You may need to install Lua5.3 .
## Dependencies
## Lua Dependencies
- Lua:
```
https://www.lua.org/
```
## Run with Sample
## Run with Lua Sample
Build driver lib:
```
......@@ -18,3 +18,26 @@ Run lua sample:
```
lua test.lua
```
## OpenResty Dependencies
- OpenResty:
```
http://openresty.org
```
## Run with OpenResty Sample
**This section demonstrates how to get binary file for connector. To be convenient for trial, an connector has been put into OpenResty work directory.
Because of difference on C API between Lua5.3 and Lua5.1, the files needed by connector for OpenResty are stored in local source directory and configured in script build.sh.**
Build driver lib:
```
cd lua51
./build.sh
```
Run OpenResty sample:
```
cd ..
cd OpenResty
sudo openresty -p .
curl http://127.0.0.1:7000/api/test
```
gcc lua_connector51.c -fPIC -shared -o luaconnector51.so -Wall -ltaos
/*
** $Id: lauxlib.h,v 1.88.1.1 2007/12/27 13:02:25 roberto Exp $
** Auxiliary functions for building Lua libraries
** See Copyright Notice in lua.h
*/
#ifndef lauxlib_h
#define lauxlib_h
#include <stddef.h>
#include <stdio.h>
#include "lua.h"
/* extra error code for `luaL_load' */
#define LUA_ERRFILE (LUA_ERRERR+1)
typedef struct luaL_Reg {
const char *name;
lua_CFunction func;
} luaL_Reg;
LUALIB_API void (luaL_openlib) (lua_State *L, const char *libname,
const luaL_Reg *l, int nup);
LUALIB_API void (luaL_register) (lua_State *L, const char *libname,
const luaL_Reg *l);
LUALIB_API int (luaL_getmetafield) (lua_State *L, int obj, const char *e);
LUALIB_API int (luaL_callmeta) (lua_State *L, int obj, const char *e);
LUALIB_API int (luaL_typerror) (lua_State *L, int narg, const char *tname);
LUALIB_API int (luaL_argerror) (lua_State *L, int numarg, const char *extramsg);
LUALIB_API const char *(luaL_checklstring) (lua_State *L, int numArg,
size_t *l);
LUALIB_API const char *(luaL_optlstring) (lua_State *L, int numArg,
const char *def, size_t *l);
LUALIB_API lua_Number (luaL_checknumber) (lua_State *L, int numArg);
LUALIB_API lua_Number (luaL_optnumber) (lua_State *L, int nArg, lua_Number def);
LUALIB_API lua_Integer (luaL_checkinteger) (lua_State *L, int numArg);
LUALIB_API lua_Integer (luaL_optinteger) (lua_State *L, int nArg,
lua_Integer def);
LUALIB_API void (luaL_checkstack) (lua_State *L, int sz, const char *msg);
LUALIB_API void (luaL_checktype) (lua_State *L, int narg, int t);
LUALIB_API void (luaL_checkany) (lua_State *L, int narg);
LUALIB_API int (luaL_newmetatable) (lua_State *L, const char *tname);
LUALIB_API void *(luaL_checkudata) (lua_State *L, int ud, const char *tname);
LUALIB_API void (luaL_where) (lua_State *L, int lvl);
LUALIB_API int (luaL_error) (lua_State *L, const char *fmt, ...);
LUALIB_API int (luaL_checkoption) (lua_State *L, int narg, const char *def,
const char *const lst[]);
/* pre-defined references */
#define LUA_NOREF (-2)
#define LUA_REFNIL (-1)
LUALIB_API int (luaL_ref) (lua_State *L, int t);
LUALIB_API void (luaL_unref) (lua_State *L, int t, int ref);
LUALIB_API int (luaL_loadfile) (lua_State *L, const char *filename);
LUALIB_API int (luaL_loadbuffer) (lua_State *L, const char *buff, size_t sz,
const char *name);
LUALIB_API int (luaL_loadstring) (lua_State *L, const char *s);
LUALIB_API lua_State *(luaL_newstate) (void);
LUALIB_API const char *(luaL_gsub) (lua_State *L, const char *s, const char *p,
const char *r);
LUALIB_API const char *(luaL_findtable) (lua_State *L, int idx,
const char *fname, int szhint);
/* From Lua 5.2. */
LUALIB_API int luaL_fileresult(lua_State *L, int stat, const char *fname);
LUALIB_API int luaL_execresult(lua_State *L, int stat);
LUALIB_API int (luaL_loadfilex) (lua_State *L, const char *filename,
const char *mode);
LUALIB_API int (luaL_loadbufferx) (lua_State *L, const char *buff, size_t sz,
const char *name, const char *mode);
LUALIB_API void luaL_traceback (lua_State *L, lua_State *L1, const char *msg,
int level);
LUALIB_API void (luaL_setfuncs) (lua_State *L, const luaL_Reg *l, int nup);
LUALIB_API void (luaL_pushmodule) (lua_State *L, const char *modname,
int sizehint);
LUALIB_API void *(luaL_testudata) (lua_State *L, int ud, const char *tname);
LUALIB_API void (luaL_setmetatable) (lua_State *L, const char *tname);
/*
** ===============================================================
** some useful macros
** ===============================================================
*/
#define luaL_argcheck(L, cond,numarg,extramsg) \
((void)((cond) || luaL_argerror(L, (numarg), (extramsg))))
#define luaL_checkstring(L,n) (luaL_checklstring(L, (n), NULL))
#define luaL_optstring(L,n,d) (luaL_optlstring(L, (n), (d), NULL))
#define luaL_checkint(L,n) ((int)luaL_checkinteger(L, (n)))
#define luaL_optint(L,n,d) ((int)luaL_optinteger(L, (n), (d)))
#define luaL_checklong(L,n) ((long)luaL_checkinteger(L, (n)))
#define luaL_optlong(L,n,d) ((long)luaL_optinteger(L, (n), (d)))
#define luaL_typename(L,i) lua_typename(L, lua_type(L,(i)))
#define luaL_dofile(L, fn) \
(luaL_loadfile(L, fn) || lua_pcall(L, 0, LUA_MULTRET, 0))
#define luaL_dostring(L, s) \
(luaL_loadstring(L, s) || lua_pcall(L, 0, LUA_MULTRET, 0))
#define luaL_getmetatable(L,n) (lua_getfield(L, LUA_REGISTRYINDEX, (n)))
#define luaL_opt(L,f,n,d) (lua_isnoneornil(L,(n)) ? (d) : f(L,(n)))
/* From Lua 5.2. */
#define luaL_newlibtable(L, l) \
lua_createtable(L, 0, sizeof(l)/sizeof((l)[0]) - 1)
#define luaL_newlib(L, l) (luaL_newlibtable(L, l), luaL_setfuncs(L, l, 0))
/*
** {======================================================
** Generic Buffer manipulation
** =======================================================
*/
typedef struct luaL_Buffer {
char *p; /* current position in buffer */
int lvl; /* number of strings in the stack (level) */
lua_State *L;
char buffer[LUAL_BUFFERSIZE];
} luaL_Buffer;
#define luaL_addchar(B,c) \
((void)((B)->p < ((B)->buffer+LUAL_BUFFERSIZE) || luaL_prepbuffer(B)), \
(*(B)->p++ = (char)(c)))
/* compatibility only */
#define luaL_putchar(B,c) luaL_addchar(B,c)
#define luaL_addsize(B,n) ((B)->p += (n))
LUALIB_API void (luaL_buffinit) (lua_State *L, luaL_Buffer *B);
LUALIB_API char *(luaL_prepbuffer) (luaL_Buffer *B);
LUALIB_API void (luaL_addlstring) (luaL_Buffer *B, const char *s, size_t l);
LUALIB_API void (luaL_addstring) (luaL_Buffer *B, const char *s);
LUALIB_API void (luaL_addvalue) (luaL_Buffer *B);
LUALIB_API void (luaL_pushresult) (luaL_Buffer *B);
/* }====================================================== */
#endif
/*
** $Id: lua.h,v 1.218.1.5 2008/08/06 13:30:12 roberto Exp $
** Lua - An Extensible Extension Language
** Lua.org, PUC-Rio, Brazil (http://www.lua.org)
** See Copyright Notice at the end of this file
*/
#ifndef lua_h
#define lua_h
#include <stdarg.h>
#include <stddef.h>
#include "luaconf.h"
#define LUA_VERSION "Lua 5.1"
#define LUA_RELEASE "Lua 5.1.4"
#define LUA_VERSION_NUM 501
#define LUA_COPYRIGHT "Copyright (C) 1994-2008 Lua.org, PUC-Rio"
#define LUA_AUTHORS "R. Ierusalimschy, L. H. de Figueiredo & W. Celes"
/* mark for precompiled code (`<esc>Lua') */
#define LUA_SIGNATURE "\033Lua"
/* option for multiple returns in `lua_pcall' and `lua_call' */
#define LUA_MULTRET (-1)
/*
** pseudo-indices
*/
#define LUA_REGISTRYINDEX (-10000)
#define LUA_ENVIRONINDEX (-10001)
#define LUA_GLOBALSINDEX (-10002)
#define lua_upvalueindex(i) (LUA_GLOBALSINDEX-(i))
/* thread status */
#define LUA_OK 0
#define LUA_YIELD 1
#define LUA_ERRRUN 2
#define LUA_ERRSYNTAX 3
#define LUA_ERRMEM 4
#define LUA_ERRERR 5
typedef struct lua_State lua_State;
typedef int (*lua_CFunction) (lua_State *L);
/*
** functions that read/write blocks when loading/dumping Lua chunks
*/
typedef const char * (*lua_Reader) (lua_State *L, void *ud, size_t *sz);
typedef int (*lua_Writer) (lua_State *L, const void* p, size_t sz, void* ud);
/*
** prototype for memory-allocation functions
*/
typedef void * (*lua_Alloc) (void *ud, void *ptr, size_t osize, size_t nsize);
/*
** basic types
*/
#define LUA_TNONE (-1)
#define LUA_TNIL 0
#define LUA_TBOOLEAN 1
#define LUA_TLIGHTUSERDATA 2
#define LUA_TNUMBER 3
#define LUA_TSTRING 4
#define LUA_TTABLE 5
#define LUA_TFUNCTION 6
#define LUA_TUSERDATA 7
#define LUA_TTHREAD 8
/* minimum Lua stack available to a C function */
#define LUA_MINSTACK 20
/*
** generic extra include file
*/
#if defined(LUA_USER_H)
#include LUA_USER_H
#endif
/* type of numbers in Lua */
typedef LUA_NUMBER lua_Number;
/* type for integer functions */
typedef LUA_INTEGER lua_Integer;
/*
** state manipulation
*/
LUA_API lua_State *(lua_newstate) (lua_Alloc f, void *ud);
LUA_API void (lua_close) (lua_State *L);
LUA_API lua_State *(lua_newthread) (lua_State *L);
LUA_API lua_CFunction (lua_atpanic) (lua_State *L, lua_CFunction panicf);
/*
** basic stack manipulation
*/
LUA_API int (lua_gettop) (lua_State *L);
LUA_API void (lua_settop) (lua_State *L, int idx);
LUA_API void (lua_pushvalue) (lua_State *L, int idx);
LUA_API void (lua_remove) (lua_State *L, int idx);
LUA_API void (lua_insert) (lua_State *L, int idx);
LUA_API void (lua_replace) (lua_State *L, int idx);
LUA_API int (lua_checkstack) (lua_State *L, int sz);
LUA_API void (lua_xmove) (lua_State *from, lua_State *to, int n);
/*
** access functions (stack -> C)
*/
LUA_API int (lua_isnumber) (lua_State *L, int idx);
LUA_API int (lua_isstring) (lua_State *L, int idx);
LUA_API int (lua_iscfunction) (lua_State *L, int idx);
LUA_API int (lua_isuserdata) (lua_State *L, int idx);
LUA_API int (lua_type) (lua_State *L, int idx);
LUA_API const char *(lua_typename) (lua_State *L, int tp);
LUA_API int (lua_equal) (lua_State *L, int idx1, int idx2);
LUA_API int (lua_rawequal) (lua_State *L, int idx1, int idx2);
LUA_API int (lua_lessthan) (lua_State *L, int idx1, int idx2);
LUA_API lua_Number (lua_tonumber) (lua_State *L, int idx);
LUA_API lua_Integer (lua_tointeger) (lua_State *L, int idx);
LUA_API int (lua_toboolean) (lua_State *L, int idx);
LUA_API const char *(lua_tolstring) (lua_State *L, int idx, size_t *len);
LUA_API size_t (lua_objlen) (lua_State *L, int idx);
LUA_API lua_CFunction (lua_tocfunction) (lua_State *L, int idx);
LUA_API void *(lua_touserdata) (lua_State *L, int idx);
LUA_API lua_State *(lua_tothread) (lua_State *L, int idx);
LUA_API const void *(lua_topointer) (lua_State *L, int idx);
/*
** push functions (C -> stack)
*/
LUA_API void (lua_pushnil) (lua_State *L);
LUA_API void (lua_pushnumber) (lua_State *L, lua_Number n);
LUA_API void (lua_pushinteger) (lua_State *L, lua_Integer n);
LUA_API void (lua_pushlstring) (lua_State *L, const char *s, size_t l);
LUA_API void (lua_pushstring) (lua_State *L, const char *s);
LUA_API const char *(lua_pushvfstring) (lua_State *L, const char *fmt,
va_list argp);
LUA_API const char *(lua_pushfstring) (lua_State *L, const char *fmt, ...);
LUA_API void (lua_pushcclosure) (lua_State *L, lua_CFunction fn, int n);
LUA_API void (lua_pushboolean) (lua_State *L, int b);
LUA_API void (lua_pushlightuserdata) (lua_State *L, void *p);
LUA_API int (lua_pushthread) (lua_State *L);
/*
** get functions (Lua -> stack)
*/
LUA_API void (lua_gettable) (lua_State *L, int idx);
LUA_API void (lua_getfield) (lua_State *L, int idx, const char *k);
LUA_API void (lua_rawget) (lua_State *L, int idx);
LUA_API void (lua_rawgeti) (lua_State *L, int idx, int n);
LUA_API void (lua_createtable) (lua_State *L, int narr, int nrec);
LUA_API void *(lua_newuserdata) (lua_State *L, size_t sz);
LUA_API int (lua_getmetatable) (lua_State *L, int objindex);
LUA_API void (lua_getfenv) (lua_State *L, int idx);
/*
** set functions (stack -> Lua)
*/
LUA_API void (lua_settable) (lua_State *L, int idx);
LUA_API void (lua_setfield) (lua_State *L, int idx, const char *k);
LUA_API void (lua_rawset) (lua_State *L, int idx);
LUA_API void (lua_rawseti) (lua_State *L, int idx, int n);
LUA_API int (lua_setmetatable) (lua_State *L, int objindex);
LUA_API int (lua_setfenv) (lua_State *L, int idx);
/*
** `load' and `call' functions (load and run Lua code)
*/
LUA_API void (lua_call) (lua_State *L, int nargs, int nresults);
LUA_API int (lua_pcall) (lua_State *L, int nargs, int nresults, int errfunc);
LUA_API int (lua_cpcall) (lua_State *L, lua_CFunction func, void *ud);
LUA_API int (lua_load) (lua_State *L, lua_Reader reader, void *dt,
const char *chunkname);
LUA_API int (lua_dump) (lua_State *L, lua_Writer writer, void *data);
/*
** coroutine functions
*/
LUA_API int (lua_yield) (lua_State *L, int nresults);
LUA_API int (lua_resume) (lua_State *L, int narg);
LUA_API int (lua_status) (lua_State *L);
/*
** garbage-collection function and options
*/
#define LUA_GCSTOP 0
#define LUA_GCRESTART 1
#define LUA_GCCOLLECT 2
#define LUA_GCCOUNT 3
#define LUA_GCCOUNTB 4
#define LUA_GCSTEP 5
#define LUA_GCSETPAUSE 6
#define LUA_GCSETSTEPMUL 7
#define LUA_GCISRUNNING 9
LUA_API int (lua_gc) (lua_State *L, int what, int data);
/*
** miscellaneous functions
*/
LUA_API int (lua_error) (lua_State *L);
LUA_API int (lua_next) (lua_State *L, int idx);
LUA_API void (lua_concat) (lua_State *L, int n);
LUA_API lua_Alloc (lua_getallocf) (lua_State *L, void **ud);
LUA_API void lua_setallocf (lua_State *L, lua_Alloc f, void *ud);
LUA_API void lua_setexdata(lua_State *L, void *exdata);
LUA_API void *lua_getexdata(lua_State *L);
/*
** ===============================================================
** some useful macros
** ===============================================================
*/
#define lua_pop(L,n) lua_settop(L, -(n)-1)
#define lua_newtable(L) lua_createtable(L, 0, 0)
#define lua_register(L,n,f) (lua_pushcfunction(L, (f)), lua_setglobal(L, (n)))
#define lua_pushcfunction(L,f) lua_pushcclosure(L, (f), 0)
#define lua_strlen(L,i) lua_objlen(L, (i))
#define lua_isfunction(L,n) (lua_type(L, (n)) == LUA_TFUNCTION)
#define lua_istable(L,n) (lua_type(L, (n)) == LUA_TTABLE)
#define lua_islightuserdata(L,n) (lua_type(L, (n)) == LUA_TLIGHTUSERDATA)
#define lua_isnil(L,n) (lua_type(L, (n)) == LUA_TNIL)
#define lua_isboolean(L,n) (lua_type(L, (n)) == LUA_TBOOLEAN)
#define lua_isthread(L,n) (lua_type(L, (n)) == LUA_TTHREAD)
#define lua_isnone(L,n) (lua_type(L, (n)) == LUA_TNONE)
#define lua_isnoneornil(L, n) (lua_type(L, (n)) <= 0)
#define lua_pushliteral(L, s) \
lua_pushlstring(L, "" s, (sizeof(s)/sizeof(char))-1)
#define lua_setglobal(L,s) lua_setfield(L, LUA_GLOBALSINDEX, (s))
#define lua_getglobal(L,s) lua_getfield(L, LUA_GLOBALSINDEX, (s))
#define lua_tostring(L,i) lua_tolstring(L, (i), NULL)
/*
** compatibility macros and functions
*/
#define lua_open() luaL_newstate()
#define lua_getregistry(L) lua_pushvalue(L, LUA_REGISTRYINDEX)
#define lua_getgccount(L) lua_gc(L, LUA_GCCOUNT, 0)
#define lua_Chunkreader lua_Reader
#define lua_Chunkwriter lua_Writer
/* hack */
LUA_API void lua_setlevel (lua_State *from, lua_State *to);
/*
** {======================================================================
** Debug API
** =======================================================================
*/
/*
** Event codes
*/
#define LUA_HOOKCALL 0
#define LUA_HOOKRET 1
#define LUA_HOOKLINE 2
#define LUA_HOOKCOUNT 3
#define LUA_HOOKTAILRET 4
/*
** Event masks
*/
#define LUA_MASKCALL (1 << LUA_HOOKCALL)
#define LUA_MASKRET (1 << LUA_HOOKRET)
#define LUA_MASKLINE (1 << LUA_HOOKLINE)
#define LUA_MASKCOUNT (1 << LUA_HOOKCOUNT)
typedef struct lua_Debug lua_Debug; /* activation record */
/* Functions to be called by the debuger in specific events */
typedef void (*lua_Hook) (lua_State *L, lua_Debug *ar);
LUA_API int lua_getstack (lua_State *L, int level, lua_Debug *ar);
LUA_API int lua_getinfo (lua_State *L, const char *what, lua_Debug *ar);
LUA_API const char *lua_getlocal (lua_State *L, const lua_Debug *ar, int n);
LUA_API const char *lua_setlocal (lua_State *L, const lua_Debug *ar, int n);
LUA_API const char *lua_getupvalue (lua_State *L, int funcindex, int n);
LUA_API const char *lua_setupvalue (lua_State *L, int funcindex, int n);
LUA_API int lua_sethook (lua_State *L, lua_Hook func, int mask, int count);
LUA_API lua_Hook lua_gethook (lua_State *L);
LUA_API int lua_gethookmask (lua_State *L);
LUA_API int lua_gethookcount (lua_State *L);
/* From Lua 5.2. */
LUA_API void *lua_upvalueid (lua_State *L, int idx, int n);
LUA_API void lua_upvaluejoin (lua_State *L, int idx1, int n1, int idx2, int n2);
LUA_API int lua_loadx (lua_State *L, lua_Reader reader, void *dt,
const char *chunkname, const char *mode);
LUA_API const lua_Number *lua_version (lua_State *L);
LUA_API void lua_copy (lua_State *L, int fromidx, int toidx);
LUA_API lua_Number lua_tonumberx (lua_State *L, int idx, int *isnum);
LUA_API lua_Integer lua_tointegerx (lua_State *L, int idx, int *isnum);
/* From Lua 5.3. */
LUA_API int lua_isyieldable (lua_State *L);
struct lua_Debug {
int event;
const char *name; /* (n) */
const char *namewhat; /* (n) `global', `local', `field', `method' */
const char *what; /* (S) `Lua', `C', `main', `tail' */
const char *source; /* (S) */
int currentline; /* (l) */
int nups; /* (u) number of upvalues */
int linedefined; /* (S) */
int lastlinedefined; /* (S) */
char short_src[LUA_IDSIZE]; /* (S) */
/* private part */
int i_ci; /* active function */
};
/* }====================================================================== */
/******************************************************************************
* Copyright (C) 1994-2008 Lua.org, PUC-Rio. All rights reserved.
*
* Permission is hereby granted, free of charge, to any person obtaining
* a copy of this software and associated documentation files (the
* "Software"), to deal in the Software without restriction, including
* without limitation the rights to use, copy, modify, merge, publish,
* distribute, sublicense, and/or sell copies of the Software, and to
* permit persons to whom the Software is furnished to do so, subject to
* the following conditions:
*
* The above copyright notice and this permission notice shall be
* included in all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
* MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
* IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
* CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,
* TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
* SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
******************************************************************************/
#endif
#include <stdio.h>
#include <math.h>
#include <stdarg.h>
#include <stdlib.h>
#include "lua.h"
#include "lauxlib.h"
#include "lualib.h"
#include <taos.h>
struct cb_param{
lua_State* state;
int callback;
void * stream;
};
static int l_connect(lua_State *L){
TAOS * taos=NULL;
const char* host;
const char* database;
const char* user;
const char* password;
int port;
luaL_checktype(L, 1, LUA_TTABLE);
lua_getfield(L,-1,"host");
if (lua_isstring(L,-1)){
host = lua_tostring(L, -1);
// printf("host = %s\n", host);
}
lua_getfield(L, 1, "port");
if (lua_isnumber(L,-1)){
port = lua_tonumber(L, -1);
//printf("port = %d\n", port);
}
lua_getfield(L, 1, "database");
if (lua_isstring(L, -1)){
database = lua_tostring(L, -1);
//printf("database = %s\n", database);
}
lua_getfield(L, 1, "user");
if (lua_isstring(L, -1)){
user = lua_tostring(L, -1);
//printf("user = %s\n", user);
}
lua_getfield(L, 1, "password");
if (lua_isstring(L, -1)){
password = lua_tostring(L, -1);
//printf("password = %s\n", password);
}
lua_settop(L,0);
taos_init();
lua_newtable(L);
int table_index = lua_gettop(L);
taos = taos_connect(host, user,password,database, port);
if (taos == NULL) {
printf("failed to connect server, reason:%s\n", taos_errstr(taos));
lua_pushinteger(L, -1);
lua_setfield(L, table_index, "code");
lua_pushstring(L, taos_errstr(taos));
lua_setfield(L, table_index, "error");
lua_pushlightuserdata(L,NULL);
lua_setfield(L, table_index, "conn");
}else{
// printf("success to connect server\n");
lua_pushinteger(L, 0);
lua_setfield(L, table_index, "code");
lua_pushstring(L, taos_errstr(taos));
lua_setfield(L, table_index, "error");
lua_pushlightuserdata(L,taos);
lua_setfield(L, table_index, "conn");
}
return 1;
}
static int l_query(lua_State *L){
TAOS *taos= (TAOS*)lua_topointer(L,1);
const char* s = lua_tostring(L, 2);
TAOS_RES *result;
lua_newtable(L);
int table_index = lua_gettop(L);
// printf("receive command:%s\r\n",s);
result = taos_query(taos, s);
int32_t code = taos_errno(result);
if( code != 0){
printf("failed, reason:%s\n", taos_errstr(result));
lua_pushinteger(L, -1);
lua_setfield(L, table_index, "code");
lua_pushstring(L, taos_errstr(taos));
lua_setfield(L, table_index, "error");
return 1;
}else{
//printf("success to query.\n");
TAOS_ROW row;
int rows = 0;
int num_fields = taos_field_count(result);
const TAOS_FIELD *fields = taos_fetch_fields(result);
//char temp[256];
const int affectRows = taos_affected_rows(result);
// printf(" affect rows:%d\r\n", affectRows);
lua_pushinteger(L, 0);
lua_setfield(L, table_index, "code");
lua_pushinteger(L, affectRows);
lua_setfield(L, table_index, "affected");
lua_newtable(L);
while ((row = taos_fetch_row(result))) {
//printf("row index:%d\n",rows);
rows++;
lua_pushnumber(L,rows);
lua_newtable(L);
for (int i = 0; i < num_fields; ++i) {
if (row[i] == NULL) {
continue;
}
lua_pushstring(L,fields[i].name);
switch (fields[i].type) {
case TSDB_DATA_TYPE_TINYINT:
lua_pushinteger(L,*((char *)row[i]));
break;
case TSDB_DATA_TYPE_SMALLINT:
lua_pushinteger(L,*((short *)row[i]));
break;
case TSDB_DATA_TYPE_INT:
lua_pushinteger(L,*((int *)row[i]));
break;
case TSDB_DATA_TYPE_BIGINT:
lua_pushinteger(L,*((int64_t *)row[i]));
break;
case TSDB_DATA_TYPE_FLOAT:
lua_pushnumber(L,*((float *)row[i]));
break;
case TSDB_DATA_TYPE_DOUBLE:
lua_pushnumber(L,*((double *)row[i]));
break;
case TSDB_DATA_TYPE_BINARY:
case TSDB_DATA_TYPE_NCHAR:
lua_pushstring(L,(char *)row[i]);
break;
case TSDB_DATA_TYPE_TIMESTAMP:
lua_pushinteger(L,*((int64_t *)row[i]));
break;
case TSDB_DATA_TYPE_BOOL:
lua_pushinteger(L,*((char *)row[i]));
break;
default:
lua_pushnil(L);
break;
}
lua_settable(L,-3);
}
lua_settable(L,-3);
}
taos_free_result(result);
}
lua_setfield(L, table_index, "item");
return 1;
}
void stream_cb(void *param, TAOS_RES *result, TAOS_ROW row){
struct cb_param* p = (struct cb_param*) param;
TAOS_FIELD *fields = taos_fetch_fields(result);
int numFields = taos_num_fields(result);
// printf("\nnumfields:%d\n", numFields);
//printf("\n\r-----------------------------------------------------------------------------------\n");
lua_State *L = p->state;
lua_rawgeti(L, LUA_REGISTRYINDEX, p->callback);
lua_newtable(L);
for (int i = 0; i < numFields; ++i) {
if (row[i] == NULL) {
continue;
}
lua_pushstring(L,fields[i].name);
switch (fields[i].type) {
case TSDB_DATA_TYPE_TINYINT:
lua_pushinteger(L,*((char *)row[i]));
break;
case TSDB_DATA_TYPE_SMALLINT:
lua_pushinteger(L,*((short *)row[i]));
break;
case TSDB_DATA_TYPE_INT:
lua_pushinteger(L,*((int *)row[i]));
break;
case TSDB_DATA_TYPE_BIGINT:
lua_pushinteger(L,*((int64_t *)row[i]));
break;
case TSDB_DATA_TYPE_FLOAT:
lua_pushnumber(L,*((float *)row[i]));
break;
case TSDB_DATA_TYPE_DOUBLE:
lua_pushnumber(L,*((double *)row[i]));
break;
case TSDB_DATA_TYPE_BINARY:
case TSDB_DATA_TYPE_NCHAR:
lua_pushstring(L,(char *)row[i]);
break;
case TSDB_DATA_TYPE_TIMESTAMP:
lua_pushinteger(L,*((int64_t *)row[i]));
break;
case TSDB_DATA_TYPE_BOOL:
lua_pushinteger(L,*((char *)row[i]));
break;
default:
lua_pushnil(L);
break;
}
lua_settable(L, -3);
}
lua_call(L, 1, 0);
// printf("-----------------------------------------------------------------------------------\n\r");
}
static int l_open_stream(lua_State *L){
int r = luaL_ref(L, LUA_REGISTRYINDEX);
TAOS * taos = (TAOS*)lua_topointer(L,1);
const char * sqlstr = lua_tostring(L,2);
int stime = luaL_checknumber(L,3);
lua_newtable(L);
int table_index = lua_gettop(L);
struct cb_param *p = malloc(sizeof(struct cb_param));
p->state = L;
p->callback=r;
// printf("r:%d, L:%d\n",r,L);
void * s = taos_open_stream(taos,sqlstr,stream_cb,stime,p,NULL);
if (s == NULL) {
printf("failed to open stream, reason:%s\n", taos_errstr(taos));
free(p);
lua_pushnumber(L, -1);
lua_setfield(L, table_index, "code");
lua_pushstring(L, taos_errstr(taos));
lua_setfield(L, table_index, "error");
lua_pushlightuserdata(L,NULL);
lua_setfield(L, table_index, "stream");
}else{
// printf("success to open stream\n");
lua_pushnumber(L, 0);
lua_setfield(L, table_index, "code");
lua_pushstring(L, taos_errstr(taos));
lua_setfield(L, table_index, "error");
p->stream = s;
lua_pushlightuserdata(L,p);
lua_setfield(L, table_index, "stream");//stream has different content in lua and c.
}
return 1;
}
static int l_close_stream(lua_State *L){
//TODO:get stream and free cb_param
struct cb_param *p = lua_touserdata(L,1);
taos_close_stream(p->stream);
free(p);
return 0;
}
static int l_close(lua_State *L){
TAOS *taos= (TAOS*)lua_topointer(L,1);
lua_newtable(L);
int table_index = lua_gettop(L);
if(taos == NULL){
lua_pushnumber(L, -1);
lua_setfield(L, table_index, "code");
lua_pushstring(L, "null pointer.");
lua_setfield(L, table_index, "error");
}else{
taos_close(taos);
lua_pushnumber(L, 0);
lua_setfield(L, table_index, "code");
lua_pushstring(L, "done.");
lua_setfield(L, table_index, "error");
}
return 1;
}
static const struct luaL_Reg lib[] = {
{"connect", l_connect},
{"query", l_query},
{"close", l_close},
{"open_stream", l_open_stream},
{"close_stream", l_close_stream},
{NULL, NULL}
};
extern int luaopen_luaconnector51(lua_State* L)
{
// luaL_register(L, "luaconnector51", lib);
lua_newtable (L);
luaL_setfuncs(L,lib,0);
return 1;
}
/*
** Configuration header.
** Copyright (C) 2005-2017 Mike Pall. See Copyright Notice in luajit.h
*/
#ifndef luaconf_h
#define luaconf_h
#ifndef WINVER
#define WINVER 0x0501
#endif
#include <limits.h>
#include <stddef.h>
/* Default path for loading Lua and C modules with require(). */
#if defined(_WIN32)
/*
** In Windows, any exclamation mark ('!') in the path is replaced by the
** path of the directory of the executable file of the current process.
*/
#define LUA_LDIR "!\\lua\\"
#define LUA_CDIR "!\\"
#define LUA_PATH_DEFAULT \
".\\?.lua;" "!\\lualib\\?.lua;" LUA_LDIR"?.lua;" LUA_LDIR"?\\init.lua;"
#define LUA_CPATH_DEFAULT \
".\\?.dll;" "!\\lualib\\?.so;" LUA_CDIR"?.dll;" LUA_CDIR"loadall.dll"
#else
/*
** Note to distribution maintainers: do NOT patch the following lines!
** Please read ../doc/install.html#distro and pass PREFIX=/usr instead.
*/
#ifndef LUA_MULTILIB
#define LUA_MULTILIB "lib"
#endif
#ifndef LUA_LMULTILIB
#define LUA_LMULTILIB "lib"
#endif
#define LUA_LROOT "/usr/local"
#define LUA_LUADIR "/lua/5.1/"
#define LUA_LJDIR "/luajit-2.1.0-beta3/"
#ifdef LUA_ROOT
#define LUA_JROOT LUA_ROOT
#define LUA_RLDIR LUA_ROOT "/share" LUA_LUADIR
#define LUA_RCDIR LUA_ROOT "/" LUA_MULTILIB LUA_LUADIR
#define LUA_RLPATH ";" LUA_RLDIR "?.lua;" LUA_RLDIR "?/init.lua"
#define LUA_RCPATH ";" LUA_RCDIR "?.so"
#else
#define LUA_JROOT LUA_LROOT
#define LUA_RLPATH
#define LUA_RCPATH
#endif
#define LUA_JPATH ";" LUA_JROOT "/share" LUA_LJDIR "?.lua"
#define LUA_LLDIR LUA_LROOT "/share" LUA_LUADIR
#define LUA_LCDIR LUA_LROOT "/" LUA_LMULTILIB LUA_LUADIR
#define LUA_LLPATH ";" LUA_LLDIR "?.lua;" LUA_LLDIR "?/init.lua"
#define LUA_LCPATH1 ";" LUA_LCDIR "?.so"
#define LUA_LCPATH2 ";" LUA_LCDIR "loadall.so"
#define LUA_PATH_DEFAULT "./?.lua" LUA_JPATH LUA_LLPATH LUA_RLPATH
#define LUA_CPATH_DEFAULT "./?.so" LUA_LCPATH1 LUA_RCPATH LUA_LCPATH2
#endif
/* Environment variable names for path overrides and initialization code. */
#define LUA_PATH "LUA_PATH"
#define LUA_CPATH "LUA_CPATH"
#define LUA_INIT "LUA_INIT"
/* Special file system characters. */
#if defined(_WIN32)
#define LUA_DIRSEP "\\"
#else
#define LUA_DIRSEP "/"
#endif
#define LUA_PATHSEP ";"
#define LUA_PATH_MARK "?"
#define LUA_EXECDIR "!"
#define LUA_IGMARK "-"
#define LUA_PATH_CONFIG \
LUA_DIRSEP "\n" LUA_PATHSEP "\n" LUA_PATH_MARK "\n" \
LUA_EXECDIR "\n" LUA_IGMARK "\n"
/* Quoting in error messages. */
#define LUA_QL(x) "'" x "'"
#define LUA_QS LUA_QL("%s")
/* Various tunables. */
#define LUAI_MAXSTACK 65500 /* Max. # of stack slots for a thread (<64K). */
#define LUAI_MAXCSTACK 8000 /* Max. # of stack slots for a C func (<10K). */
#define LUAI_GCPAUSE 200 /* Pause GC until memory is at 200%. */
#define LUAI_GCMUL 200 /* Run GC at 200% of allocation speed. */
#define LUA_MAXCAPTURES 32 /* Max. pattern captures. */
/* Configuration for the frontend (the luajit executable). */
#if defined(luajit_c)
#define LUA_PROGNAME "luajit" /* Fallback frontend name. */
#define LUA_PROMPT "> " /* Interactive prompt. */
#define LUA_PROMPT2 ">> " /* Continuation prompt. */
#define LUA_MAXINPUT 512 /* Max. input line length. */
#endif
/* Note: changing the following defines breaks the Lua 5.1 ABI. */
#define LUA_INTEGER ptrdiff_t
#define LUA_IDSIZE 60 /* Size of lua_Debug.short_src. */
/*
** Size of lauxlib and io.* on-stack buffers. Weird workaround to avoid using
** unreasonable amounts of stack space, but still retain ABI compatibility.
** Blame Lua for depending on BUFSIZ in the ABI, blame **** for wrecking it.
*/
#define LUAL_BUFFERSIZE (BUFSIZ > 16384 ? 8192 : BUFSIZ)
/* The following defines are here only for compatibility with luaconf.h
** from the standard Lua distribution. They must not be changed for LuaJIT.
*/
#define LUA_NUMBER_DOUBLE
#define LUA_NUMBER double
#define LUAI_UACNUMBER double
#define LUA_NUMBER_SCAN "%lf"
#define LUA_NUMBER_FMT "%.14g"
#define lua_number2str(s, n) sprintf((s), LUA_NUMBER_FMT, (n))
#define LUAI_MAXNUMBER2STR 32
#define LUA_INTFRMLEN "l"
#define LUA_INTFRM_T long
/* Linkage of public API functions. */
#if defined(LUA_BUILD_AS_DLL)
#if defined(LUA_CORE) || defined(LUA_LIB)
#define LUA_API __declspec(dllexport)
#else
#define LUA_API __declspec(dllimport)
#endif
#else
#define LUA_API extern
#endif
#define LUALIB_API LUA_API
/* Support for internal assertions. */
#if defined(LUA_USE_ASSERT) || defined(LUA_USE_APICHECK)
#include <assert.h>
#endif
#ifdef LUA_USE_ASSERT
#define lua_assert(x) assert(x)
#endif
#ifdef LUA_USE_APICHECK
#define luai_apicheck(L, o) { (void)L; assert(o); }
#else
#define luai_apicheck(L, o) { (void)L; }
#endif
#endif
/*
** LuaJIT -- a Just-In-Time Compiler for Lua. http://luajit.org/
**
** Copyright (C) 2005-2017 Mike Pall. All rights reserved.
**
** Permission is hereby granted, free of charge, to any person obtaining
** a copy of this software and associated documentation files (the
** "Software"), to deal in the Software without restriction, including
** without limitation the rights to use, copy, modify, merge, publish,
** distribute, sublicense, and/or sell copies of the Software, and to
** permit persons to whom the Software is furnished to do so, subject to
** the following conditions:
**
** The above copyright notice and this permission notice shall be
** included in all copies or substantial portions of the Software.
**
** THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
** EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
** MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
** IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
** CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,
** TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
** SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
**
** [ MIT license: http://www.opensource.org/licenses/mit-license.php ]
*/
#ifndef _LUAJIT_H
#define _LUAJIT_H
#include "lua.h"
#define OPENRESTY_LUAJIT
#define LUAJIT_VERSION "LuaJIT 2.1.0-beta3"
#define LUAJIT_VERSION_NUM 20100 /* Version 2.1.0 = 02.01.00. */
#define LUAJIT_VERSION_SYM luaJIT_version_2_1_0_beta3
#define LUAJIT_COPYRIGHT "Copyright (C) 2005-2017 Mike Pall"
#define LUAJIT_URL "http://luajit.org/"
/* Modes for luaJIT_setmode. */
#define LUAJIT_MODE_MASK 0x00ff
enum {
LUAJIT_MODE_ENGINE, /* Set mode for whole JIT engine. */
LUAJIT_MODE_DEBUG, /* Set debug mode (idx = level). */
LUAJIT_MODE_FUNC, /* Change mode for a function. */
LUAJIT_MODE_ALLFUNC, /* Recurse into subroutine protos. */
LUAJIT_MODE_ALLSUBFUNC, /* Change only the subroutines. */
LUAJIT_MODE_TRACE, /* Flush a compiled trace. */
LUAJIT_MODE_WRAPCFUNC = 0x10, /* Set wrapper mode for C function calls. */
LUAJIT_MODE_MAX
};
/* Flags or'ed in to the mode. */
#define LUAJIT_MODE_OFF 0x0000 /* Turn feature off. */
#define LUAJIT_MODE_ON 0x0100 /* Turn feature on. */
#define LUAJIT_MODE_FLUSH 0x0200 /* Flush JIT-compiled code. */
/* LuaJIT public C API. */
/* Control the JIT engine. */
LUA_API int luaJIT_setmode(lua_State *L, int idx, int mode);
/* Low-overhead profiling API. */
typedef void (*luaJIT_profile_callback)(void *data, lua_State *L,
int samples, int vmstate);
LUA_API void luaJIT_profile_start(lua_State *L, const char *mode,
luaJIT_profile_callback cb, void *data);
LUA_API void luaJIT_profile_stop(lua_State *L);
LUA_API const char *luaJIT_profile_dumpstack(lua_State *L, const char *fmt,
int depth, size_t *len);
/* Enforce (dynamic) linker error for version mismatches. Call from main. */
LUA_API void LUAJIT_VERSION_SYM(void);
#endif
/*
** Standard library header.
** Copyright (C) 2005-2017 Mike Pall. See Copyright Notice in luajit.h
*/
#ifndef _LUALIB_H
#define _LUALIB_H
#include "lua.h"
#define LUA_FILEHANDLE "FILE*"
#define LUA_COLIBNAME "coroutine"
#define LUA_MATHLIBNAME "math"
#define LUA_STRLIBNAME "string"
#define LUA_TABLIBNAME "table"
#define LUA_IOLIBNAME "io"
#define LUA_OSLIBNAME "os"
#define LUA_LOADLIBNAME "package"
#define LUA_DBLIBNAME "debug"
#define LUA_BITLIBNAME "bit"
#define LUA_JITLIBNAME "jit"
#define LUA_FFILIBNAME "ffi"
#define LUA_THRLIBNAME "thread"
LUALIB_API int luaopen_base(lua_State *L);
LUALIB_API int luaopen_math(lua_State *L);
LUALIB_API int luaopen_string(lua_State *L);
LUALIB_API int luaopen_table(lua_State *L);
LUALIB_API int luaopen_io(lua_State *L);
LUALIB_API int luaopen_os(lua_State *L);
LUALIB_API int luaopen_package(lua_State *L);
LUALIB_API int luaopen_debug(lua_State *L);
LUALIB_API int luaopen_bit(lua_State *L);
LUALIB_API int luaopen_jit(lua_State *L);
LUALIB_API int luaopen_ffi(lua_State *L);
LUALIB_API void luaL_openlibs(lua_State *L);
#ifndef lua_assert
#define lua_assert(x) ((void)0)
#endif
#endif
......@@ -13,17 +13,49 @@ struct cb_param{
void * stream;
};
static int l_connect(lua_State *L){
TAOS * taos=NULL;
const char* host;
const char* database;
const char* user;
const char* password;
int port;
luaL_checktype(L, 1, LUA_TTABLE);
lua_getfield(L,-1,"host");
if (lua_isstring(L,-1)){
host = lua_tostring(L, -1);
// printf("host = %s\n", host);
}
lua_getfield(L, 1, "port");
if (lua_isinteger(L,-1)){
port = lua_tointeger(L, -1);
//printf("port = %d\n", port);
}
lua_getfield(L, 1, "database");
if (lua_isstring(L, -1)){
database = lua_tostring(L, -1);
//printf("database = %s\n", database);
}
lua_getfield(L, 1, "user");
if (lua_isstring(L, -1)){
user = lua_tostring(L, -1);
//printf("user = %s\n", user);
}
lua_getfield(L, 1, "password");
if (lua_isstring(L, -1)){
password = lua_tostring(L, -1);
//printf("password = %s\n", password);
}
lua_settop(L,0);
static int l_connect(lua_State *L){
TAOS * taos;
char *host = lua_tostring(L, 1);
char *user = lua_tostring(L, 2);
char *password = lua_tostring(L, 3);
char *database = lua_tostring(L, 4);
int port =luaL_checknumber(L, 5);
taos_init();
lua_newtable(L);
int table_index = lua_gettop(L);
......@@ -31,38 +63,38 @@ static int l_connect(lua_State *L){
if (taos == NULL) {
printf("failed to connect server, reason:%s\n", taos_errstr(taos));
lua_pushnumber(L, -1);
lua_pushinteger(L, -1);
lua_setfield(L, table_index, "code");
lua_pushstring(L, taos_errstr(taos));
lua_setfield(L, table_index, "error");
lua_pushlightuserdata(L,NULL);
lua_setfield(L, table_index, "conn");
}else{
printf("success to connect server\n");
lua_pushnumber(L, 0);
// printf("success to connect server\n");
lua_pushinteger(L, 0);
lua_setfield(L, table_index, "code");
lua_pushstring(L, taos_errstr(taos));
lua_setfield(L, table_index, "error");
lua_pushlightuserdata(L,taos);
lua_setfield(L, table_index, "conn");
}
return 1;
}
static int l_query(lua_State *L){
TAOS * taos= lua_topointer(L,1);
char *s = lua_tostring(L, 2);
TAOS *taos= (TAOS*)lua_topointer(L,1);
const char* s = lua_tostring(L, 2);
TAOS_RES *result;
lua_newtable(L);
int table_index = lua_gettop(L);
// printf("receive command:%s\r\n",s);
result = taos_query(taos,s);
int32_t code = taos_errno(result);
result = taos_query(taos, s);
int32_t code = taos_errno(result);
if( code != 0){
printf("failed, reason:%s\n", taos_errstr(result));
lua_pushnumber(L, -1);
lua_pushinteger(L, -1);
lua_setfield(L, table_index, "code");
lua_pushstring(L, taos_errstr(taos));
lua_setfield(L, table_index, "error");
......@@ -74,12 +106,12 @@ static int l_query(lua_State *L){
TAOS_ROW row;
int rows = 0;
int num_fields = taos_field_count(result);
TAOS_FIELD *fields = taos_fetch_fields(result);
char temp[256];
const TAOS_FIELD *fields = taos_fetch_fields(result);
//char temp[256];
int affectRows = taos_affected_rows(result);
const int affectRows = taos_affected_rows(result);
// printf(" affect rows:%d\r\n", affectRows);
lua_pushnumber(L, 0);
lua_pushinteger(L, 0);
lua_setfield(L, table_index, "code");
lua_pushinteger(L, affectRows);
lua_setfield(L, table_index, "affected");
......@@ -150,8 +182,8 @@ void stream_cb(void *param, TAOS_RES *result, TAOS_ROW row){
TAOS_FIELD *fields = taos_fetch_fields(result);
int numFields = taos_num_fields(result);
printf("\nnumfields:%d\n", numFields);
printf("\n\r-----------------------------------------------------------------------------------\n");
// printf("\nnumfields:%d\n", numFields);
//printf("\n\r-----------------------------------------------------------------------------------\n");
lua_State *L = p->state;
lua_rawgeti(L, LUA_REGISTRYINDEX, p->callback);
......@@ -204,13 +236,13 @@ void stream_cb(void *param, TAOS_RES *result, TAOS_ROW row){
lua_call(L, 1, 0);
printf("-----------------------------------------------------------------------------------\n\r");
// printf("-----------------------------------------------------------------------------------\n\r");
}
static int l_open_stream(lua_State *L){
int r = luaL_ref(L, LUA_REGISTRYINDEX);
TAOS * taos = lua_topointer(L,1);
char * sqlstr = lua_tostring(L,2);
TAOS * taos = (TAOS*)lua_topointer(L,1);
const char * sqlstr = lua_tostring(L,2);
int stime = luaL_checknumber(L,3);
lua_newtable(L);
......@@ -253,7 +285,7 @@ static int l_close_stream(lua_State *L){
}
static int l_close(lua_State *L){
TAOS * taos= lua_topointer(L,1);
TAOS *taos= (TAOS*)lua_topointer(L,1);
lua_newtable(L);
int table_index = lua_gettop(L);
......@@ -263,7 +295,7 @@ static int l_close(lua_State *L){
lua_pushstring(L, "null pointer.");
lua_setfield(L, table_index, "error");
}else{
taos_close(taos);
taos_close(taos);
lua_pushnumber(L, 0);
lua_setfield(L, table_index, "code");
lua_pushstring(L, "done.");
......
local driver = require "luaconnector"
local host="127.0.0.1"
local user="root"
local password="taosdata"
local db =nil
local port=6030
local conn
local config = {
host = "127.0.0.1",
port = 6030,
database = "",
user = "root",
password = "taosdata",
max_packet_size = 1024 * 1024
}
local res = driver.connect(host,user,password,db,port)
local conn
local res = driver.connect(config)
if res.code ~=0 then
print(res.error)
print("connect--- failed: "..res.error)
return
else
conn = res.conn
print("connect--- pass.")
end
local res = driver.query(conn,"drop database if exists demo")
res = driver.query(conn,"create database demo")
if res.code ~=0 then
print(res.error)
print("create db--- failed: "..res.error)
return
else
print("create db--- pass.")
end
res = driver.query(conn,"use demo")
if res.code ~=0 then
print(res.error)
print("select db--- failed: "..res.error)
return
else
print("select db--- pass.")
end
res = driver.query(conn,"create table m1 (ts timestamp, speed int,owner binary(20))")
if res.code ~=0 then
print(res.error)
print("create table---failed: "..res.error)
return
else
print("create table--- pass.")
end
res = driver.query(conn,"insert into m1 values ('2019-09-01 00:00:00.001',0,'robotspace'), ('2019-09-01 00:00:00.002',1,'Hilink'),('2019-09-01 00:00:00.003',2,'Harmony')")
if res.code ~=0 then
print(res.error)
print("insert records failed: "..res.error)
return
else
print("insert successfully, affected:"..res.affected)
if(res.affected == 3) then
print("insert records--- pass")
else
print("insert records---failed: expect 3 affected records, actually affected "..res.affected)
end
end
res = driver.query(conn,"select * from m1")
if res.code ~=0 then
print("select error:"..res.error)
print("select failed: "..res.error)
return
else
print("in lua, result:")
for i = 1, #(res.item) do
print("timestamp:"..res.item[i].ts)
print("speed:"..res.item[i].speed)
print("owner:"..res.item[i].owner)
end
if (#(res.item) == 3) then
print("select--- pass")
else
print("select--- failed: expect 3 affected records, actually received "..#(res.item))
end
end
res = driver.query(conn,"CREATE TABLE thermometer (ts timestamp, degree double) TAGS(location binary(20), type int)")
if res.code ~=0 then
print(res.error)
return
else
print("create super table--- pass")
end
res = driver.query(conn,"CREATE TABLE therm1 USING thermometer TAGS ('beijing', 1)")
if res.code ~=0 then
print(res.error)
return
else
print("create table--- pass")
end
res = driver.query(conn,"INSERT INTO therm1 VALUES ('2019-09-01 00:00:00.001', 20),('2019-09-01 00:00:00.002', 21)")
if res.code ~=0 then
print(res.error)
return
else
print("insert successfully, affected:"..res.affected)
if(res.affected == 2) then
print("insert records--- pass")
else
print("insert records---failed: expect 2 affected records, actually affected "..res.affected)
end
end
res = driver.query(conn,"SELECT COUNT(*) count, AVG(degree) AS av, MAX(degree), MIN(degree) FROM thermometer WHERE location='beijing' or location='tianjin' GROUP BY location, type")
if res.code ~=0 then
print("select error:"..res.error)
print("select from super table--- failed:"..res.error)
return
else
print("in lua, result:")
print("select from super table--- pass")
for i = 1, #(res.item) do
print("res:"..res.item[i].count)
end
end
function callback(t)
print("------------------------")
print("continuous query result:")
for key, value in pairs(t) do
print("key:"..key..", value:"..value)
......@@ -97,25 +121,25 @@ end
local stream
res = driver.open_stream(conn,"SELECT COUNT(*) as count, AVG(degree) as avg, MAX(degree) as max, MIN(degree) as min FROM thermometer interval(2s) sliding(2s);)",0,callback)
if res.code ~=0 then
print("open stream error:"..res.error)
print("open stream--- failed:"..res.error)
return
else
print("openstream ok")
print("open stream--- pass")
stream = res.stream
end
--From now on we begin continous query in an definite (infinite if you want) loop.
print("From now on we start continous insert in an definite (infinite if you want) loop.")
local loop_index = 0
while loop_index < 10 do
while loop_index < 30 do
local t = os.time()*1000
local v = loop_index
res = driver.query(conn,string.format("INSERT INTO therm1 VALUES (%d, %d)",t,v))
if res.code ~=0 then
print(res.error)
print("continous insertion--- failed:" .. res.error)
return
else
print("insert successfully, affected:"..res.affected)
--print("insert successfully, affected:"..res.affected)
end
os.execute("sleep " .. 1)
loop_index = loop_index + 1
......
......@@ -17,6 +17,7 @@ import json
import time
import random
import requests
import argparse
from requests.auth import HTTPBasicAuth
func_list=['avg','count','twa','sum','stddev','leastsquares','min',
'max','first','last','top','bottom','percentile','apercentile',
......@@ -32,19 +33,33 @@ condition_list=[
]
where_list = ['_c0>now-10d',' <50'," like \'%a%\'"]
class ConcurrentInquiry:
def __init__(self,n_Therads=25,r_Therads=25):
# def __init__(self,ts=1500000001000,host='127.0.0.1',user='root',password='taosdata',dbname='test',
# stb_prefix='st',subtb_prefix='t',n_Therads=10,r_Therads=10,probabilities=0.05,loop=5,
# stableNum = 2,subtableNum = 1000,insertRows = 100):
def __init__(self,ts,host,user,password,dbname,
stb_prefix,subtb_prefix,n_Therads,r_Therads,probabilities,loop,
stableNum ,subtableNum ,insertRows ):
self.n_numOfTherads = n_Therads
self.r_numOfTherads = r_Therads
self.ts=1500000001000
self.dbname='test'
self.ts=ts
self.host = host
self.user = user
self.password = password
self.dbname=dbname
self.stb_prefix = stb_prefix
self.subtb_prefix = subtb_prefix
self.stb_list=[]
self.subtb_list=[]
self.stb_stru_list=[]
self.subtb_stru_list=[]
self.stb_tag_list=[]
self.subtb_tag_list=[]
self.probabilities = [0.05,0.95]
self.probabilities = [probabilities,1-probabilities]
self.ifjoin = [0,1]
self.loop = loop
self.stableNum = stableNum
self.subtableNum = subtableNum
self.insertRows = insertRows
def SetThreadsNum(self,num):
self.numOfTherads=num
......@@ -88,9 +103,9 @@ class ConcurrentInquiry:
self.subtb_tag_list.append(tag)
def get_full(self): #获取所有的表、表结构
host = "127.0.0.1"
user = "root"
password = "taosdata"
host = self.host
user = self.user
password = self.password
conn = taos.connect(
host,
user,
......@@ -117,7 +132,7 @@ class ConcurrentInquiry:
return 'where '+random.choice([' and ',' or ']).join(l)
def con_interval(self,tlist,col_list,tag_list):
interval = 'interval(' + str(random.randint(0,100)) + random.choice(['a','s','d','w','n','y']) + ')'
interval = 'interval(' + str(random.randint(0,20)) + random.choice(['a','s','d','w','n','y']) + ')'
return interval
def con_limit(self,tlist,col_list,tag_list):
......@@ -133,7 +148,7 @@ class ConcurrentInquiry:
def con_group(self,tlist,col_list,tag_list):
rand_tag = random.randint(0,5)
rand_col = random.randint(0,1)
return 'group by '+','.join(random.sample(col_list,rand_col))+','.join(random.sample(tag_list,rand_tag))
return 'group by '+','.join(random.sample(col_list,rand_col) + random.sample(tag_list,rand_tag))
def con_order(self,tlist,col_list,tag_list):
return 'order by '+random.choice(tlist)
......@@ -165,8 +180,10 @@ class ConcurrentInquiry:
random.shuffle(func_list)
sel_col_list=[]
col_rand=random.randint(0,len(col_list))
loop = 0
for i,j in zip(col_list[0:col_rand],func_list): #决定每个被查询col的函数
alias = 'as '+ str(i)
alias = ' as '+ 'taos%d ' % loop
loop += 1
pick_func = ''
if j == 'leastsquares':
pick_func=j+'('+i+',1,1)'
......@@ -185,7 +202,7 @@ class ConcurrentInquiry:
for i in sel_con:
sel_con_list.append(i(tlist,col_list,tag_list)) #获取对应的条件函数
sql+=' '.join(sel_con_list) # condition
print(sql)
#print(sql)
return sql
def gen_query_join(self): #生成join查询语句
......@@ -236,8 +253,6 @@ class ConcurrentInquiry:
else:
join_section = ''.join(random.choices(col_intersection+tag_intersection))
sql += 'where t1._c0 = t2._c0 and ' + 't1.' + join_section + '=t2.' + join_section
print(sql)
return sql
def random_pick(self):
......@@ -248,16 +263,48 @@ class ConcurrentInquiry:
if x < cumulative_probability:break
return item
def gen_data(self):
stableNum = self.stableNum
subtableNum = self.subtableNum
insertRows = self.insertRows
t0 = self.ts
host = self.host
user = self.user
password = self.password
conn = taos.connect(
host,
user,
password,
)
cl = conn.cursor()
cl.execute("drop database if exists %s;" %self.dbname)
cl.execute("create database if not exists %s;" %self.dbname)
cl.execute("use %s" % self.dbname)
for k in range(stableNum):
sql="create table %s (ts timestamp, c1 int, c2 float, c3 bigint, c4 smallint, c5 tinyint, c6 double, c7 bool,c8 binary(20),c9 nchar(20)) \
tags(t1 int, t2 float, t3 bigint, t4 smallint, t5 tinyint, t6 double, t7 bool,t8 binary(20),t9 nchar(20))" % (self.stb_prefix+str(k))
cl.execute(sql)
for j in range(subtableNum):
sql = "create table %s using %s tags(%d,%d,%d,%d,%d,%d,%d,'%s','%s')" % \
(self.subtb_prefix+str(k)+'_'+str(j),self.stb_prefix+str(k),j,j/2.0,j%41,j%51,j%53,j*1.0,j%2,'taos'+str(j),'涛思'+str(j))
print(sql)
cl.execute(sql)
for i in range(insertRows):
ret = cl.execute(
"insert into %s values (%d , %d,%d,%d,%d,%d,%d,%d,'%s','%s')" %
(self.subtb_prefix+str(k)+'_'+str(j),t0+i,i%100,i/2.0,i%41,i%51,i%53,i*1.0,i%2,'taos'+str(i),'涛思'+str(i)))
cl.close()
conn.close()
def rest_query(self,sql): #rest 接口
host = "127.0.0.1"
user = "root"
password = "taosdata"
host = self.host
user = self.user
password = self.password
port =6041
url = "http://{}:{}/rest/sql".format(host, port )
try:
r = requests.post(url,
data = 'use test',
data = 'use %s' % self.dbname,
auth = HTTPBasicAuth('root', 'taosdata'))
r = requests.post(url,
data = sql,
......@@ -287,20 +334,20 @@ class ConcurrentInquiry:
def query_thread_n(self,threadID): #使用原生python接口查询
host = "127.0.0.1"
user = "root"
password = "taosdata"
host = self.host
user = self.user
password = self.password
conn = taos.connect(
host,
user,
password,
)
cl = conn.cursor()
cl.execute("use test;")
cl.execute("use %s;" % self.dbname)
print("Thread %d: starting" % threadID)
while True:
loop = self.loop
while loop:
try:
if self.random_pick():
......@@ -314,33 +361,40 @@ class ConcurrentInquiry:
end = time.time()
print("time cost :",end-start)
except Exception as e:
print('-'*40)
print(
"Failure thread%d, sql: %s,exception: %s" %
"Failure thread%d, sql: %s \nexception: %s" %
(threadID, str(sql),str(e)))
#exit(-1)
loop -= 1
if loop == 0: break
cl.close()
conn.close()
print("Thread %d: finishing" % threadID)
def query_thread_r(self,threadID): #使用rest接口查询
print("Thread %d: starting" % threadID)
while True:
try:
if self.random_pick():
sql=self.gen_query_sql()
else:
sql=self.gen_query_join()
print("sql is ",sql)
start = time.time()
self.rest_query(sql)
end = time.time()
print("time cost :",end-start)
except Exception as e:
print(
"Failure thread%d, sql: %s,exception: %s" %
(threadID, str(sql),str(e)))
#exit(-1)
loop = self.loop
while loop:
try:
if self.random_pick():
sql=self.gen_query_sql()
else:
sql=self.gen_query_join()
print("sql is ",sql)
start = time.time()
self.rest_query(sql)
end = time.time()
print("time cost :",end-start)
except Exception as e:
print('-'*40)
print(
"Failure thread%d, sql: %s \nexception: %s" %
(threadID, str(sql),str(e)))
#exit(-1)
loop -= 1
if loop == 0: break
print("Thread %d: finishing" % threadID)
......@@ -355,10 +409,124 @@ class ConcurrentInquiry:
thread = threading.Thread(target=self.query_thread_r, args=(i,))
threads.append(thread)
thread.start()
if len(sys.argv)>1:
q = ConcurrentInquiry(n_Therads=sys.argv[1],r_Therads=sys.argv[2])
else:
q = ConcurrentInquiry()
parser = argparse.ArgumentParser()
parser.add_argument(
'-H',
'--host-name',
action='store',
default='127.0.0.1',
type=str,
help='host name to be connected (default: 127.0.0.1)')
parser.add_argument(
'-S',
'--ts',
action='store',
default=1500000000000,
type=int,
help='insert data from timestamp (default: 1500000000000)')
parser.add_argument(
'-d',
'--db-name',
action='store',
default='test',
type=str,
help='Database name to be created (default: test)')
parser.add_argument(
'-t',
'--number-of-native-threads',
action='store',
default=10,
type=int,
help='Number of native threads (default: 10)')
parser.add_argument(
'-T',
'--number-of-rest-threads',
action='store',
default=10,
type=int,
help='Number of rest threads (default: 10)')
parser.add_argument(
'-r',
'--number-of-records',
action='store',
default=100,
type=int,
help='Number of record to be created for each table (default: 100)')
parser.add_argument(
'-c',
'--create-table',
action='store',
default='0',
type=int,
help='whether gen data (default: 0)')
parser.add_argument(
'-p',
'--subtb-name-prefix',
action='store',
default='t',
type=str,
help='subtable-name-prefix (default: t)')
parser.add_argument(
'-P',
'--stb-name-prefix',
action='store',
default='st',
type=str,
help='stable-name-prefix (default: st)')
parser.add_argument(
'-b',
'--probabilities',
action='store',
default='0.05',
type=float,
help='probabilities of join (default: 0.05)')
parser.add_argument(
'-l',
'--loop-per-thread',
action='store',
default='100',
type=int,
help='loop per thread (default: 100)')
parser.add_argument(
'-u',
'--user',
action='store',
default='root',
type=str,
help='user name')
parser.add_argument(
'-w',
'--password',
action='store',
default='root',
type=str,
help='user name')
parser.add_argument(
'-n',
'--number-of-tables',
action='store',
default=1000,
type=int,
help='Number of subtales per stable (default: 1000)')
parser.add_argument(
'-N',
'--number-of-stables',
action='store',
default=2,
type=int,
help='Number of stables (default: 2)')
args = parser.parse_args()
q = ConcurrentInquiry(
args.ts,args.host_name,args.user,args.password,args.db_name,
args.stb_name_prefix,args.subtb_name_prefix,args.number_of_native_threads,args.number_of_rest_threads,
args.probabilities,args.loop_per_thread,args.number_of_stables,args.number_of_tables ,args.number_of_records )
if args.create_table:
q.gen_data()
q.get_full()
#q.gen_query_sql()
q.run()
......@@ -102,7 +102,7 @@ class TDTestCase:
tdSql.query("select twa(c) from t3 where ts >= '2018-09-17 08:59:00.000' and ts <= '2018-09-17 09:01:30.000'")
tdSql.checkRows(1)
tdSql.checkData(-0.5)
tdSql.checkData(0, 0, -0.5)
tdSql.query("select twa(c) from t3 where ts >= '2018-09-17 08:59:00.000' and ts <= '2018-09-17 09:01:30.000' interval(1s)")
tdSql.checkRows(2)
......
......@@ -50,7 +50,7 @@ class TDTestCase:
tdSql.execute("insert into t0 values (%d, NULL)" % (self.ts))
tdDnodes.stop(1)
tdLog.sleep(10)
# tdLog.sleep(10)
tdDnodes.start(1)
tdSql.execute("use db")
tdSql.query("select * from t0")
......@@ -62,7 +62,7 @@ class TDTestCase:
tdSql.execute("create table t1 (ts timestamp, col %s)" % self.types[i])
tdSql.execute("insert into t1 values (%d, NULL)" % (self.ts))
tdDnodes.stop(1)
tdLog.sleep(10)
# tdLog.sleep(10)
tdDnodes.start(1)
tdSql.execute("use db")
......
###################################################################
# Copyright (c) 2016 by TAOS Technologies, Inc.
# All rights reserved.
#
# This file is proprietary and confidential to TAOS Technologies.
# No part of this file may be reproduced, stored, transmitted,
# disclosed or used in any form or by any means other than as
# expressly provided by the written permission from Jianhui Tao
#
###################################################################
# -*- coding: utf-8 -*-
import sys
import os
import taos
from util.log import *
from util.cases import *
from util.sql import *
from util.dnodes import *
import numpy as np
class TDTestCase:
def init(self, conn, logSql):
tdLog.debug("start to execute %s" % __file__)
self.conn = conn
tdSql.init(conn.cursor())
self.rowNum = 10
self.ts = 1537146000000
def createOldDir(self):
path = tdDnodes.dnodes[1].getDnodeRootDir(1)
print(path)
tdLog.info("sudo mkdir -p %s/data/vnode/vnode2/wal/old" % path)
os.system("sudo mkdir -p %s/data/vnode/vnode2/wal/old" % path)
def run(self):
# os.system("rm -rf %s/ " % tdDnodes.getDnodesRootDir())
tdSql.prepare()
tdSql.execute("create table st(ts timestamp, speed int)")
tdSql.execute("insert into st values(now, 1)")
tdSql.query("select count(*) from st")
tdSql.checkRows(1)
self.createOldDir()
tdLog.sleep(10)
print("force kill taosd")
os.system("sudo kill -9 $(pgrep -x taosd)")
os.system("")
tdDnodes.start(1)
tdSql.init(self.conn.cursor())
tdSql.execute("use db")
tdSql.query("select count(*) from st")
tdSql.checkRows(1)
def stop(self):
tdSql.close()
tdLog.success("%s successfully executed" % __file__)
tdCases.addWindows(__file__, TDTestCase())
tdCases.addLinux(__file__, TDTestCase())
......@@ -38,38 +38,62 @@ class TDTestCase:
insertRows = 200
t0 = 1604298064000
sql='insert into db.t1 values '
temp=''
tdLog.info("insert %d rows" % (insertRows))
for i in range(0, insertRows):
ret = tdSql.execute(
'insert into t1 values (%d , 1)' %
(t0+i))
# ret = tdSql.execute(
# 'insert into t1 values (%d , 1)' %
# (t0+i))
temp += '(%d,1)' %(t0+i)
if i % 100 == 0 or i == (insertRows - 1 ):
print(sql+temp)
ret = tdSql.execute(
sql+temp
)
temp = ''
print("==========step2")
print("restart to commit ")
tdDnodes.stop(1)
tdDnodes.start(1)
tdSql.query("select * from db.t1")
tdSql.checkRows(insertRows)
for k in range(0,100):
tdLog.info("insert %d rows" % (insertRows))
temp=''
for i in range (0,insertRows):
ret = tdSql.execute(
'insert into db.t1 values(%d,1)' %
(t0+k*200+i)
)
temp += '(%d,1)' %(t0+k*200+i)
if i % 100 == 0 or i == (insertRows - 1 ):
print(sql+temp)
ret = tdSql.execute(
sql+temp
)
temp = ''
tdDnodes.stop(1)
tdDnodes.start(1)
tdSql.query("select * from db.t1")
tdSql.checkRows(insertRows+200*k)
print("==========step2")
print("==========step3")
print("insert into another table ")
s = 'use db'
tdSql.execute(s)
ret = tdSql.execute('create table t2 (ts timestamp, a int)')
insertRows = 20000
sql = 'insert into t2 values '
temp = ''
for i in range(0, insertRows):
ret = tdSql.execute(
'insert into t2 values (%d, 1)' %
(t0+i))
# ret = tdSql.execute(
# 'insert into t2 values (%d, 1)' %
# (t0+i))
temp += '(%d,1)' %(t0+i)
if i % 500 == 0 or i == (insertRows - 1 ):
print(sql+temp)
ret = tdSql.execute(
sql+temp
)
temp = ''
tdDnodes.stop(1)
tdDnodes.start(1)
tdSql.query("select * from t2")
......
......@@ -255,9 +255,26 @@ class TDDnode:
tdLog.exit(cmd)
self.running = 1
tdLog.debug("dnode:%d is running with %s " % (self.index, cmd))
tdLog.debug("wait 5 seconds for the dnode:%d to start." % (self.index))
time.sleep(5)
if self.valgrind == 0:
time.sleep(0.1)
key = 'from offline to online'
bkey = bytes(key,encoding="utf8")
logFile = self.logDir + "/taosdlog.0"
popen = subprocess.Popen('tail -f ' + logFile, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True)
pid = popen.pid
print('Popen.pid:' + str(pid))
while True:
line = popen.stdout.readline().strip()
if bkey in line:
print(line)
popen.kill()
break
tdLog.debug("the dnode:%d has been started." % (self.index))
else:
tdLog.debug("wait 5 seconds for the dnode:%d to start." % (self.index))
time.sleep(5)
# time.sleep(5)
def startWithoutSleep(self):
buildPath = self.getBuildPath()
......
......@@ -31,8 +31,8 @@ IF (TD_LINUX)
#add_executable(createTablePerformance createTablePerformance.c)
#target_link_libraries(createTablePerformance taos_static tutil common pthread)
add_executable(createNormalTable createNormalTable.c)
target_link_libraries(createNormalTable taos_static tutil common pthread)
#add_executable(createNormalTable createNormalTable.c)
#target_link_libraries(createNormalTable taos_static tutil common pthread)
#add_executable(queryPerformance queryPerformance.c)
#target_link_libraries(queryPerformance taos_static tutil common pthread)
......@@ -46,7 +46,7 @@ IF (TD_LINUX)
#add_executable(invalidTableId invalidTableId.c)
#target_link_libraries(invalidTableId taos_static tutil common pthread)
add_executable(hashIterator hashIterator.c)
target_link_libraries(hashIterator taos_static tutil common pthread)
#add_executable(hashIterator hashIterator.c)
#target_link_libraries(hashIterator taos_static tutil common pthread)
ENDIF()
......@@ -32,6 +32,7 @@ int32_t numOfThreads = 30;
int32_t numOfTables = 100000;
int32_t replica = 1;
int32_t numOfColumns = 2;
TAOS * con = NULL;
typedef struct {
int32_t tableBeginIndex;
......@@ -84,13 +85,14 @@ int main(int argc, char *argv[]) {
pthread_attr_destroy(&thattr);
free(pInfo);
taos_close(con);
}
void createDbAndSTable() {
pPrint("start to create db and stable");
char qstr[64000];
TAOS *con = taos_connect(NULL, "root", "taosdata", NULL, 0);
con = taos_connect(NULL, "root", "taosdata", NULL, 0);
if (con == NULL) {
pError("failed to connect to DB, reason:%s", taos_errstr(con));
exit(1);
......@@ -127,8 +129,6 @@ void createDbAndSTable() {
exit(0);
}
taos_free_result(pSql);
taos_close(con);
}
void *threadFunc(void *param) {
......@@ -136,12 +136,6 @@ void *threadFunc(void *param) {
char qstr[65000];
int code;
TAOS *con = taos_connect(NULL, "root", "taosdata", NULL, 0);
if (con == NULL) {
pError("index:%d, failed to connect to DB, reason:%s", pInfo->threadIndex, taos_errstr(con));
exit(1);
}
sprintf(qstr, "use %s", pInfo->dbName);
TAOS_RES *pSql = taos_query(con, qstr);
taos_free_result(pSql);
......@@ -170,7 +164,6 @@ void *threadFunc(void *param) {
pInfo->createTableSpeed = speed;
pPrint("thread:%d, time:%.2f sec, speed:%.1f tables/second, ", pInfo->threadIndex, seconds, speed);
taos_close(con);
return 0;
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册