提交 be90926c 编写于 作者: S Shengliang Guan

merge from develop

......@@ -224,7 +224,7 @@ tdengine
1,通过端口映射(-p),将容器内部开放的网络端口映射到宿主机的指定端口上。通过挂载本地目录(-v),可以实现宿主机与容器内部的数据同步,防止容器删除后,数据丢失。
```bash
$ docker run -d -v /etc/taos:/etc/taos -P 6041:6041 tdengine/tdengine
$ docker run -d -v /etc/taos:/etc/taos -p 6041:6041 tdengine/tdengine
526aa188da767ae94b244226a2b2eec2b5f17dd8eff592893d9ec0cd0f3a1ccd
$ curl -u root:taosdata -d 'show databases' 127.0.0.1:6041/rest/sql
......
# TDengine 集群安装、管理
多个TDengine服务器,也就是多个taosd的运行实例可以组成一个集群,以保证TDengine的高可靠运行,并提供水平扩展能力。要了解TDengine 2.0的集群管理,需要对集群的基本概念有所了解,请看《TDengine整体架构》一章。而且在安装集群之前,建议先按照[《立即开始》](https://www.taosdata.com/cn/documentation/getting-started/)一章安装并体验单节点功能。
多个TDengine服务器,也就是多个taosd的运行实例可以组成一个集群,以保证TDengine的高可靠运行,并提供水平扩展能力。要了解TDengine 2.0的集群管理,需要对集群的基本概念有所了解,请看[《TDengine整体架构》](https://www.taosdata.com/cn/documentation/architecture)一章。而且在安装集群之前,建议先按照[《立即开始》](https://www.taosdata.com/cn/documentation/getting-started/)一章安装并体验单节点功能。
集群的每个数据节点是由End Point来唯一标识的,End Point是由FQDN(Fully Qualified Domain Name)外加Port组成,比如 h1.taosdata.com:6030。一般FQDN就是服务器的hostname,可通过Linux命令`hostname -f`获取(如何配置FQDN,请参考:[一篇文章说清楚TDengine的FQDN](https://www.taosdata.com/blog/2020/09/11/1824.html))。端口是这个数据节点对外服务的端口号,缺省是6030,但可以通过taos.cfg里配置参数serverPort进行修改。一个物理节点可能配置了多个hostname, TDengine会自动获取第一个,但也可以通过taos.cfg里配置参数fqdn进行指定。如果习惯IP地址直接访问,可以将参数fqdn设置为本节点的IP地址。
......
......@@ -365,7 +365,7 @@ taos -C 或 taos --dump-config
- timezone
默认值:从系统中动态获取当前客户端运行系统所在的时区。
默认值:动态获取当前客户端运行系统所在的时区。
为应对多时区的数据写入和查询问题,TDengine 采用 Unix 时间戳(Unix Timestamp)来记录和存储时间戳。Unix 时间戳的特点决定了任一时刻不论在任何时区,产生的时间戳均一致。需要注意的是,Unix时间戳是在客户端完成转换和记录。为了确保客户端其他形式的时间转换为正确的 Unix 时间戳,需要设置正确的时区。
......
......@@ -67,9 +67,9 @@ TDengine 缺省的时间戳是毫秒精度,但通过在 CREATE DATABASE 时传
CREATE DATABASE [IF NOT EXISTS] db_name [KEEP keep] [DAYS days] [UPDATE 1];
```
说明:<!-- 注意:上一行中的 SQL 语句在企业版文档中会被替换,因此修改此语句的话,需要修改企业版文档的替换字典键值!! -->
1) KEEP是该数据库的数据保留多长天数,缺省是3650天(10年),数据库会自动删除超过时限的数据;<!-- REPLACE_OPEN_TO_ENTERPRISE__KEEP_PARAM_DESCRIPTION -->
2) UPDATE 标志数据库支持更新相同时间戳数据;(从 2.1.7.0 版本开始此参数支持设为 2,表示允许部分列更新,也即更新数据行时未被设置的列会保留原值。)(从 2.0.8.0 版本开始支持此参数。注意此参数不能通过 `ALTER DATABASE` 指令进行修改。)
1) UPDATE设为0时,表示不允许更新数据,后发送的相同时间戳的数据会被直接丢弃;
......@@ -79,11 +79,11 @@ TDengine 缺省的时间戳是毫秒精度,但通过在 CREATE DATABASE 时传
3) UPDATE设为2时,表示支持更新部分列数据,即如果更新一个数据行,其中某些列没有提供取值,那么这些列会保持原有数据行中的对应值;
4) 更多关于UPDATE参数的用法,请参考[FAQ](https://www.taosdata.com/cn/documentation/faq)。
3) 数据库名最大长度为33;
4) 一条SQL 语句的最大长度为65480个字符;
5) 数据库还有更多与存储相关的配置参数,请参见 [服务端配置](https://www.taosdata.com/cn/documentation/administrator#config) 章节。
- **显示系统当前参数**
......@@ -721,18 +721,19 @@ Query OK, 1 row(s) in set (0.001091s)
### 支持的条件过滤操作
| **Operation** | **Note** | **Applicable Data Types** |
| --------------- | ----------------------------- | ----------------------------------------- |
| > | larger than | **`timestamp`** and all numeric types |
| < | smaller than | **`timestamp`** and all numeric types |
| >= | larger than or equal to | **`timestamp`** and all numeric types |
| <= | smaller than or equal to | **`timestamp`** and all numeric types |
| = | equal to | all types |
| <> | not equal to | all types |
| is [not] null | is null or is not null | all types |
| between and | within a certain range | **`timestamp`** and all numeric types |
| in | match any value in a set | all types except first column `timestamp` |
| like | match a wildcard string | **`binary`** **`nchar`** |
| **Operation** | **Note** | **Applicable Data Types** |
| ------------- | ------------------------ | ----------------------------------------- |
| > | larger than | **`timestamp`** and all numeric types |
| < | smaller than | **`timestamp`** and all numeric types |
| >= | larger than or equal to | **`timestamp`** and all numeric types |
| <= | smaller than or equal to | **`timestamp`** and all numeric types |
| = | equal to | all types |
| <> | not equal to | all types |
| is [not] null | is null or is not null | all types |
| between and | within a certain range | **`timestamp`** and all numeric types |
| in | match any value in a set | all types except first column `timestamp` |
| like | match a wildcard string | **`binary`** **`nchar`** |
| match/nmatch | filter regex | **regex** |
1. <> 算子也可以写为 != ,请注意,这个算子不能用于数据表第一列的 timestamp 字段。
2. like 算子使用通配符字符串进行匹配检查。
......@@ -744,7 +745,30 @@ Query OK, 1 row(s) in set (0.001091s)
4. 针对单一字段的过滤,如果是时间过滤条件,则一条语句中只支持设定一个;但针对其他的(普通)列或标签列,则可以使用 `OR` 关键字进行组合条件的查询过滤。例如: `((value > 20 AND value < 30) OR (value < 12))`。
* 从 2.3.0.0 版本开始,允许使用多个时间过滤条件,但首列时间戳的过滤运算结果只能包含一个区间。
5. 从 2.0.17.0 版本开始,条件过滤开始支持 BETWEEN AND 语法,例如 `WHERE col2 BETWEEN 1.5 AND 3.25` 表示查询条件为“1.5 ≤ col2 ≤ 3.25”。
6. 从 2.1.4.0 版本开始,条件过滤开始支持 IN 算子,例如 `WHERE city IN ('Beijing', 'Shanghai')`。说明:BOOL 类型写作 `{true, false}` 或 `{0, 1}` 均可,但不能写作 0、1 之外的整数;FLOAT 和 DOUBLE 类型会受到浮点数精度影响,集合内的值在精度范围内认为和数据行的值完全相等才能匹配成功;TIMESTAMP 类型支持非主键的列。<!-- REPLACE_OPEN_TO_ENTERPRISE__IN_OPERATOR_AND_UNSIGNED_INTEGER -->
6. 从 2.1.4.0 版本开始,条件过滤开始支持 IN 算子,例如 `WHERE city IN ('Beijing', 'Shanghai')`。说明:BOOL 类型写作 `{true, false}` 或 `{0, 1}` 均可,但不能写作 0、1 之外的整数;FLOAT 和 DOUBLE 类型会受到浮点数精度影响,集合内的值在精度范围内认为和数据行的值完全相等才能匹配成功;TIMESTAMP 类型支持非主键的列。
7. 从2.3.0.0版本开始,条件过滤开始支持正则表达式,关键字match/nmatch,不区分大小写。
**语法**
WHERE (column|tbname) **match/MATCH/nmatch/NMATCH** *regex*
**正则表达式规范**
确保使用的正则表达式符合POSIX的规范,具体规范内容可参见[Regular Expressions](https://pubs.opengroup.org/onlinepubs/9699919799/basedefs/V1_chap09.html)
**使用限制**
只能针对表名(即 tbname 筛选)和标签的名称和binary类型标签值 进行正则表达式过滤,不支持针对普通列使用正则表达式过滤。
只能在 WHERE 子句中作为过滤条件存在。
正则匹配字符串长度不能超过 128 字节。可以通过参数 *maxRegexStringLen* 设置和调整最大允许的正则匹配字符串,该参数是客户端配置参数,需要重启才能生效。
**嵌套查询支持**
可以在内层查询和外层查询中使用。<!-- REPLACE_OPEN_TO_ENTERPRISE__IN_OPERATOR_AND_UNSIGNED_INTEGER -->
<a class="anchor" id="join"></a>
### JOIN 子句
......@@ -1238,10 +1262,12 @@ TDengine支持针对数据的聚合查询。提供支持的聚合和选择函数
```
- **APERCENTILE**
```mysql
SELECT APERCENTILE(field_name, P) FROM { tb_name | stb_name } [WHERE clause];
SELECT APERCENTILE(field_name, P[, algo_type])
FROM { tb_name | stb_name } [WHERE clause]
```
功能说明:统计表/超级表中列的值百分比分位数,与PERCENTILE函数相似,但是返回近似结果。
功能说明:统计表/超级表中指定列的值百分比分位数,与PERCENTILE函数相似,但是返回近似结果。
返回结果数据类型: 双精度浮点数Double。
......@@ -1249,47 +1275,62 @@ TDengine支持针对数据的聚合查询。提供支持的聚合和选择函数
适用于:**表、超级表**。
说明:*P*值取值范围0≤*P*≤100,为0的时候等同于MIN,为100的时候等同于MAX。推荐使用```APERCENTILE```函数,该函数性能远胜于```PERCENTILE```函数。
说明:*P*值有效取值范围0≤P≤100,为 0 的时候等同于 MIN,为 100 的时候等同于MAX;*algo_type*的有效输入:**default** 和 **t-digest**。 用于指定计算近似分位数的算法。可不提供第三个参数的输入,此时将使用 default 的算法进行计算,即 apercentile(column_name, 50, "default") 与 apercentile(column_name, 50) 等价。当使用“t-digest”参数的时候,将使用t-digest方式采样计算近似分位数。但该参数指定计算算法的功能从2.2.0.x版本开始支持,2.2.0.0之前的版本不支持指定使用算法的功能。
嵌套子查询支持:适用于内层查询和外层查询。
```mysql
taos> SELECT APERCENTILE(current, 20) FROM d1001;
apercentile(current, 20) |
============================
10.300000191 |
Query OK, 1 row(s) in set (0.000645s)
taos> select apercentile (count, 80, 'default') from stb1;
apercentile (c0, 80, 'default') |
==================================
601920857.210056424 |
Query OK, 1 row(s) in set (0.012363s)
taos> select apercentile (count, 80, 't-digest') from stb1;
apercentile (c0, 80, 't-digest') |
===================================
605869120.966666579 |
Query OK, 1 row(s) in set (0.011639s)
```
- **LAST_ROW**
```mysql
SELECT LAST_ROW(field_name) FROM { tb_name | stb_name };
```
功能说明:返回表/超级表的最后一条记录。
返回结果数据类型:同应用的字段。
应用字段:所有字段。
适用于:**表、超级表**。
限制:LAST_ROW() 不能与 INTERVAL 一起使用。
说明:在用于超级表时,时间戳完全一样且同为最大的数据行可能有多个,那么会从中随机返回一条,而并不保证多次运行所挑选的数据行必然一致。
功能说明:返回表/超级表的最后一条记录。
返回结果数据类型:同应用的字段。
应用字段:所有字段。
适用于:**表、超级表**。
限制:LAST_ROW() 不能与 INTERVAL 一起使用。
说明:在用于超级表时,时间戳完全一样且同为最大的数据行可能有多个,那么会从中随机返回一条,而并不保证多次运行所挑选的数据行必然一致。
示例:
```mysql
taos> SELECT LAST_ROW(current) FROM meters;
last_row(current) |
=======================
12.30000 |
Query OK, 1 row(s) in set (0.001238s)
Query OK, 1 row(s) in set (0.001238s)
taos> SELECT LAST_ROW(current) FROM d1002;
last_row(current) |
=======================
10.30000 |
Query OK, 1 row(s) in set (0.001042s)
```
```
- **INTERP**
```mysql
SELECT INTERP(field_name) FROM { tb_name | stb_name } WHERE ts='timestamp' [FILL ({ VALUE | PREV | NULL | LINEAR | NEXT})];
......
......@@ -2453,6 +2453,7 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
const char* msg11 = "third parameter in derivative should be 0 or 1";
const char* msg12 = "parameter is out of range [1, 100]";
const char* msg13 = "parameter list required";
const char* msg14 = "third parameter algorithm must be 'default' or 't-digest'";
switch (functionId) {
case TSDB_FUNC_COUNT: {
......@@ -2797,8 +2798,16 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
case TSDB_FUNC_PERCT:
case TSDB_FUNC_APERCT: {
// 1. valid the number of parameters
if (pItem->pNode->Expr.paramList == NULL || taosArrayGetSize(pItem->pNode->Expr.paramList) != 2) {
/* no parameters or more than one parameter for function */
bool valid = true;
if(pItem->pNode->Expr.paramList == NULL) {
valid = false;
} else if(functionId == TSDB_FUNC_APERCT) {
size_t cnt = taosArrayGetSize(pItem->pNode->Expr.paramList);
if(cnt != 2 && cnt !=3) valid = false;
} else {
if (taosArrayGetSize(pItem->pNode->Expr.paramList) != 2) valid = false;
}
if(!valid) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg2);
}
......@@ -2844,6 +2853,10 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
SExprInfo* pExpr = NULL;
if (functionId == TSDB_FUNC_PERCT || functionId == TSDB_FUNC_APERCT) {
// param1 double
if(pVariant->nType != TSDB_DATA_TYPE_DOUBLE && pVariant->nType != TSDB_DATA_TYPE_BIGINT){
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg5);
}
tVariantDump(pVariant, val, TSDB_DATA_TYPE_DOUBLE, true);
double dp = GET_DOUBLE_VAL(val);
......@@ -2861,9 +2874,32 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
*/
tscInsertPrimaryTsSourceColumn(pQueryInfo, pTableMetaInfo->pTableMeta->id.uid);
colIndex += 1; // the first column is ts
pExpr = tscExprAppend(pQueryInfo, functionId, &index, resultType, resultSize, getNewResColId(pCmd), interResult, false);
tscExprAddParams(&pExpr->base, val, TSDB_DATA_TYPE_DOUBLE, sizeof(double));
// param2 int32
if (taosArrayGetSize(pItem->pNode->Expr.paramList) == 3) {
if (pParamElem[2].pNode != NULL) {
pVariant = &pParamElem[2].pNode->value;
// check type must string
if(pVariant->nType != TSDB_DATA_TYPE_BINARY || pVariant->pz == NULL){
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg13);
}
char* pzAlgo = pVariant->pz;
int32_t algo = 0;
if(strcasecmp(pzAlgo, "t-digest") == 0) {
algo = 1;
} else if(strcasecmp(pzAlgo, "default") == 0){
algo = 0;
} else {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg14);
}
// append algo int32_t
tscExprAddParams(&pExpr->base, (char*)&algo, TSDB_DATA_TYPE_INT, sizeof(int32_t));
}
}
} else if (functionId == TSDB_FUNC_MAVG || functionId == TSDB_FUNC_SAMPLE) {
tVariantDump(pVariant, val, TSDB_DATA_TYPE_BIGINT, true);
......
......@@ -130,7 +130,7 @@ int32_t* taosGetErrno();
#define TSDB_CODE_MND_INVALID_QUERY_ID TAOS_DEF_ERROR_CODE(0, 0x030C) //"Invalid query id")
#define TSDB_CODE_MND_INVALID_STREAM_ID TAOS_DEF_ERROR_CODE(0, 0x030D) //"Invalid stream id")
#define TSDB_CODE_MND_INVALID_CONN_ID TAOS_DEF_ERROR_CODE(0, 0x030E) //"Invalid connection id")
#define TSDB_CODE_MND_MNODE_IS_RUNNING TAOS_DEF_ERROR_CODE(0, 0x0310) //"mnode is alreay running")
#define TSDB_CODE_MND_MNODE_IS_RUNNING TAOS_DEF_ERROR_CODE(0, 0x0310) //"mnode is already running")
#define TSDB_CODE_MND_FAILED_TO_CONFIG_SYNC TAOS_DEF_ERROR_CODE(0, 0x0311) //"failed to config sync")
#define TSDB_CODE_MND_FAILED_TO_START_SYNC TAOS_DEF_ERROR_CODE(0, 0x0312) //"failed to start sync")
#define TSDB_CODE_MND_FAILED_TO_CREATE_DIR TAOS_DEF_ERROR_CODE(0, 0x0313) //"failed to create mnode dir")
......@@ -322,7 +322,7 @@ int32_t* taosGetErrno();
#define TSDB_CODE_WAL_SIZE_LIMIT TAOS_DEF_ERROR_CODE(0, 0x1002) //"WAL size exceeds limit")
// http
#define TSDB_CODE_HTTP_SERVER_OFFLINE TAOS_DEF_ERROR_CODE(0, 0x1100) //"http server is not onlin")
#define TSDB_CODE_HTTP_SERVER_OFFLINE TAOS_DEF_ERROR_CODE(0, 0x1100) //"http server is not online")
#define TSDB_CODE_HTTP_UNSUPPORT_URL TAOS_DEF_ERROR_CODE(0, 0x1101) //"url is not support")
#define TSDB_CODE_HTTP_INVALID_URL TAOS_DEF_ERROR_CODE(0, 0x1102) //invalid url format")
#define TSDB_CODE_HTTP_NO_ENOUGH_MEMORY TAOS_DEF_ERROR_CODE(0, 0x1103) //"no enough memory")
......
......@@ -112,6 +112,10 @@ extern "C" {
#define MAX_INTERVAL_TIME_WINDOW 1000000 // maximum allowed time windows in final results
#define TOP_BOTTOM_QUERY_LIMIT 100
// apercentile(arg1,agr2,arg3) param arg3 value is below:
#define ALGO_DEFAULT 0
#define ALGO_TDIGEST 1
enum {
MASTER_SCAN = 0x0u,
REVERSE_SCAN = 0x1u,
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
/*
* include/tdigest.c
*
* Copyright (c) 2016, Usman Masood <usmanm at fastmail dot fm>
*/
#ifndef TDIGEST_H
#define TDIGEST_H
#ifndef M_PI
#define M_PI 3.14159265358979323846264338327950288 /* pi */
#endif
#define DOUBLE_MAX 1.79e+308
#define ADDITION_CENTROID_NUM 2
#define COMPRESSION 400
#define GET_CENTROID(compression) (ceil(compression * M_PI / 2) + 1 + ADDITION_CENTROID_NUM)
#define GET_THRESHOLD(compression) (7.5 + 0.37 * compression - 2e-4 * pow(compression, 2))
#define TDIGEST_SIZE(compression) (sizeof(TDigest) + sizeof(SCentroid)*GET_CENTROID(compression) + sizeof(SPt)*GET_THRESHOLD(compression))
typedef struct SCentroid {
double mean;
int64_t weight;
}SCentroid;
typedef struct SPt {
double value;
int64_t weight;
}SPt;
typedef struct TDigest {
double compression;
int32_t threshold;
int64_t size;
int64_t total_weight;
double min;
double max;
int32_t num_buffered_pts;
SPt *buffered_pts;
int32_t num_centroids;
SCentroid *centroids;
}TDigest;
TDigest *tdigestNewFrom(void* pBuf, int32_t compression);
void tdigestAdd(TDigest *t, double x, int64_t w);
void tdigestMerge(TDigest *t1, TDigest *t2);
double tdigestQuantile(TDigest *t, double q);
void tdigestCompress(TDigest *t);
void tdigestFreeFrom(TDigest *t);
void tdigestAutoFill(TDigest* t, int32_t compression);
#endif /* TDIGEST_H */
......@@ -17,6 +17,7 @@
#include "taosdef.h"
#include "taosmsg.h"
#include "texpr.h"
#include "tdigest.h"
#include "ttype.h"
#include "tsdb.h"
#include "tglobal.h"
......@@ -145,6 +146,7 @@ typedef struct SLeastsquaresInfo {
typedef struct SAPercentileInfo {
SHistogramInfo *pHisto;
TDigest* pTDigest;
} SAPercentileInfo;
typedef struct STSCompInfo {
......@@ -337,7 +339,9 @@ int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionI
return TSDB_CODE_SUCCESS;
} else if (functionId == TSDB_FUNC_APERCT) {
*type = TSDB_DATA_TYPE_BINARY;
*bytes = sizeof(SHistBin) * (MAX_HISTOGRAM_BIN + 1) + sizeof(SHistogramInfo) + sizeof(SAPercentileInfo);
int16_t bytesHist = sizeof(SHistBin) * (MAX_HISTOGRAM_BIN + 1) + sizeof(SHistogramInfo) + sizeof(SAPercentileInfo);
int16_t bytesDigest = (int16_t)(sizeof(SAPercentileInfo) + TDIGEST_SIZE(COMPRESSION));
*bytes = MAX(bytesHist, bytesDigest);
*interBytes = *bytes;
return TSDB_CODE_SUCCESS;
......@@ -370,8 +374,9 @@ int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionI
} else if (functionId == TSDB_FUNC_APERCT) {
*type = TSDB_DATA_TYPE_DOUBLE;
*bytes = sizeof(double);
*interBytes =
sizeof(SAPercentileInfo) + sizeof(SHistogramInfo) + sizeof(SHistBin) * (MAX_HISTOGRAM_BIN + 1);
int16_t bytesHist = sizeof(SAPercentileInfo) + sizeof(SHistogramInfo) + sizeof(SHistBin) * (MAX_HISTOGRAM_BIN + 1);
int16_t bytesDigest = (int16_t)(sizeof(SAPercentileInfo) + TDIGEST_SIZE(COMPRESSION));
*interBytes = MAX(bytesHist, bytesDigest);
return TSDB_CODE_SUCCESS;
} else if (functionId == TSDB_FUNC_TWA) {
*type = TSDB_DATA_TYPE_DOUBLE;
......@@ -2490,17 +2495,135 @@ static SAPercentileInfo *getAPerctInfo(SQLFunctionCtx *pCtx) {
} else {
pInfo = GET_ROWCELL_INTERBUF(pResInfo);
}
buildHistogramInfo(pInfo);
return pInfo;
}
//
// ----------------- tdigest -------------------
//
//////////////////////////////////////////////////////////////////////////////////
static bool tdigest_setup(SQLFunctionCtx *pCtx, SResultRowCellInfo *pResultInfo) {
if (!function_setup(pCtx, pResultInfo)) {
return false;
}
// new TDigest
SAPercentileInfo *pInfo = getAPerctInfo(pCtx);
char *tmp = (char *)pInfo + sizeof(SAPercentileInfo);
pInfo->pTDigest = tdigestNewFrom(tmp, COMPRESSION);
return true;
}
static void tdigest_do(SQLFunctionCtx *pCtx) {
int32_t notNullElems = 0;
SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx);
SAPercentileInfo * pAPerc = getAPerctInfo(pCtx);
assert(pAPerc->pTDigest != NULL);
if(pAPerc->pTDigest == NULL) {
qError("tdigest_do tdigest is null.");
return ;
}
for (int32_t i = 0; i < pCtx->size; ++i) {
char *data = GET_INPUT_DATA(pCtx, i);
if (pCtx->hasNull && isNull(data, pCtx->inputType)) {
continue;
}
notNullElems += 1;
double v = 0; // value
long long w = 1; // weigth
GET_TYPED_DATA(v, double, pCtx->inputType, data);
tdigestAdd(pAPerc->pTDigest, v, w);
}
if (!pCtx->hasNull) {
assert(pCtx->size == notNullElems);
}
SET_VAL(pCtx, notNullElems, 1);
if (notNullElems > 0) {
pResInfo->hasResult = DATA_SET_FLAG;
}
}
static void tdigest_merge(SQLFunctionCtx *pCtx) {
SAPercentileInfo *pInput = (SAPercentileInfo *)GET_INPUT_DATA_LIST(pCtx);
assert(pInput->pTDigest);
pInput->pTDigest = (TDigest*)((char*)pInput + sizeof(SAPercentileInfo));
tdigestAutoFill(pInput->pTDigest, COMPRESSION);
// input merge no elements , no need merge
if(pInput->pTDigest->num_centroids == 0 && pInput->pTDigest->num_buffered_pts == 0) {
return ;
}
SAPercentileInfo *pOutput = getAPerctInfo(pCtx);
if(pOutput->pTDigest->num_centroids == 0) {
memcpy(pOutput->pTDigest, pInput->pTDigest, (size_t)TDIGEST_SIZE(COMPRESSION));
tdigestAutoFill(pOutput->pTDigest, COMPRESSION);
} else {
tdigestMerge(pOutput->pTDigest, pInput->pTDigest);
}
SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx);
pResInfo->hasResult = DATA_SET_FLAG;
SET_VAL(pCtx, 1, 1);
}
static void tdigest_finalizer(SQLFunctionCtx *pCtx) {
double q = (pCtx->param[0].nType == TSDB_DATA_TYPE_INT) ? pCtx->param[0].i64 : pCtx->param[0].dKey;
SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx);
SAPercentileInfo * pAPerc = getAPerctInfo(pCtx);
if (pCtx->currentStage == MERGE_STAGE) {
if (pResInfo->hasResult == DATA_SET_FLAG) { // check for null
double res = tdigestQuantile(pAPerc->pTDigest, q/100);
memcpy(pCtx->pOutput, &res, sizeof(double));
} else {
setNull(pCtx->pOutput, pCtx->outputType, pCtx->outputBytes);
return;
}
} else {
if (pAPerc->pTDigest->size > 0) {
double res = tdigestQuantile(pAPerc->pTDigest, q/100);
memcpy(pCtx->pOutput, &res, sizeof(double));
} else { // no need to free
setNull(pCtx->pOutput, pCtx->outputType, pCtx->outputBytes);
return;
}
}
pAPerc->pTDigest = NULL;
doFinalizer(pCtx);
}
//////////////////////////////////////////////////////////////////////////////////
int32_t getAlgo(SQLFunctionCtx * pCtx) {
if(pCtx->numOfParams != 2){
return ALGO_DEFAULT;
}
if(pCtx->param[1].nType != TSDB_DATA_TYPE_INT) {
return ALGO_DEFAULT;
}
return (int32_t)pCtx->param[1].i64;
}
static bool apercentile_function_setup(SQLFunctionCtx *pCtx, SResultRowCellInfo* pResultInfo) {
if (getAlgo(pCtx) == ALGO_TDIGEST) {
return tdigest_setup(pCtx, pResultInfo);
}
if (!function_setup(pCtx, pResultInfo)) {
return false;
}
SAPercentileInfo *pInfo = getAPerctInfo(pCtx);
buildHistogramInfo(pInfo);
char *tmp = (char *)pInfo + sizeof(SAPercentileInfo);
pInfo->pHisto = tHistogramCreateFrom(tmp, MAX_HISTOGRAM_BIN);
......@@ -2508,10 +2631,16 @@ static bool apercentile_function_setup(SQLFunctionCtx *pCtx, SResultRowCellInfo*
}
static void apercentile_function(SQLFunctionCtx *pCtx) {
if (getAlgo(pCtx) == ALGO_TDIGEST) {
tdigest_do(pCtx);
return;
}
int32_t notNullElems = 0;
SResultRowCellInfo * pResInfo = GET_RES_INFO(pCtx);
SAPercentileInfo *pInfo = getAPerctInfo(pCtx);
buildHistogramInfo(pInfo);
assert(pInfo->pHisto->elems != NULL);
......@@ -2540,6 +2669,11 @@ static void apercentile_function(SQLFunctionCtx *pCtx) {
}
static void apercentile_func_merge(SQLFunctionCtx *pCtx) {
if (getAlgo(pCtx) == ALGO_TDIGEST) {
tdigest_merge(pCtx);
return;
}
SAPercentileInfo *pInput = (SAPercentileInfo *)GET_INPUT_DATA_LIST(pCtx);
pInput->pHisto = (SHistogramInfo*) ((char *)pInput + sizeof(SAPercentileInfo));
......@@ -2550,6 +2684,7 @@ static void apercentile_func_merge(SQLFunctionCtx *pCtx) {
}
SAPercentileInfo *pOutput = getAPerctInfo(pCtx);
buildHistogramInfo(pOutput);
SHistogramInfo *pHisto = pOutput->pHisto;
if (pHisto->numOfElems <= 0) {
......@@ -2570,6 +2705,11 @@ static void apercentile_func_merge(SQLFunctionCtx *pCtx) {
}
static void apercentile_finalizer(SQLFunctionCtx *pCtx) {
if (getAlgo(pCtx) == ALGO_TDIGEST) {
tdigest_finalizer(pCtx);
return;
}
double v = (pCtx->param[0].nType == TSDB_DATA_TYPE_INT) ? pCtx->param[0].i64 : pCtx->param[0].dKey;
SResultRowCellInfo * pResInfo = GET_RES_INFO(pCtx);
......
......@@ -1954,7 +1954,7 @@ static bool functionNeedToExecute(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx
// in the reverse table scan, only the following functions need to be executed
if (IS_REVERSE_SCAN(pRuntimeEnv) ||
(pRuntimeEnv->scanFlag == REPEAT_SCAN && functionId != TSDB_FUNC_STDDEV && functionId != TSDB_FUNC_PERCT)) {
(pRuntimeEnv->scanFlag == REPEAT_SCAN && functionId != TSDB_FUNC_STDDEV && functionId != TSDB_FUNC_PERCT && functionId != TSDB_FUNC_APERCT)) {
return false;
}
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
/*
* src/tdigest.c
*
* Implementation of the t-digest data structure used to compute accurate percentiles.
*
* It is based on the MergingDigest implementation found at:
* https://github.com/tdunning/t-digest/blob/master/src/main/java/com/tdunning/math/stats/MergingDigest.java
*
* Copyright (c) 2016, Usman Masood <usmanm at fastmail dot fm>
*/
#include "os.h"
#include "osMath.h"
#include "tdigest.h"
#define INTERPOLATE(x, x0, x1) (((x) - (x0)) / ((x1) - (x0)))
//#define INTEGRATED_LOCATION(compression, q) ((compression) * (asin(2 * (q) - 1) + M_PI / 2) / M_PI)
#define INTEGRATED_LOCATION(compression, q) ((compression) * (asin(2 * (double)(q) - 1)/M_PI + (double)1/2))
#define FLOAT_EQ(f1, f2) (fabs((f1) - (f2)) <= FLT_EPSILON)
typedef struct SMergeArgs {
TDigest *t;
SCentroid *centroids;
int32_t idx;
double weight_so_far;
double k1;
double min;
double max;
}SMergeArgs;
void tdigestAutoFill(TDigest* t, int32_t compression) {
t->centroids = (SCentroid*)((char*)t + sizeof(TDigest));
t->buffered_pts = (SPt*) ((char*)t + sizeof(TDigest) + sizeof(SCentroid) * (int32_t)GET_CENTROID(compression));
}
TDigest *tdigestNewFrom(void* pBuf, int32_t compression) {
memset(pBuf, 0, (size_t)TDIGEST_SIZE(compression));
TDigest* t = (TDigest*)pBuf;
tdigestAutoFill(t, compression);
t->compression = compression;
t->size = (int64_t)GET_CENTROID(compression);
t->threshold = (int32_t)GET_THRESHOLD(compression);
t->min = DOUBLE_MAX;
t->max = -DOUBLE_MAX;
return t;
}
static int32_t cmpCentroid(const void *a, const void *b) {
SCentroid *c1 = (SCentroid *) a;
SCentroid *c2 = (SCentroid *) b;
if (c1->mean < c2->mean)
return -1;
if (c1->mean > c2->mean)
return 1;
return 0;
}
static void mergeCentroid(SMergeArgs *args, SCentroid *merge) {
double k2;
SCentroid *c = &args->centroids[args->idx];
args->weight_so_far += merge->weight;
k2 = INTEGRATED_LOCATION(args->t->size,
args->weight_so_far / args->t->total_weight);
//idx++
if(k2 - args->k1 > 1 && c->weight > 0) {
if(args->idx + 1 < args->t->size
&& merge->mean != args->centroids[args->idx].mean) {
args->idx++;
}
args->k1 = k2;
}
c = &args->centroids[args->idx];
if(c->mean == merge->mean) {
c->weight += merge->weight;
} else {
c->weight += merge->weight;
c->mean += (merge->mean - c->mean) * merge->weight / c->weight;
if (merge->weight > 0) {
args->min = MIN(merge->mean, args->min);
args->max = MAX(merge->mean, args->max);
}
}
}
void tdigestCompress(TDigest *t) {
SCentroid *unmerged_centroids;
int64_t unmerged_weight = 0;
int32_t num_unmerged = t->num_buffered_pts;
int32_t i, j;
SMergeArgs args;
if (t->num_buffered_pts <= 0)
return;
unmerged_centroids = (SCentroid*)malloc(sizeof(SCentroid) * t->num_buffered_pts);
for (i = 0; i < num_unmerged; i++) {
SPt *p = t->buffered_pts + i;
SCentroid *c = &unmerged_centroids[i];
c->mean = p->value;
c->weight = p->weight;
unmerged_weight += c->weight;
}
t->num_buffered_pts = 0;
t->total_weight += unmerged_weight;
qsort(unmerged_centroids, num_unmerged, sizeof(SCentroid), cmpCentroid);
memset(&args, 0, sizeof(SMergeArgs));
args.centroids = (SCentroid*)malloc((size_t)(sizeof(SCentroid) * t->size));
memset(args.centroids, 0, (size_t)(sizeof(SCentroid) * t->size));
args.t = t;
args.min = DOUBLE_MAX;
args.max = -DOUBLE_MAX;
i = 0;
j = 0;
while (i < num_unmerged && j < t->num_centroids) {
SCentroid *a = &unmerged_centroids[i];
SCentroid *b = &t->centroids[j];
if (a->mean <= b->mean) {
mergeCentroid(&args, a);
assert(args.idx < t->size);
i++;
} else {
mergeCentroid(&args, b);
assert(args.idx < t->size);
j++;
}
}
while (i < num_unmerged) {
mergeCentroid(&args, &unmerged_centroids[i++]);
assert(args.idx < t->size);
}
free((void*)unmerged_centroids);
while (j < t->num_centroids) {
mergeCentroid(&args, &t->centroids[j++]);
assert(args.idx < t->size);
}
if (t->total_weight > 0) {
t->min = MIN(t->min, args.min);
if (args.centroids[args.idx].weight <= 0) {
args.idx--;
}
t->num_centroids = args.idx + 1;
t->max = MAX(t->max, args.max);
}
memcpy(t->centroids, args.centroids, sizeof(SCentroid) * t->num_centroids);
free((void*)args.centroids);
}
void tdigestAdd(TDigest* t, double x, int64_t w) {
if (w == 0)
return;
int32_t i = t->num_buffered_pts;
if(i > 0 && t->buffered_pts[i-1].value == x ) {
t->buffered_pts[i].weight = w;
} else {
t->buffered_pts[i].value = x;
t->buffered_pts[i].weight = w;
t->num_buffered_pts++;
}
if (t->num_buffered_pts >= t->threshold)
tdigestCompress(t);
}
double tdigestCDF(TDigest *t, double x) {
if (t == NULL)
return 0;
int32_t i;
double left, right;
int64_t weight_so_far;
SCentroid *a, *b, tmp;
tdigestCompress(t);
if (t->num_centroids == 0)
return NAN;
if (x < t->min)
return 0;
if (x > t->max)
return 1;
if (t->num_centroids == 1) {
if (FLOAT_EQ(t->max, t->min))
return 0.5;
return INTERPOLATE(x, t->min, t->max);
}
weight_so_far = 0;
a = b = &tmp;
b->mean = t->min;
b->weight = 0;
right = 0;
for (i = 0; i < t->num_centroids; i++) {
SCentroid *c = &t->centroids[i];
left = b->mean - (a->mean + right);
a = b;
b = c;
right = (b->mean - a->mean) * a->weight / (a->weight + b->weight);
if (x < a->mean + right) {
double cdf = (weight_so_far
+ a->weight
* INTERPOLATE(x, a->mean - left, a->mean + right))
/ t->total_weight;
return MAX(cdf, 0.0);
}
weight_so_far += a->weight;
}
left = b->mean - (a->mean + right);
a = b;
right = t->max - a->mean;
if (x < a->mean + right) {
return (weight_so_far + a->weight * INTERPOLATE(x, a->mean - left, a->mean + right))
/ t->total_weight;
}
return 1;
}
double tdigestQuantile(TDigest *t, double q) {
if (t == NULL)
return 0;
int32_t i;
double left, right, idx;
int64_t weight_so_far;
SCentroid *a, *b, tmp;
tdigestCompress(t);
if (t->num_centroids == 0)
return NAN;
if (t->num_centroids == 1)
return t->centroids[0].mean;
if (FLOAT_EQ(q, 0.0))
return t->min;
if (FLOAT_EQ(q, 1.0))
return t->max;
idx = q * t->total_weight;
weight_so_far = 0;
b = &tmp;
b->mean = t->min;
b->weight = 0;
right = t->min;
for (i = 0; i < t->num_centroids; i++) {
SCentroid *c = &t->centroids[i];
a = b;
left = right;
b = c;
right = (b->weight * a->mean + a->weight * b->mean)/ (a->weight + b->weight);
if (idx < weight_so_far + a->weight) {
double p = (idx - weight_so_far) / a->weight;
return left * (1 - p) + right * p;
}
weight_so_far += a->weight;
}
left = right;
a = b;
right = t->max;
if (idx < weight_so_far + a->weight) {
double p = (idx - weight_so_far) / a->weight;
return left * (1 - p) + right * p;
}
return t->max;
}
void tdigestMerge(TDigest *t1, TDigest *t2) {
// SPoints
int32_t num_pts = t2->num_buffered_pts;
for(int32_t i = num_pts - 1; i >= 0; i--) {
SPt* p = t2->buffered_pts + i;
tdigestAdd(t1, p->value, p->weight);
t2->num_buffered_pts --;
}
// centroids
for (int32_t i = 0; i < t2->num_centroids; i++) {
tdigestAdd(t1, t2->centroids[i].mean, t2->centroids[i].weight);
}
}
......@@ -24,6 +24,7 @@ ENDIF()
SET_SOURCE_FILES_PROPERTIES(./astTest.cpp PROPERTIES COMPILE_FLAGS -w)
SET_SOURCE_FILES_PROPERTIES(./histogramTest.cpp PROPERTIES COMPILE_FLAGS -w)
SET_SOURCE_FILES_PROPERTIES(./percentileTest.cpp PROPERTIES COMPILE_FLAGS -w)
SET_SOURCE_FILES_PROPERTIES(./apercentileTest.cpp PROPERTIES COMPILE_FLAGS -w)
SET_SOURCE_FILES_PROPERTIES(./resultBufferTest.cpp PROPERTIES COMPILE_FLAGS -w)
SET_SOURCE_FILES_PROPERTIES(./tsBufTest.cpp PROPERTIES COMPILE_FLAGS -w)
SET_SOURCE_FILES_PROPERTIES(./unitTest.cpp PROPERTIES COMPILE_FLAGS -w)
......
#include <gtest/gtest.h>
#include <iostream>
#include "qResultbuf.h"
#include "taos.h"
#include "taosdef.h"
#include "assert.h"
#include "qHistogram.h"
#pragma GCC diagnostic ignored "-Wunused-function"
#pragma GCC diagnostic ignored "-Wunused-variable"
extern "C" {
#include "tdigest.h"
#include "qHistogram.h"
}
namespace {
enum {
TEST_DATA_TYPE_INT = 0,
TEST_DATA_TYPE_BIGINT,
TEST_DATA_TYPE_FLOAT,
TEST_DATA_TYPE_DOUBLE
};
enum {
TEST_DATA_MODE_SEQ = 0,
TEST_DATA_MODE_DSEQ,
TEST_DATA_MODE_RAND_PER,
TEST_DATA_MODE_RAND_LIMIT,
};
void tdigest_init(TDigest **pTDigest) {
void *tmp = calloc(1, (size_t)(TDIGEST_SIZE(COMPRESSION)));
*pTDigest = tdigestNewFrom(tmp, COMPRESSION);
}
void thistogram_init(SHistogramInfo **pHisto) {
void *tmp = calloc(1, (int16_t)(sizeof(SHistBin) * (MAX_HISTOGRAM_BIN + 1) + sizeof(SHistogramInfo)));
*pHisto = tHistogramCreateFrom(tmp, MAX_HISTOGRAM_BIN);
}
static FORCE_INLINE int64_t testGetTimestampUs() {
struct timeval systemTime;
gettimeofday(&systemTime, NULL);
return (int64_t)systemTime.tv_sec * 1000000L + (int64_t)systemTime.tv_usec;
}
double * thistogram_end(SHistogramInfo* pHisto, double* ratio, int32_t num){
assert(pHisto->numOfElems > 0);
double ratio2 = *ratio * 100;
return tHistogramUniform(pHisto, &ratio2, 1);
}
void setTestData(void *data, int64_t idx, int32_t type, int64_t value) {
switch (type) {
case TEST_DATA_TYPE_INT:
*((int32_t*)data + idx) = (int32_t)value;
break;
case TEST_DATA_TYPE_BIGINT:
*((int64_t*)data + idx) = (int64_t)value;
break;
case TEST_DATA_TYPE_FLOAT:
*((float*)data + idx) = (float)value;
break;
case TEST_DATA_TYPE_DOUBLE:
*((double*)data + idx) = (double)value;
break;
default:
assert(0);
}
}
void addDTestData(void *data, int64_t idx, int32_t type, TDigest* pTDigest) {
switch (type) {
case TEST_DATA_TYPE_INT:
tdigestAdd(pTDigest, (double)*((int32_t*)data + idx), 1);
break;
case TEST_DATA_TYPE_BIGINT:
tdigestAdd(pTDigest, (double)*((int64_t*)data + idx), 1);
break;
case TEST_DATA_TYPE_FLOAT:
tdigestAdd(pTDigest, (double)*((float*)data + idx), 1);
break;
case TEST_DATA_TYPE_DOUBLE:
tdigestAdd(pTDigest, (double)*((double*)data + idx), 1);
break;
default:
assert(0);
}
}
void addHTestData(void *data, int64_t idx, int32_t type, SHistogramInfo *pHisto) {
switch (type) {
case TEST_DATA_TYPE_INT:
tHistogramAdd(&pHisto, (double)*((int32_t*)data + idx));
break;
case TEST_DATA_TYPE_BIGINT:
tHistogramAdd(&pHisto, (double)*((int64_t*)data + idx));
break;
case TEST_DATA_TYPE_FLOAT:
tHistogramAdd(&pHisto, (double)*((float*)data + idx));
break;
case TEST_DATA_TYPE_DOUBLE:
tHistogramAdd(&pHisto, (double)*((double*)data + idx));
break;
default:
assert(0);
}
}
void initTestData(void **data, int32_t type, int64_t num, int32_t mode, int32_t randPar) {
int32_t tsize[] = {4, 8, 4, 8};
*data = malloc(num * tsize[type]);
switch (mode) {
case TEST_DATA_MODE_SEQ:
for (int64_t i = 0; i < num; ++i) {
setTestData(*data, i, type, i);
}
break;
case TEST_DATA_MODE_DSEQ:
for (int64_t i = 0; i < num; ++i) {
setTestData(*data, i, type, num - i);
}
break;
case TEST_DATA_MODE_RAND_PER: {
srand(time(NULL));
int64_t randMax = num * randPar / 100;
if (randMax == 0) {
for (int64_t i = 0; i < num; ++i) {
setTestData(*data, i, type, rand());
}
} else {
for (int64_t i = 0; i < num; ++i) {
setTestData(*data, i, type, rand() % randMax);
}
}
}
break;
case TEST_DATA_MODE_RAND_LIMIT:
srand(time(NULL));
for (int64_t i = 0; i < num; ++i) {
setTestData(*data, i, type, rand() % randPar);
}
break;
default:
assert(0);
}
}
void tdigestTest() {
printf("running %s\n", __FUNCTION__);
TDigest *pTDigest = NULL;
void *data = NULL;
SHistogramInfo *pHisto = NULL;
double ratio = 0.5;
int64_t totalNum[] = {100,10000,10000000};
int32_t numTimes = sizeof(totalNum)/sizeof(totalNum[0]);
int64_t biggestNum = totalNum[numTimes - 1];
int32_t unitNum[] = {1,10,100,1000,5000,10000,100000};
int32_t unitTimes = sizeof(unitNum)/sizeof(unitNum[0]);
int32_t dataMode[] = {TEST_DATA_MODE_SEQ, TEST_DATA_MODE_DSEQ, TEST_DATA_MODE_RAND_PER, TEST_DATA_MODE_RAND_LIMIT};
int32_t modeTimes = sizeof(dataMode)/sizeof(dataMode[0]);
int32_t dataTypes[] = {TEST_DATA_TYPE_INT, TEST_DATA_TYPE_BIGINT, TEST_DATA_TYPE_FLOAT, TEST_DATA_TYPE_DOUBLE};
int32_t typeTimes = sizeof(dataTypes)/sizeof(dataTypes[0]);
int32_t randPers[] = {0, 1, 10, 50, 90};
int32_t randPTimes = sizeof(randPers)/sizeof(randPers[0]);
int32_t randLimits[] = {10, 50, 100, 1000, 10000};
int32_t randLTimes = sizeof(randLimits)/sizeof(randLimits[0]);
double useTime[2][10][10][10][10] = {0.0};
for (int32_t i = 0; i < modeTimes; ++i) {
if (dataMode[i] == TEST_DATA_MODE_RAND_PER) {
for (int32_t p = 0; p < randPTimes; ++p) {
for (int32_t j = 0; j < typeTimes; ++j) {
initTestData(&data, dataTypes[j], biggestNum, dataMode[i], randPers[p]);
for (int32_t m = 0; m < numTimes; ++m) {
int64_t startu = testGetTimestampUs();
tdigest_init(&pTDigest);
for (int64_t n = 0; n < totalNum[m]; ++n) {
addDTestData(data, n, dataTypes[j], pTDigest);
}
double res = tdigestQuantile(pTDigest, ratio);
free(pTDigest);
useTime[0][i][j][m][p] = ((double)(testGetTimestampUs() - startu))/1000;
printf("DMode:%d,Type:%d,Num:%"PRId64",randP:%d,Used:%fms\tRES:%f\n", dataMode[i], dataTypes[j], totalNum[m], randPers[p], useTime[0][i][j][m][p], res);
startu = testGetTimestampUs();
thistogram_init(&pHisto);
for (int64_t n = 0; n < totalNum[m]; ++n) {
addHTestData(data, n, dataTypes[j], pHisto);
}
double *res2 = thistogram_end(pHisto, &ratio, 1);
free(pHisto);
useTime[1][i][j][m][p] = ((double)(testGetTimestampUs() - startu))/1000;
printf("HMode:%d,Type:%d,Num:%"PRId64",randP:%d,Used:%fms\tRES:%f\n", dataMode[i], dataTypes[j], totalNum[m], randPers[p], useTime[1][i][j][m][p], *res2);
}
free(data);
}
}
} else if (dataMode[i] == TEST_DATA_MODE_RAND_LIMIT) {
for (int32_t p = 0; p < randLTimes; ++p) {
for (int32_t j = 0; j < typeTimes; ++j) {
initTestData(&data, dataTypes[j], biggestNum, dataMode[i], randLimits[p]);
for (int64_t m = 0; m < numTimes; ++m) {
int64_t startu = testGetTimestampUs();
tdigest_init(&pTDigest);
for (int64_t n = 0; n < totalNum[m]; ++n) {
addDTestData(data, m, dataTypes[j], pTDigest);
}
double res = tdigestQuantile(pTDigest, ratio);
free(pTDigest);
useTime[0][i][j][m][p] = ((double)(testGetTimestampUs() - startu))/1000;
printf("DMode:%d,Type:%d,Num:%"PRId64",randL:%d,Used:%fms\tRES:%f\n", dataMode[i], dataTypes[j], totalNum[m], randLimits[p], useTime[0][i][j][m][p], res);
startu = testGetTimestampUs();
thistogram_init(&pHisto);
for (int64_t n = 0; n < totalNum[m]; ++n) {
addHTestData(data, n, dataTypes[j], pHisto);
}
double* res2 = thistogram_end(pHisto, &ratio, 1);
free(pHisto);
useTime[1][i][j][m][p] = ((double)(testGetTimestampUs() - startu))/1000;
printf("HMode:%d,Type:%d,Num:%"PRId64",randL:%d,Used:%fms\tRES:%f\n", dataMode[i], dataTypes[j], totalNum[m], randLimits[p], useTime[1][i][j][m][p], *res2);
}
free(data);
}
}
} else {
for (int32_t j = 0; j < typeTimes; ++j) {
initTestData(&data, dataTypes[j], biggestNum, dataMode[i], 0);
for (int64_t m = 0; m < numTimes; ++m) {
int64_t startu = testGetTimestampUs();
tdigest_init(&pTDigest);
for (int64_t n = 0; n < totalNum[m]; ++n) {
addDTestData(data, n, dataTypes[j], pTDigest);
}
double res = tdigestQuantile(pTDigest, ratio);
free(pTDigest);
useTime[0][i][j][m][0] = ((double)(testGetTimestampUs() - startu))/1000;
printf("DMode:%d,Type:%d,Num:%"PRId64",Used:%fms\tRES:%f\n", dataMode[i], dataTypes[j], totalNum[m], useTime[0][i][j][m][0], res);
startu = testGetTimestampUs();
thistogram_init(&pHisto);
for (int64_t n = 0; n < totalNum[m]; ++n) {
addHTestData(data, n, dataTypes[j], pHisto);
}
double* res2 = thistogram_end(pHisto, &ratio, 1);
free(pHisto);
useTime[1][i][j][m][0] = ((double)(testGetTimestampUs() - startu))/1000;
printf("HMode:%d,Type:%d,Num:%"PRId64",Used:%fms\tRES:%f\n", dataMode[i], dataTypes[j], totalNum[m], useTime[1][i][j][m][0], *res2);
}
free(data);
}
}
}
printf("\n\n");
for (int32_t i = 0; i < modeTimes; ++i) {
if (dataMode[i] == TEST_DATA_MODE_RAND_PER) {
for (int32_t p = 0; p < randPTimes; ++p) {
for (int32_t j = 0; j < typeTimes; ++j) {
printf("DMode:%d,Type:%d,randP:%d -", dataMode[i], dataTypes[j], randPers[p]);
for (int32_t m = 0; m < numTimes; ++m) {
printf(" %d:%f", totalNum[m], useTime[0][i][j][m][p]);
}
printf("\n");
printf("HMode:%d,Type:%d,randP:%d -", dataMode[i], dataTypes[j], randPers[p]);
for (int32_t m = 0; m < numTimes; ++m) {
printf(" %d:%f", totalNum[m], useTime[1][i][j][m][p]);
}
printf("\n");
}
}
} else if (dataMode[i] == TEST_DATA_MODE_RAND_LIMIT) {
for (int32_t p = 0; p < randLTimes; ++p) {
for (int32_t j = 0; j < typeTimes; ++j) {
printf("DMode:%d,Type:%d,randL:%d -", dataMode[i], dataTypes[j], randLimits[p]);
for (int64_t m = 0; m < numTimes; ++m) {
printf(" %d:%f", totalNum[m], useTime[0][i][j][m][p]);
}
printf("\n");
printf("HMode:%d,Type:%d,randL:%d -", dataMode[i], dataTypes[j], randLimits[p]);
for (int64_t m = 0; m < numTimes; ++m) {
printf(" %d:%f", totalNum[m], useTime[1][i][j][m][p]);
}
printf("\n");
}
}
} else {
for (int32_t j = 0; j < typeTimes; ++j) {
printf("DMode:%d,Type:%d -", dataMode[i], dataTypes[j]);
for (int64_t m = 0; m < numTimes; ++m) {
printf(" %d:%f", totalNum[m], useTime[0][i][j][m][0]);
}
printf("\n");
printf("HMode:%d,Type:%d -", dataMode[i], dataTypes[j]);
for (int64_t m = 0; m < numTimes; ++m) {
printf(" %d:%f", totalNum[m], useTime[1][i][j][m][0]);
}
printf("\n");
}
}
}
}
} // namespace
TEST(testCase, apercentileTest) {
tdigestTest();
}
......@@ -313,8 +313,7 @@ python3 testNoCompress.py
python3 testMinTablesPerVnode.py
python3 queryCount.py
python3 ./test.py -f query/queryGroupbyWithInterval.py
python3 client/twoClients.py
python3 ./test.py -f query/query.py
#python3 client/twoClients.py
python3 test.py -f query/queryInterval.py
python3 test.py -f query/queryFillTest.py
# subscribe
......
###################################################################
###################################################################
# Copyright (c) 2016 by TAOS Technologies, Inc.
# All rights reserved.
#
......@@ -184,7 +184,11 @@ class TDSql:
if self.queryResult[row][col] != data:
if self.cursor.istype(col, "TIMESTAMP"):
# suppose user want to check nanosecond timestamp if a longer data passed
if (len(data) >= 28):
if isinstance(data, int) or isinstance(data, float):
if pd.to_datetime(self.queryResult[row][col]) == pd.to_datetime(data):
tdLog.info("sql:%s, row:%d col:%d data:%d == expect:%s" %
(self.sql, row, col, self.queryResult[row][col], data))
elif (len(data) >= 28):
if pd.to_datetime(self.queryResult[row][col]) == pd.to_datetime(data):
tdLog.info("sql:%s, row:%d col:%d data:%d == expect:%s" %
(self.sql, row, col, self.queryResult[row][col], data))
......@@ -223,6 +227,43 @@ class TDSql:
tdLog.info("sql:%s, row:%d col:%d data:%s == expect:%d" %
(self.sql, row, col, self.queryResult[row][col], data))
def checkDeviaRation(self, row, col, data, deviation=0.001):
self.checkRowCol(row, col)
if data is None:
self.checkData(row, col, None)
return
caller = inspect.getframeinfo(inspect.stack()[1][0])
if data is not None and len(self.queryResult)==0:
tdLog.exit(f"{caller.filename}({caller.lineno}) failed: sql:{self.sql}, data:{data}, "
f"expect result is not None but it is")
args = (
caller.filename, caller.lineno, self.sql, data, type(data),
deviation, type(deviation), self.queryResult[row][col], type(self.queryResult[row][col])
)
if not(isinstance(data,int) or isinstance(data, float)):
tdLog.exit(f"{args[0]}({args[1]}) failed: sql:{args[2]}, data:{args[3]}, "
f"expect type: int or float, actual type: {args[4]}")
if not(isinstance(deviation,int) or isinstance(deviation, float)) or type(data)==type(True):
tdLog.exit(f"{args[0]}({args[1]}) failed: sql:{args[2]}, deviation:{args[5]}, "
f"expect type: int or float, actual type: {args[6]}")
if not(isinstance(self.queryResult[row][col], int) or isinstance(self.queryResult[row][col], float)):
tdLog.exit(f"{args[0]}({args[1]}) failed: sql:{args[2]}, result:{args[7]}, "
f"expect type: int or float, actual type: {args[8]}")
if data == 0:
devia = abs(self.queryResult[row][col])
else:
devia = abs((data - self.queryResult[row][col])/data)
if devia <= deviation:
tdLog.info(f"sql:{args[2]}, row:{row}, col:{col}, result data:{args[7]}, expect data:{args[3]}, "
f"actual deviation:{devia} <= expect deviation:{args[5]}")
else:
tdLog.exit(f"{args[0]}({args[1]}) failed: sql:{args[2]}, row:{row}, col:{col}, "
f"result data:{args[7]}, expect data:{args[3]},"
f"actual deviation:{devia} > expect deviation:{args[5]}")
pass
def getData(self, row, col):
self.checkRowCol(row, col)
return self.queryResult[row][col]
......
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
system sh/cfg.sh -n dnode1 -c walLevel -v 1
system sh/cfg.sh -n dnode1 -c maxtablespervnode -v 4
system sh/cfg.sh -n dnode1 -c cache -v 1
system sh/exec.sh -n dnode1 -s start
sleep 100
sql connect
sql drop database if exists cdb
sql create database if not exists cdb
sql use cdb
sql create table stb4 (ts timestamp, c1 int, c2 float, c3 bigint, c4 smallint, c5 tinyint, c6 double, c7 bool, c8 binary(10), c9 nchar(9),c10 binary(16300)) TAGS(t1 int, t2 binary(10), t3 double)
sql create table tb4_0 using stb4 tags(0,'0',0.0)
sql create table tb4_1 using stb4 tags(1,'1',1.0)
sql create table tb4_2 using stb4 tags(2,'2',2.0)
sql create table tb4_3 using stb4 tags(3,'3',3.0)
sql create table tb4_4 using stb4 tags(4,'4',4.0)
$i = 0
$ts0 = 1625850000000
$blockNum = 5
$delta = 0
$tbname0 = tb4_
$a = 0
$b = 200
$c = 400
while $i < $blockNum
$x = 0
$rowNum = 200
while $x < $rowNum
$ts = $ts0 + $x
$a = $a + 1
$b = $b + 1
$c = $c + 1
$d = $x / 10
$tin = $rowNum
$binary = 'binary . $c
$binary = $binary . '
$nchar = 'nchar . $c
$nchar = $nchar . '
$tbname = 'tb4_ . $i
$tbname = $tbname . '
sql insert into $tbname values ( $ts , $a , $b , $c , $d , $d , $c , true, $binary , $nchar , $binary )
$x = $x + 1
endw
$i = $i + 1
$ts0 = $ts0 + 259200000
endw
sleep 100
sql connect
sql use cdb;
sql_error select apercentile(c1,101,1) from stb4 group by tbname;
sql_error select apercentile(c1,100,2) from stb4 group by tbname;
sql_error select apercentile(c1,52.111111111111,1,1) from stb4 group by tbname ;
sql select apercentile(c1,90,0) from stb4 group by tbname;
if $rows != 5 then
return -1
endi
if $data00 != @180.000000000@ then
return -1
endi
if $data10 != @380.000000000@ then
return -1
endi
if $data20 != @580.000000000@ then
return -1
endi
if $data30 != @780.000000000@ then
return -1
endi
if $data40 != @980.000000000@ then
return -1
endi
sql select apercentile(c1,90,1) from stb4 group by tbname;
if $rows != 5 then
return -1
endi
if $data00 != @180.500000000@ then
return -1
endi
if $data10 != @380.500000000@ then
return -1
endi
if $data20 != @580.500000000@ then
return -1
endi
if $data30 != @780.500000000@ then
return -1
endi
if $data40 != @980.500000000@ then
return -1
endi
sql select apercentile(c1,1,0) from stb4 group by tbname;
if $rows != 5 then
return -1
endi
if $data00 != @2.000000000@ then
return -1
endi
if $data10 != @202.000000000@ then
return -1
endi
if $data20 != @402.000000000@ then
return -1
endi
if $data30 != @602.000000000@ then
return -1
endi
if $data40 != @802.000000000@ then
return -1
endi
sql select apercentile(c1,1,1) from stb4 group by tbname;
if $rows != 5 then
return -1
endi
if $data00 != @2.500000000@ then
return -1
endi
if $data10 != @202.500000000@ then
return -1
endi
if $data20 != @402.500000000@ then
return -1
endi
if $data30 != @602.500000000@ then
return -1
endi
if $data40 != @802.500000000@ then
return -1
endi
sql select apercentile(c1,52.111111111111,0) from stb4 group by tbname;
if $rows != 5 then
return -1
endi
if $data00 != @104.222222222@ then
return -1
endi
if $data10 != @304.222222222@ then
return -1
endi
if $data20 != @504.222222222@ then
return -1
endi
if $data30 != @704.222222222@ then
return -1
endi
if $data40 != @904.222222222@ then
return -1
endi
sql select apercentile(c1,52.111111111111) from stb4 group by tbname;
if $rows != 5 then
return -1
endi
if $data00 != @104.222222222@ then
return -1
endi
if $data10 != @304.222222222@ then
return -1
endi
if $data20 != @504.222222222@ then
return -1
endi
if $data30 != @704.222222222@ then
return -1
endi
if $data40 != @904.222222222@ then
return -1
endi
sql select apercentile(c1,52.111111111111,1) from stb4 group by tbname;
if $rows != 5 then
return -1
endi
if $data00 != @104.722222222@ then
return -1
endi
if $data10 != @304.722222222@ then
return -1
endi
if $data20 != @504.722222222@ then
return -1
endi
if $data30 != @704.722222222@ then
return -1
endi
if $data40 != @904.722222222@ then
return -1
endi
sql select apercentile(c1,52.111111111111,1) from tb4_0;
if $rows != 1 then
return -1
endi
if $data00 != @104.722222222@ then
return -1
endi
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册