未验证 提交 a75184f6 编写于 作者: S shenglian-zhou 提交者: GitHub

Merge branch 'develop' into szhou/bug/td-13946

......@@ -23,13 +23,13 @@ TDengine 是一款高性能、分布式、支持 SQL 的时序数据库。而且
- **零管理**:安装、集群几秒搞定,无任何依赖,不用分库分表,系统运行状态监测能与 Grafana 或其他运维工具无缝集成。
- **零学习成本**:采用 SQL 查询语言,支持 Python, Java, C/C++, Go, Rust, Node.js 等多种编程语言,与 MySQL 相似,零学习成本。
- **零学习成本**:采用 SQL 查询语言,支持 Python、Java、C/C++、Go、Rust、Node.js 等多种编程语言,与 MySQL 相似,零学习成本。
- **无缝集成**:不用一行代码,即可与 Telegraf, Grafana, EMQ X, Prometheus, StatsD, collectd, Matlab, R 等第三方工具无缝集成。
- **无缝集成**:不用一行代码,即可与 Telegraf、Grafana、EMQX、Prometheus、StatsD、collectd、Matlab、R 等第三方工具无缝集成。
- **互动 Console**: 通过命令行 console,不用编程,执行 SQL 语句就能做即席查询、各种数据库的操作、管理以及集群的维护.
TDengine 可以广泛应用于物联网、工业互联网、车联网、IT 运维、能源、金融等领域, 让大量设备、数据采集器每天产生的高达 TB 甚至 PB 级的数据能得到高效实时的处理,对业务的运行状态进行实时的监测、预警,从大数据中挖掘出商业价值。
TDengine 可以广泛应用于物联网、工业互联网、车联网、IT 运维、能源、金融等领域让大量设备、数据采集器每天产生的高达 TB 甚至 PB 级的数据能得到高效实时的处理,对业务的运行状态进行实时的监测、预警,从大数据中挖掘出商业价值。
# 文档
......@@ -272,12 +272,12 @@ taos
在 TDengine 终端中,用户可以通过 SQL 命令来创建/删除数据库、表等,并进行插入查询操作。
```bash
create database demo;
use demo;
create table t (ts timestamp, speed int);
insert into t values ('2019-07-15 00:00:00', 10);
insert into t values ('2019-07-15 01:00:00', 20);
select * from t;
CREATE DATABASE demo;
USE demo;
CREATE TABLE t (ts TIMESTAMP, speed INT);
INSERT INTO t VALUES('2019-07-15 00:00:00', 10);
INSERT INTO t VALUES('2019-07-15 01:00:00', 20);
SELECT * FROM t;
ts | speed |
===================================
19-07-15 00:00:00.000| 10|
......
......@@ -21,7 +21,7 @@ TDengine is a high-performance, scalable time-series database with SQL support.
- **All in One**: TDengine has built-in caching, stream processing and data subscription functions, it is no longer necessary to integrate Kafka/Redis/HBase/Spark or other software in some scenarios. It makes the system architecture much simpler and easy to maintain.
- **Seamless Integration**: Without a single line of code, TDengine provide seamless integration with third-party tools such as Telegraf, Grafana, EMQ X, Prometheus, StatsD, collectd, etc. More will be integrated.
- **Seamless Integration**: Without a single line of code, TDengine provide seamless integration with third-party tools such as Telegraf, Grafana, EMQX, Prometheus, StatsD, collectd, etc. More will be integrated.
- **Zero Management**: Installation and cluster setup can be done in seconds. Data partitioning and sharding are executed automatically. TDengine’s running status can be monitored via Grafana or other DevOps tools.
......
......@@ -2408,7 +2408,7 @@ static int32_t doAddProjectionExprAndResultFields(SQueryInfo* pQueryInfo, SColum
int32_t addProjectionExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, tSqlExprItem* pItem, bool outerQuery, bool timeWindowQuery) {
const char* msg1 = "tag for normal table query is not allowed";
const char* msg2 = "invalid column name";
const char* msg3 = "tbname/_wstart/_wstop/_wduration in outer query does not match inner query result";
const char* msg3 = "tbname/_wstart/_wstop/_wduration/_qstart/_qstop/_qduration in outer query does not match inner query result";
const char* msg4 = "-> operate can only used in json type";
const char* msg5 = "the right value of -> operation must be string";
const char* msg6 = "select name is too long than 64, please use alias name";
......@@ -2494,7 +2494,13 @@ int32_t addProjectionExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, t
(strncasecmp(pSchema[i].name, TSQL_TSWIN_STOP, tListLen(pSchema[i].name)) == 0 &&
index.columnIndex == TSDB_TSWIN_STOP_COLUMN_INDEX) ||
(strncasecmp(pSchema[i].name, TSQL_TSWIN_DURATION, tListLen(pSchema[i].name)) == 0 &&
index.columnIndex == TSDB_TSWIN_DURATION_COLUMN_INDEX)) {
index.columnIndex == TSDB_TSWIN_DURATION_COLUMN_INDEX) ||
(strncasecmp(pSchema[i].name, TSQL_QUERY_START, tListLen(pSchema[i].name)) == 0 &&
index.columnIndex == TSDB_QUERY_START_COLUMN_INDEX) ||
(strncasecmp(pSchema[i].name, TSQL_QUERY_STOP, tListLen(pSchema[i].name)) == 0 &&
index.columnIndex == TSDB_QUERY_STOP_COLUMN_INDEX) ||
(strncasecmp(pSchema[i].name, TSQL_QUERY_DURATION, tListLen(pSchema[i].name)) == 0 &&
index.columnIndex == TSDB_QUERY_DURATION_COLUMN_INDEX)) {
existed = true;
index.columnIndex = i;
break;
......@@ -2520,7 +2526,9 @@ int32_t addProjectionExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, t
functionId = TSDB_FUNC_TAGPRJ;
colType = TSDB_COL_TAG;
} else {
if (!timeWindowQuery) {
if (!timeWindowQuery && (index.columnIndex == TSDB_TSWIN_START_COLUMN_INDEX ||
index.columnIndex == TSDB_TSWIN_STOP_COLUMN_INDEX ||
index.columnIndex == TSDB_TSWIN_DURATION_COLUMN_INDEX)) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg7);
}
colSchema = *tGetTimeWindowColumnSchema(index.columnIndex);
......@@ -2819,7 +2827,8 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
case TSDB_FUNC_ELAPSED:
case TSDB_FUNC_MODE:
case TSDB_FUNC_STATE_COUNT:
case TSDB_FUNC_STATE_DURATION:{
case TSDB_FUNC_STATE_DURATION:
case TSDB_FUNC_HYPERLOGLOG:{
// 1. valid the number of parameters
int32_t numOfParams =
(pItem->pNode->Expr.paramList == NULL) ? 0 : (int32_t)taosArrayGetSize(pItem->pNode->Expr.paramList);
......@@ -2890,7 +2899,8 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
if (functionId == TSDB_FUNC_MODE && pColumnSchema->colId == PRIMARYKEY_TIMESTAMP_COL_INDEX &&
pColumnSchema->type == TSDB_DATA_TYPE_TIMESTAMP){
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg29);
} else if (!IS_NUMERIC_TYPE(pSchema->type) && (functionId != TSDB_FUNC_ELAPSED) && (functionId != TSDB_FUNC_MODE)) {
} else if (!IS_NUMERIC_TYPE(pSchema->type) && (functionId != TSDB_FUNC_ELAPSED) &&
(functionId != TSDB_FUNC_MODE) && (functionId != TSDB_FUNC_HYPERLOGLOG)) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg1);
} else if (IS_UNSIGNED_NUMERIC_TYPE(pSchema->type) &&
(functionId == TSDB_FUNC_DIFF || functionId == TSDB_FUNC_DERIVATIVE)) {
......@@ -3859,6 +3869,15 @@ static bool isTimeWindowToken(SStrToken* token, int16_t *columnIndex) {
} else if (tmpToken.n == strlen(TSQL_TSWIN_DURATION) && strncasecmp(TSQL_TSWIN_DURATION, tmpToken.z, tmpToken.n) == 0) {
*columnIndex = TSDB_TSWIN_DURATION_COLUMN_INDEX;
return true;
} else if (tmpToken.n == strlen(TSQL_QUERY_START) && strncasecmp(TSQL_QUERY_START, tmpToken.z, tmpToken.n) == 0) {
*columnIndex = TSDB_QUERY_START_COLUMN_INDEX;
return true;
} else if (tmpToken.n == strlen(TSQL_QUERY_STOP) && strncasecmp(TSQL_QUERY_STOP, tmpToken.z, tmpToken.n) == 0) {
*columnIndex = TSDB_QUERY_STOP_COLUMN_INDEX;
return true;
} else if (tmpToken.n == strlen(TSQL_QUERY_DURATION) && strncasecmp(TSQL_QUERY_DURATION, tmpToken.z, tmpToken.n) == 0) {
*columnIndex = TSDB_QUERY_DURATION_COLUMN_INDEX;
return true;
} else {
return false;
}
......@@ -4152,7 +4171,8 @@ int32_t tscTansformFuncForSTableQuery(SQueryInfo* pQueryInfo) {
(functionId == TSDB_FUNC_HISTOGRAM) ||
(functionId == TSDB_FUNC_UNIQUE) ||
(functionId == TSDB_FUNC_MODE) ||
(functionId == TSDB_FUNC_TAIL)) {
(functionId == TSDB_FUNC_TAIL) ||
(functionId == TSDB_FUNC_HYPERLOGLOG)) {
if (getResultDataInfo(pSrcSchema->type, pSrcSchema->bytes, functionId, (int32_t)pExpr->base.param[0].i64, &type, &bytes,
&interBytes, 0, true, NULL) != TSDB_CODE_SUCCESS) {
return TSDB_CODE_TSC_INVALID_OPERATION;
......@@ -4301,6 +4321,7 @@ static bool functionCompatibleCheck(SQueryInfo* pQueryInfo, bool joinQuery, bool
int32_t scalarFuncNum = 0;
int32_t funcCompatFactor = INT_MAX;
int32_t countTbname = 0;
int32_t queryWinNum = 0;
size_t numOfExpr = tscNumOfExprs(pQueryInfo);
assert(numOfExpr > 0);
......@@ -4310,7 +4331,7 @@ static bool functionCompatibleCheck(SQueryInfo* pQueryInfo, bool joinQuery, bool
// diff function cannot be executed with other function
// arithmetic function can be executed with other arithmetic functions
size_t size = tscNumOfExprs(pQueryInfo);
for (int32_t i = startIdx; i < size; ++i) {
SExprInfo* pExpr1 = tscExprGet(pQueryInfo, i);
......@@ -4340,6 +4361,10 @@ static bool functionCompatibleCheck(SQueryInfo* pQueryInfo, bool joinQuery, bool
++scalarFuncNum;
}
if (functionId == TSDB_FUNC_QSTART || functionId == TSDB_FUNC_QSTOP || functionId == TSDB_FUNC_QDURATION) {
++queryWinNum;
}
if (functionId == TSDB_FUNC_PRJ && (pExpr1->base.colInfo.colId == PRIMARYKEY_TIMESTAMP_COL_INDEX || TSDB_COL_IS_UD_COL(pExpr1->base.colInfo.flag))) {
continue;
}
......@@ -4371,7 +4396,7 @@ static bool functionCompatibleCheck(SQueryInfo* pQueryInfo, bool joinQuery, bool
}
}
aggNum = (int32_t)size - prjNum - scalarFuncNum - aggUdf - scalarUdf - countTbname;
aggNum = (int32_t)size - prjNum - scalarFuncNum - aggUdf - scalarUdf - countTbname - queryWinNum;
assert(aggNum >= 0);
......@@ -8275,7 +8300,7 @@ static int32_t checkUpdateTagPrjFunctions(SQueryInfo* pQueryInfo, char* msg) {
for (int32_t i = 0; i < numOfExprs; ++i) {
SExprInfo* pExpr = taosArrayGetP(pQueryInfo->exprList, i);
int16_t functionId = pExpr->base.functionId;
if (functionId == TSDB_FUNC_TAGPRJ || functionId == TSDB_FUNC_PRJ || functionId == TSDB_FUNC_TS ||
functionId == TSDB_FUNC_SCALAR_EXPR || functionId == TSDB_FUNC_TS_DUMMY || functionId == TSDB_FUNC_STATE_COUNT ||
......@@ -8283,8 +8308,11 @@ static int32_t checkUpdateTagPrjFunctions(SQueryInfo* pQueryInfo, char* msg) {
continue;
}
if (functionId == TSDB_FUNC_WSTART || functionId == TSDB_FUNC_WSTOP || functionId == TSDB_FUNC_WDURATION) {
if (isTimeWindowFunction(functionId)) {
numOfTimeWindow++;
if (functionId >= TSDB_FUNC_QSTART && functionId <= TSDB_FUNC_QDURATION) {
continue;
}
}
if (functionId < 0) {
......
......@@ -244,10 +244,13 @@ static struct SSchema _s = {
.name = TSQL_TBNAME_L,
};
static struct SSchema _tswin[3] = {
static struct SSchema _tswin[6] = {
{TSDB_DATA_TYPE_TIMESTAMP, TSQL_TSWIN_START, TSDB_TSWIN_START_COLUMN_INDEX, LONG_BYTES},
{TSDB_DATA_TYPE_TIMESTAMP, TSQL_TSWIN_STOP, TSDB_TSWIN_STOP_COLUMN_INDEX, LONG_BYTES},
{TSDB_DATA_TYPE_BIGINT, TSQL_TSWIN_DURATION, TSDB_TSWIN_DURATION_COLUMN_INDEX, LONG_BYTES},
{TSDB_DATA_TYPE_TIMESTAMP, TSQL_QUERY_START, TSDB_QUERY_START_COLUMN_INDEX, LONG_BYTES},
{TSDB_DATA_TYPE_TIMESTAMP, TSQL_QUERY_STOP, TSDB_QUERY_STOP_COLUMN_INDEX, LONG_BYTES},
{TSDB_DATA_TYPE_BIGINT, TSQL_QUERY_DURATION, TSDB_QUERY_DURATION_COLUMN_INDEX, LONG_BYTES},
};
SSchema* tGetTimeWindowColumnSchema(int16_t columnIndex) {
......@@ -261,6 +264,15 @@ SSchema* tGetTimeWindowColumnSchema(int16_t columnIndex) {
case TSDB_TSWIN_DURATION_COLUMN_INDEX: {
return &_tswin[2];
}
case TSDB_QUERY_START_COLUMN_INDEX: {
return &_tswin[3];
}
case TSDB_QUERY_STOP_COLUMN_INDEX: {
return &_tswin[4];
}
case TSDB_QUERY_DURATION_COLUMN_INDEX: {
return &_tswin[5];
}
default: {
return NULL;
}
......
......@@ -280,9 +280,12 @@ do { \
#define TSDB_TSWIN_START_COLUMN_INDEX (-2)
#define TSDB_TSWIN_STOP_COLUMN_INDEX (-3)
#define TSDB_TSWIN_DURATION_COLUMN_INDEX (-4)
#define TSDB_MIN_VALID_COLUMN_INDEX (-4)
#define TSDB_QUERY_START_COLUMN_INDEX (-5)
#define TSDB_QUERY_STOP_COLUMN_INDEX (-6)
#define TSDB_QUERY_DURATION_COLUMN_INDEX (-7)
#define TSDB_MIN_VALID_COLUMN_INDEX (-7)
#define TSDB_COL_IS_TSWIN_COL(_i) ((_i) <= TSDB_TSWIN_START_COLUMN_INDEX && (_i) >= TSDB_TSWIN_DURATION_COLUMN_INDEX)
#define TSDB_COL_IS_TSWIN_COL(_i) ((_i) <= TSDB_TSWIN_START_COLUMN_INDEX && (_i) >= TSDB_QUERY_DURATION_COLUMN_INDEX)
#define TSDB_UD_COLUMN_INDEX (-1000)
#define TSDB_RES_COL_ID (-5000)
......
......@@ -86,8 +86,12 @@ extern "C" {
#define TSDB_FUNC_WSTART 44
#define TSDB_FUNC_WSTOP 45
#define TSDB_FUNC_WDURATION 46
#define TSDB_FUNC_QSTART 47
#define TSDB_FUNC_QSTOP 48
#define TSDB_FUNC_QDURATION 49
#define TSDB_FUNC_HYPERLOGLOG 50
#define TSDB_FUNC_MAX_NUM 47
#define TSDB_FUNC_MAX_NUM 51
#define TSDB_FUNCSTATE_SO 0x1u // single output
#define TSDB_FUNCSTATE_MO 0x2u // dynamic number of output, not multinumber of output e.g., TOP/BOTTOM
......@@ -183,7 +187,7 @@ typedef struct SQLFunctionCtx {
uint32_t order; // asc|desc
int16_t inputType;
int32_t inputBytes;
int16_t outputType;
int32_t outputBytes; // size of results, determined by function and input column data type
int32_t interBufBytes; // internal buffer size
......@@ -211,6 +215,8 @@ typedef struct SQLFunctionCtx {
SHashObj **pUniqueSet; // for unique function
SHashObj **pModeSet; // for mode function
STimeWindow qWindow; // for _qstart/_qstop/_qduration column
int32_t allocRows; // rows allocated for output buffer
} SQLFunctionCtx;
typedef struct SAggFunctionInfo {
......@@ -235,6 +241,7 @@ int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionI
int32_t *len, int32_t *interBytes, int16_t extLength, bool isSuperTable, SUdfInfo* pUdfInfo);
int16_t getTimeWindowFunctionID(int16_t colIndex);
bool isTimeWindowFunction(int32_t functionId);
int32_t isValidFunction(const char* name, int32_t len);
bool isValidStateOper(char *oper, int32_t len);
......
......@@ -29,6 +29,7 @@
#include "queryLog.h"
#include "qUdf.h"
#include "tcompare.h"
#include "hashfunc.h"
#define GET_INPUT_DATA_LIST(x) ((char *)((x)->pInput))
#define GET_INPUT_DATA(x, y) (GET_INPUT_DATA_LIST(x) + (y) * (x)->inputBytes)
......@@ -256,11 +257,157 @@ typedef struct {
char data[];
} TailUnit;
typedef struct STailInfo {
typedef struct {
int32_t num;
TailUnit **res;
} STailInfo;
static void *getOutputInfo(SQLFunctionCtx *pCtx) {
SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx);
// only the first_stage_merge is directly written data into final output buffer
if (pCtx->stableQuery && pCtx->currentStage != MERGE_STAGE) {
return pCtx->pOutput;
} else { // during normal table query and super table at the secondary_stage, result is written to intermediate buffer
return GET_ROWCELL_INTERBUF(pResInfo);
}
}
/* hyperloglog start */
#define HLL_BUCKET_BITS 14 // The bits of the bucket
#define HLL_DATA_BITS (64-HLL_BUCKET_BITS)
#define HLL_BUCKETS (1<<HLL_BUCKET_BITS)
#define HLL_BUCKET_MASK (HLL_BUCKETS-1)
#define HLL_ALPHA_INF 0.721347520444481703680 // constant for 0.5/ln(2)
typedef struct {
uint8_t buckets[HLL_BUCKETS]; // Data bytes.
} SHLLInfo;
static void hllBucketHisto(uint8_t *buckets, int32_t* bucketHisto) {
uint64_t *word = (uint64_t*) buckets;
uint8_t *bytes;
for (int32_t j = 0; j < HLL_BUCKETS>>3; j++) {
if (*word == 0) {
bucketHisto[0] += 8;
} else {
bytes = (uint8_t*) word;
bucketHisto[bytes[0]]++;
bucketHisto[bytes[1]]++;
bucketHisto[bytes[2]]++;
bucketHisto[bytes[3]]++;
bucketHisto[bytes[4]]++;
bucketHisto[bytes[5]]++;
bucketHisto[bytes[6]]++;
bucketHisto[bytes[7]]++;
}
word++;
}
}
static double hllTau(double x) {
if (x == 0. || x == 1.) return 0.;
double zPrime;
double y = 1.0;
double z = 1 - x;
do {
x = sqrt(x);
zPrime = z;
y *= 0.5;
z -= pow(1 - x, 2)*y;
} while(zPrime != z);
return z / 3;
}
static double hllSigma(double x) {
if (x == 1.0) return INFINITY;
double zPrime;
double y = 1;
double z = x;
do {
x *= x;
zPrime = z;
z += x * y;
y += y;
} while(zPrime != z);
return z;
}
// estimate the cardinality, the algorithm refer this paper: "New cardinality estimation algorithms for HyperLogLog sketches"
static uint64_t hllCountCnt(uint8_t *buckets) {
double m = HLL_BUCKETS;
int32_t buckethisto[64] = {0};
hllBucketHisto(buckets,buckethisto);
double z = m * hllTau((m-buckethisto[HLL_DATA_BITS+1])/(double)m);
for (int j = HLL_DATA_BITS; j >= 1; --j) {
z += buckethisto[j];
z *= 0.5;
}
z += m * hllSigma(buckethisto[0]/(double)m);
double E = llroundl(HLL_ALPHA_INF*m*m/z);
return (uint64_t) E;
}
static uint8_t hllCountNum(void *ele, int32_t elesize, int32_t *buk) {
uint64_t hash = MurmurHash3_64(ele,elesize);
int32_t index = hash & HLL_BUCKET_MASK;
hash >>= HLL_BUCKET_BITS;
hash |= ((uint64_t)1<<HLL_DATA_BITS);
uint64_t bit = 1;
uint8_t count = 1;
while((hash & bit) == 0) {
count++;
bit <<= 1;
}
*buk = index;
return count;
}
static void hll_function(SQLFunctionCtx *pCtx) {
SHLLInfo *pHLLInfo = getOutputInfo(pCtx);
for (int32_t i = 0; i < pCtx->size; ++i) {
char *val = GET_INPUT_DATA(pCtx, i);
if (isNull(val, pCtx->inputType)) {
continue;
}
int32_t elesize = pCtx->inputBytes;
if(IS_VAR_DATA_TYPE(pCtx->inputType)) {
elesize = varDataLen(val);
val = varDataVal(val);
}
int32_t index = 0;
uint8_t count = hllCountNum(val,elesize,&index);
uint8_t oldcount = pHLLInfo->buckets[index];
if (count > oldcount) {
pHLLInfo->buckets[index] = count;
}
}
GET_RES_INFO(pCtx)->numOfRes = 1;
}
static void hll_func_merge(SQLFunctionCtx *pCtx) {
SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx);
SHLLInfo *pHLLInfo = (SHLLInfo *)GET_ROWCELL_INTERBUF(pResInfo);
SHLLInfo *pData = (SHLLInfo *)GET_INPUT_DATA_LIST(pCtx);
for (int i = 0; i < HLL_BUCKETS; i++) {
if (pData->buckets[i] > pHLLInfo->buckets[i]) {
pHLLInfo->buckets[i] = pData->buckets[i];
}
}
}
static void hll_func_finalizer(SQLFunctionCtx *pCtx) {
SHLLInfo *pInfo = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
GET_RES_INFO(pCtx)->numOfRes = 1;
*(uint64_t *)(pCtx->pOutput) = hllCountCnt(pInfo->buckets);
doFinalizer(pCtx);
}
/* hyperloglog end */
int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionId, int32_t param, int16_t *type,
int32_t *bytes, int32_t *interBytes, int16_t extLength, bool isSuperTable, SUdfInfo* pUdfInfo) {
if (!isValidDataType(dataType)) {
......@@ -428,6 +575,11 @@ int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionI
*bytes = (sizeof(STailInfo) + (sizeof(TailUnit) + dataBytes + POINTER_BYTES + extLength) * param);
*interBytes = *bytes;
return TSDB_CODE_SUCCESS;
} else if (functionId == TSDB_FUNC_HYPERLOGLOG) {
*type = TSDB_DATA_TYPE_BINARY;
*bytes = sizeof(SHLLInfo);
*interBytes = sizeof(SHLLInfo);
return TSDB_CODE_SUCCESS;
} else if (functionId == TSDB_FUNC_SAMPLE) {
*type = TSDB_DATA_TYPE_BINARY;
......@@ -584,11 +736,15 @@ int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionI
// the output column may be larger than sizeof(STopBotInfo)
*interBytes = (int32_t)size;
} else if (functionId == TSDB_FUNC_HYPERLOGLOG) {
*type = TSDB_DATA_TYPE_UBIGINT;
*bytes = sizeof(uint64_t);
*interBytes = sizeof(SHLLInfo);
} else if (functionId == TSDB_FUNC_SAMPLE) {
*type = (int16_t)dataType;
*bytes = dataBytes;
size_t size = sizeof(SSampleFuncInfo) + dataBytes*param + sizeof(int64_t)*param + extLength*param;
*interBytes = (int32_t)size;
*type = (int16_t)dataType;
*bytes = dataBytes;
size_t size = sizeof(SSampleFuncInfo) + dataBytes*param + sizeof(int64_t)*param + extLength*param;
*interBytes = (int32_t)size;
} else if (functionId == TSDB_FUNC_LAST_ROW) {
*type = (int16_t)dataType;
*bytes = dataBytes;
......@@ -613,6 +769,10 @@ int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionI
return TSDB_CODE_SUCCESS;
}
bool isTimeWindowFunction(int32_t functionId) {
return ((functionId >= TSDB_FUNC_WSTART) && (functionId <= TSDB_FUNC_QDURATION));
}
// TODO use hash table
int32_t isValidFunction(const char* name, int32_t len) {
......@@ -2403,18 +2563,6 @@ static void copyTopBotRes(SQLFunctionCtx *pCtx, int32_t type) {
tfree(pData);
}
static void *getOutputInfo(SQLFunctionCtx *pCtx) {
SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx);
// only the first_stage_merge is directly written data into final output buffer
if (pCtx->stableQuery && pCtx->currentStage != MERGE_STAGE) {
return pCtx->pOutput;
} else { // during normal table query and super table at the secondary_stage, result is written to intermediate buffer
return GET_ROWCELL_INTERBUF(pResInfo);
}
}
/*
* keep the intermediate results during scan data blocks in the format of:
* +-----------------------------------+-------------one value pair-----------+------------next value pair-----------+
......@@ -5792,29 +5940,89 @@ int16_t getTimeWindowFunctionID(int16_t colIndex) {
case TSDB_TSWIN_DURATION_COLUMN_INDEX: {
return TSDB_FUNC_WDURATION;
}
case TSDB_QUERY_START_COLUMN_INDEX: {
return TSDB_FUNC_QSTART;
}
case TSDB_QUERY_STOP_COLUMN_INDEX: {
return TSDB_FUNC_QSTOP;
}
case TSDB_QUERY_DURATION_COLUMN_INDEX: {
return TSDB_FUNC_QDURATION;
}
default:
return TSDB_FUNC_INVALID_ID;
}
}
static void wstart_function(SQLFunctionCtx *pCtx) {
SET_VAL(pCtx, pCtx->size, 1);
*(int64_t *)(pCtx->pOutput) = pCtx->startTs;
static void window_start_function(SQLFunctionCtx *pCtx) {
if (pCtx->functionId == TSDB_FUNC_WSTART) {
SET_VAL(pCtx, pCtx->size, 1);
*(int64_t *)(pCtx->pOutput) = pCtx->startTs;
} else { //TSDB_FUNC_QSTART
int32_t size = MIN(pCtx->size, pCtx->allocRows); //size cannot exceeds allocated rows
SET_VAL(pCtx, pCtx->size, size);
//INC_INIT_VAL(pCtx, size);
char *output = pCtx->pOutput;
for (int32_t i = 0; i < size; ++i) {
if (pCtx->qWindow.skey == INT64_MIN) {
*(TKEY *)output = TSDB_DATA_TIMESTAMP_NULL;
} else {
memcpy(output, &pCtx->qWindow.skey, pCtx->outputBytes);
}
output += pCtx->outputBytes;
}
}
}
static void wstop_function(SQLFunctionCtx *pCtx) {
SET_VAL(pCtx, pCtx->size, 1);
*(int64_t *)(pCtx->pOutput) = pCtx->endTs;
static void window_stop_function(SQLFunctionCtx *pCtx) {
if (pCtx->functionId == TSDB_FUNC_WSTOP) {
SET_VAL(pCtx, pCtx->size, 1);
*(int64_t *)(pCtx->pOutput) = pCtx->endTs;
} else { //TSDB_FUNC_QSTOP
int32_t size = MIN(pCtx->size, pCtx->allocRows); //size cannot exceeds allocated rows
SET_VAL(pCtx, pCtx->size, size);
//INC_INIT_VAL(pCtx, size);
char *output = pCtx->pOutput;
for (int32_t i = 0; i < size; ++i) {
if (pCtx->qWindow.ekey == INT64_MAX) {
*(TKEY *)output = TSDB_DATA_TIMESTAMP_NULL;
} else {
memcpy(output, &pCtx->qWindow.ekey, pCtx->outputBytes);
}
output += pCtx->outputBytes;
}
}
}
static void wduration_function(SQLFunctionCtx *pCtx) {
SET_VAL(pCtx, pCtx->size, 1);
int64_t duration = pCtx->endTs - pCtx->startTs;
if (duration < 0) {
duration = -duration;
static void window_duration_function(SQLFunctionCtx *pCtx) {
int64_t duration;
if (pCtx->functionId == TSDB_FUNC_WDURATION) {
SET_VAL(pCtx, pCtx->size, 1);
duration = pCtx->endTs - pCtx->startTs;
if (duration < 0) {
duration = -duration;
}
*(int64_t *)(pCtx->pOutput) = duration;
} else { //TSDB_FUNC_QDURATION
int32_t size = MIN(pCtx->size, pCtx->allocRows); //size cannot exceeds allocated rows
SET_VAL(pCtx, pCtx->size, size);
//INC_INIT_VAL(pCtx, size);
duration = pCtx->qWindow.ekey - pCtx->qWindow.skey;
if (duration < 0) {
duration = -duration;
}
char *output = pCtx->pOutput;
for (int32_t i = 0; i < size; ++i) {
if (pCtx->qWindow.skey == INT64_MIN || pCtx->qWindow.ekey == INT64_MAX) {
*(int64_t *)output = TSDB_DATA_BIGINT_NULL;
} else {
memcpy(output, &duration, pCtx->outputBytes);
}
output += pCtx->outputBytes;
}
}
*(int64_t *)(pCtx->pOutput) = duration;
}
/////////////////////////////////////////////////////////////////////////////////////////////
/*
* function compatible list.
......@@ -5827,16 +6035,16 @@ static void wduration_function(SQLFunctionCtx *pCtx) {
*
*/
int32_t functionCompatList[] = {
// count, sum, avg, min, max, stddev, percentile, apercentile, first, last
1, 1, 1, 1, 1, 1, 1, 1, 1, 1,
// last_row, top, bottom, spread, twa, leastsqr, ts, ts_dummy, tag_dummy, ts_comp
4, -1, -1, 1, 1, 1, 1, 1, 1, -1,
// tag, colprj, tagprj, arithm, diff, first_dist, last_dist, stddev_dst, interp rate, irate
1, 1, 1, 1, -1, 1, 1, 1, 5, 1, 1,
// tid_tag, deriv, csum, mavg, sample, block_info, elapsed, histogram, unique, mode, tail
6, 8, -1, -1, -1, 7, 1, -1, -1, 1, -1,
// stateCount, stateDuration, wstart, wstop, wduration,
1, 1, 1, 1, 1,
// count, sum, avg, min, max, stddev, percentile, apercentile, first, last
1, 1, 1, 1, 1, 1, 1, 1, 1, 1,
// last_row, top, bottom, spread, twa, leastsqr, ts, ts_dummy, tag_dummy, ts_comp
4, -1, -1, 1, 1, 1, 1, 1, 1, -1,
// tag, colprj, tagprj, arithm, diff, first_dist, last_dist, stddev_dst, interp rate, irate
1, 1, 1, 1, -1, 1, 1, 1, 5, 1, 1,
// tid_tag, deriv, csum, mavg, sample, block_info, elapsed, histogram, unique, mode, tail
6, 8, -1, -1, -1, 7, 1, -1, -1, 1, -1,
// stateCount, stateDuration, wstart, wstop, wduration, qstart, qstop, qduration, hyperloglog
1, 1, 1, 1, 1, 1, 1, 1, 1,
};
SAggFunctionInfo aAggs[TSDB_FUNC_MAX_NUM] = {{
......@@ -6377,7 +6585,7 @@ SAggFunctionInfo aAggs[TSDB_FUNC_MAX_NUM] = {{
TSDB_FUNC_WSTART,
TSDB_BASE_FUNC_SO | TSDB_FUNCSTATE_SELECTIVITY,
function_setup,
wstart_function,
window_start_function,
doFinalizer,
copy_function,
dataBlockRequired,
......@@ -6389,7 +6597,7 @@ SAggFunctionInfo aAggs[TSDB_FUNC_MAX_NUM] = {{
TSDB_FUNC_WSTOP,
TSDB_BASE_FUNC_SO | TSDB_FUNCSTATE_SELECTIVITY,
function_setup,
wstop_function,
window_stop_function,
doFinalizer,
copy_function,
dataBlockRequired,
......@@ -6401,9 +6609,57 @@ SAggFunctionInfo aAggs[TSDB_FUNC_MAX_NUM] = {{
TSDB_FUNC_WDURATION,
TSDB_BASE_FUNC_SO | TSDB_FUNCSTATE_SELECTIVITY,
function_setup,
wduration_function,
window_duration_function,
doFinalizer,
copy_function,
dataBlockRequired,
},
{
// 47
"_qstart",
TSDB_FUNC_QSTART,
TSDB_FUNC_QSTART,
TSDB_BASE_FUNC_MO | TSDB_FUNCSTATE_SELECTIVITY,
function_setup,
window_start_function,
doFinalizer,
copy_function,
dataBlockRequired,
},
{
// 48
"_qstop",
TSDB_FUNC_QSTOP,
TSDB_FUNC_QSTOP,
TSDB_BASE_FUNC_MO | TSDB_FUNCSTATE_SELECTIVITY,
function_setup,
window_stop_function,
doFinalizer,
copy_function,
dataBlockRequired,
},
{
// 49
"_qduration",
TSDB_FUNC_QDURATION,
TSDB_FUNC_QDURATION,
TSDB_BASE_FUNC_MO | TSDB_FUNCSTATE_SELECTIVITY,
function_setup,
window_duration_function,
doFinalizer,
copy_function,
dataBlockRequired,
},
{
// 50
"hyperloglog",
TSDB_FUNC_HYPERLOGLOG,
TSDB_FUNC_HYPERLOGLOG,
TSDB_BASE_FUNC_SO,
function_setup,
hll_function,
hll_func_finalizer,
hll_func_merge,
dataBlockRequired,
}
};
......@@ -382,7 +382,7 @@ int32_t getNumOfResult(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx* pCtx, int3
* the number of output result is decided by main output
*/
if (hasMainFunction && (id == TSDB_FUNC_TS || id == TSDB_FUNC_TAG || id == TSDB_FUNC_TAGPRJ ||
id == TSDB_FUNC_TS_DUMMY || id == TSDB_FUNC_TAG_DUMMY)) {
id == TSDB_FUNC_TS_DUMMY || id == TSDB_FUNC_TAG_DUMMY || isTimeWindowFunction(id))) {
continue;
}
......@@ -1905,7 +1905,7 @@ static int32_t setCtxTagColumnInfo(SQLFunctionCtx *pCtx, int32_t numOfOutput) {
}
static SQLFunctionCtx* createSQLFunctionCtx(SQueryRuntimeEnv* pRuntimeEnv, SExprInfo* pExpr, int32_t numOfOutput,
int32_t** rowCellInfoOffset) {
int32_t** rowCellInfoOffset, int32_t numOfRows) {
SQueryAttr* pQueryAttr = pRuntimeEnv->pQueryAttr;
SQLFunctionCtx * pFuncCtx = (SQLFunctionCtx *)calloc(numOfOutput, sizeof(SQLFunctionCtx));
......@@ -1955,6 +1955,9 @@ static SQLFunctionCtx* createSQLFunctionCtx(SQueryRuntimeEnv* pRuntimeEnv, SExpr
pCtx->end.key = INT64_MIN;
pCtx->startTs = INT64_MIN;
pCtx->qWindow = pQueryAttr->window;
pCtx->allocRows = numOfRows;
pCtx->numOfParams = pSqlExpr->numOfParams;
for (int32_t j = 0; j < pCtx->numOfParams; ++j) {
int16_t type = pSqlExpr->param[j].nType;
......@@ -3922,7 +3925,8 @@ static bool hasMainOutput(SQueryAttr *pQueryAttr) {
for (int32_t i = 0; i < pQueryAttr->numOfOutput; ++i) {
int32_t functionId = pQueryAttr->pExpr1[i].base.functionId;
if (functionId != TSDB_FUNC_TS && functionId != TSDB_FUNC_TAG && functionId != TSDB_FUNC_TAGPRJ) {
if (functionId != TSDB_FUNC_TS && functionId != TSDB_FUNC_TAG &&
functionId != TSDB_FUNC_TAGPRJ && !isTimeWindowFunction(functionId)) {
return true;
}
}
......@@ -5637,7 +5641,8 @@ SOperatorInfo* createGlobalAggregateOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv,
pInfo->bufCapacity = 200; // TD-10899
pInfo->udfInfo = pUdfInfo;
pInfo->binfo.pRes = createOutputBuf(pExpr, numOfOutput, pInfo->bufCapacity * pInfo->resultRowFactor);
pInfo->binfo.pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->binfo.rowCellInfoOffset);
pInfo->binfo.pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->binfo.rowCellInfoOffset,
pInfo->bufCapacity * pInfo->resultRowFactor);
pInfo->orderColumnList = getOrderCheckColumns(pRuntimeEnv->pQueryAttr);
pInfo->groupColumnList = getResultGroupCheckColumns(pRuntimeEnv->pQueryAttr);
......@@ -5956,7 +5961,7 @@ static SSDataBlock* doAggregate(void* param, bool* newgroup) {
doAggregateImpl(pOperator, pQueryAttr->window.skey, pInfo->pCtx, pBlock);
// if all pCtx is completed, then query should be over
if(allCtxCompleted(pOperator, pInfo->pCtx))
break;
break;
}
doSetOperatorCompleted(pOperator);
......@@ -7278,7 +7283,7 @@ SOperatorInfo* createAggregateOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOpera
int32_t numOfRows = (int32_t)(getRowNumForMultioutput(pQueryAttr, pQueryAttr->topBotQuery, pQueryAttr->stableQuery));
pInfo->binfo.pRes = createOutputBuf(pExpr, numOfOutput, numOfRows);
pInfo->binfo.pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->binfo.rowCellInfoOffset);
pInfo->binfo.pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->binfo.rowCellInfoOffset, numOfRows);
initResultRowInfo(&pInfo->binfo.resultRowInfo, 8, TSDB_DATA_TYPE_INT);
......@@ -7450,7 +7455,7 @@ SOperatorInfo* createMultiTableAggOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SO
size_t tableGroup = GET_NUM_OF_TABLEGROUP(pRuntimeEnv);
pInfo->binfo.pRes = createOutputBuf(pExpr, numOfOutput, (int32_t) tableGroup);
pInfo->binfo.pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->binfo.rowCellInfoOffset);
pInfo->binfo.pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->binfo.rowCellInfoOffset, (int32_t) tableGroup);
initResultRowInfo(&pInfo->binfo.resultRowInfo, (int32_t)tableGroup, TSDB_DATA_TYPE_INT);
if (pInfo->binfo.pRes == NULL || pInfo->binfo.pCtx == NULL || pInfo->binfo.resultRowInfo.pResult == NULL) {
......@@ -7495,7 +7500,7 @@ SOperatorInfo* createProjectOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperato
SOptrBasicInfo* pBInfo = &pInfo->binfo;
pBInfo->pRes = createOutputBuf(pExpr, numOfOutput, pInfo->bufCapacity);
pBInfo->pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pBInfo->rowCellInfoOffset);
pBInfo->pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pBInfo->rowCellInfoOffset, pInfo->bufCapacity);
initResultRowInfo(&pBInfo->resultRowInfo, 8, TSDB_DATA_TYPE_INT);
......@@ -7630,7 +7635,7 @@ SOperatorInfo* createTimeIntervalOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOp
return NULL;
}
pInfo->pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->rowCellInfoOffset);
pInfo->pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->rowCellInfoOffset, pRuntimeEnv->resultInfo.capacity);
pInfo->pRes = createOutputBuf(pExpr, numOfOutput, pRuntimeEnv->resultInfo.capacity);
initResultRowInfo(&pInfo->resultRowInfo, 8, TSDB_DATA_TYPE_INT);
......@@ -7680,7 +7685,7 @@ SOperatorInfo* createTimeEveryOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOpera
SOptrBasicInfo* pBInfo = &pInfo->binfo;
pBInfo->pRes = createOutputBuf(pExpr, numOfOutput, pInfo->bufCapacity);
pBInfo->pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pBInfo->rowCellInfoOffset);
pBInfo->pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pBInfo->rowCellInfoOffset, pInfo->bufCapacity);
if (pQueryAttr->needReverseScan) {
pInfo->rangeStart = taosHashInit(256, taosGetDefaultHashFunction(TSDB_DATA_TYPE_TIMESTAMP), false, false);
......@@ -7732,7 +7737,8 @@ SOperatorInfo* createStatewindowOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOpe
pInfo->colIndex = -1;
pInfo->reptScan = false;
pInfo->binfo.pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->binfo.rowCellInfoOffset);
pInfo->binfo.pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->binfo.rowCellInfoOffset,
pRuntimeEnv->resultInfo.capacity);
pInfo->binfo.pRes = createOutputBuf(pExpr, numOfOutput, pRuntimeEnv->resultInfo.capacity);
initResultRowInfo(&pInfo->binfo.resultRowInfo, 8, TSDB_DATA_TYPE_INT);
......@@ -7772,7 +7778,8 @@ SOperatorInfo* createSWindowOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperato
return NULL;
}
pInfo->binfo.pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->binfo.rowCellInfoOffset);
pInfo->binfo.pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->binfo.rowCellInfoOffset,
pRuntimeEnv->resultInfo.capacity);
pInfo->binfo.pRes = createOutputBuf(pExpr, numOfOutput, pRuntimeEnv->resultInfo.capacity);
initResultRowInfo(&pInfo->binfo.resultRowInfo, 8, TSDB_DATA_TYPE_INT);
......@@ -7814,7 +7821,7 @@ SOperatorInfo* createMultiTableTimeIntervalOperatorInfo(SQueryRuntimeEnv* pRunti
return NULL;
}
pInfo->pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->rowCellInfoOffset);
pInfo->pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->rowCellInfoOffset, pRuntimeEnv->resultInfo.capacity);
pInfo->pRes = createOutputBuf(pExpr, numOfOutput, pRuntimeEnv->resultInfo.capacity);
initResultRowInfo(&pInfo->resultRowInfo, 8, TSDB_DATA_TYPE_INT);
......@@ -7857,7 +7864,8 @@ SOperatorInfo* createGroupbyOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperato
}
pInfo->colIndex = -1; // group by column index
pInfo->binfo.pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->binfo.rowCellInfoOffset);
pInfo->binfo.pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->binfo.rowCellInfoOffset,
pRuntimeEnv->resultInfo.capacity);
SQueryAttr *pQueryAttr = pRuntimeEnv->pQueryAttr;
......@@ -9168,7 +9176,7 @@ int32_t createQueryFunc(SQueriedTableInfo* pTableInfo, int32_t numOfOutput, SExp
type = s->type;
bytes = s->bytes;
} else if (TSDB_COL_IS_TSWIN_COL(pExprs[i].base.colInfo.colId) &&
(pExprs[i].base.functionId >= TSDB_FUNC_WSTART || pExprs[i].base.functionId <= TSDB_FUNC_WDURATION)) {
isTimeWindowFunction(pExprs[i].base.functionId)) {
SSchema* s = tGetTimeWindowColumnSchema(pExprs[i].base.colInfo.colId);
type = s->type;
bytes = s->bytes;
......@@ -9219,19 +9227,15 @@ int32_t createQueryFunc(SQueriedTableInfo* pTableInfo, int32_t numOfOutput, SExp
int32_t param = (int32_t)pExprs[i].base.param[0].i64;
if (pExprs[i].base.functionId > 0 &&
pExprs[i].base.functionId != TSDB_FUNC_SCALAR_EXPR &&
pExprs[i].base.functionId != TSDB_FUNC_WSTART &&
pExprs[i].base.functionId != TSDB_FUNC_WSTOP &&
pExprs[i].base.functionId != TSDB_FUNC_WDURATION &&
(type != pExprs[i].base.colType || bytes != pExprs[i].base.colBytes)) {
!isTimeWindowFunction(pExprs[i].base.functionId) &&
(type != pExprs[i].base.colType || bytes != pExprs[i].base.colBytes)) {
tfree(pExprs);
return TSDB_CODE_QRY_INVALID_MSG;
}
// todo remove it
if (pExprs[i].base.functionId != TSDB_FUNC_SCALAR_EXPR &&
pExprs[i].base.functionId != TSDB_FUNC_WSTART &&
pExprs[i].base.functionId != TSDB_FUNC_WSTOP &&
pExprs[i].base.functionId != TSDB_FUNC_WDURATION &&
!isTimeWindowFunction(pExprs[i].base.functionId) &&
getResultDataInfo(type, bytes, pExprs[i].base.functionId, param, &pExprs[i].base.resType, &pExprs[i].base.resBytes,
&pExprs[i].base.interBytes, 0, isSuperTable, pUdfInfo) != TSDB_CODE_SUCCESS) {
tfree(pExprs);
......@@ -9447,9 +9451,7 @@ static void doUpdateExprColumnIndex(SQueryAttr *pQueryAttr) {
for (int32_t k = 0; k < pQueryAttr->numOfOutput; ++k) {
SSqlExpr *pSqlExprMsg = &pQueryAttr->pExpr1[k].base;
if (pSqlExprMsg->functionId == TSDB_FUNC_SCALAR_EXPR ||
pSqlExprMsg->functionId == TSDB_FUNC_WSTART ||
pSqlExprMsg->functionId == TSDB_FUNC_WSTOP ||
pSqlExprMsg->functionId == TSDB_FUNC_WDURATION) {
isTimeWindowFunction(pSqlExprMsg->functionId)) {
continue;
}
......
......@@ -33,7 +33,8 @@ typedef void (*_hash_free_fn_t)(void *param);
*/
uint32_t MurmurHash3_32(const char *key, uint32_t len);
/**
uint64_t MurmurHash3_64(const void *key, uint32_t len);
/**
*
* @param key
* @param len
......
......@@ -31,6 +31,10 @@ extern "C" {
#define TSQL_TSWIN_STOP "_wstop"
#define TSQL_TSWIN_DURATION "_wduration"
#define TSQL_QUERY_START "_qstart"
#define TSQL_QUERY_STOP "_qstop"
#define TSQL_QUERY_DURATION "_qduration"
#define TSQL_BLOCK_DIST "_BLOCK_DIST"
#define TSQL_BLOCK_DIST_L "_block_dist"
......
......@@ -78,6 +78,42 @@ uint32_t MurmurHash3_32(const char *key, uint32_t len) {
return h1;
}
uint64_t MurmurHash3_64(const void *key, uint32_t len) {
const uint64_t m = 0x87c37b91114253d5;
const int r = 47;
uint32_t seed = 0x12345678;
uint64_t h = seed ^ (len * m);
const uint8_t *data = (const uint8_t *)key;
const uint8_t *end = data + (len-(len&7));
while(data != end) {
uint64_t k = *((uint64_t*)data);
k *= m;
k ^= k >> r;
k *= m;
h ^= k;
h *= m;
data += 8;
}
switch(len & 7) {
case 7: h ^= (uint64_t)data[6] << 48; /* fall-thru */
case 6: h ^= (uint64_t)data[5] << 40; /* fall-thru */
case 5: h ^= (uint64_t)data[4] << 32; /* fall-thru */
case 4: h ^= (uint64_t)data[3] << 24; /* fall-thru */
case 3: h ^= (uint64_t)data[2] << 16; /* fall-thru */
case 2: h ^= (uint64_t)data[1] << 8; /* fall-thru */
case 1: h ^= (uint64_t)data[0];
h *= m; /* fall-thru */
};
h ^= h >> r;
h *= m;
h ^= h >> r;
return h;
}
uint32_t taosIntHash_32(const char *key, uint32_t UNUSED_PARAM(len)) { return *(uint32_t *)key; }
uint32_t taosIntHash_16(const char *key, uint32_t UNUSED_PARAM(len)) { return *(uint16_t *)key; }
uint32_t taosIntHash_8(const char *key, uint32_t UNUSED_PARAM(len)) { return *(uint8_t *)key; }
......
###################################################################
# Copyright (c) 2021 by TAOS Technologies, Inc.
# All rights reserved.
#
# This file is proprietary and confidential to TAOS Technologies.
# No part of this file may be reproduced, stored, transmitted,
# disclosed or used in any form or by any means other than as
# expressly provided by the written permission from Jianhui Tao
#
###################################################################
# -*- coding: utf-8 -*-
import sys
from util.log import *
from util.cases import *
from util.sql import *
class TDTestCase:
def caseDescription(self):
'''
case1<ganlin zhao>: [TD-5902] [Improvement] Support rcf3339 format timestamp in tag
'''
return
def init(self, conn, logSql):
tdLog.debug("start to execute %s" % __file__)
tdSql.init(conn.cursor(), logSql)
self._conn = conn
def run(self):
print("running {}".format(__file__))
tdSql.execute("drop database if exists db")
tdSql.execute("create database if not exists db")
tdSql.execute('use db')
tdSql.execute('create stable stb(ts timestamp , c0 int) tags (t0 timestamp)')
#create using stb tags
tdSql.execute('create table ctb1 using stb tags("2020-02-02T02:00:00")')
tdSql.query('select t0 from ctb1');
res = tdSql.getData(0, 0)
tdSql.checkEqual(str(res), "2020-02-02 02:00:00")
tdSql.execute('create table ctb2 using stb tags("2020-02-02T02:00:00+0700")')
tdSql.query('select t0 from ctb2');
res = tdSql.getData(0, 0)
tdSql.checkEqual(str(res), "2020-02-02 03:00:00")
tdSql.execute('create table ctb3 using stb tags("2020-02-02T02:00:00+07:00")')
tdSql.query('select t0 from ctb3');
res = tdSql.getData(0, 0)
tdSql.checkEqual(str(res), "2020-02-02 03:00:00")
tdSql.execute('create table ctb4 using stb tags("2020-02-02T02:00:00-0800")')
tdSql.query('select t0 from ctb4');
res = tdSql.getData(0, 0)
tdSql.checkEqual(str(res), "2020-02-02 18:00:00")
tdSql.execute('create table ctb5 using stb tags("2020-02-02T02:00:00-08:00")')
tdSql.query('select t0 from ctb5');
res = tdSql.getData(0, 0)
tdSql.checkEqual(str(res), "2020-02-02 18:00:00")
tdSql.execute('create table ctb6 using stb tags("2020-02-02T02:00:00Z")')
tdSql.query('select t0 from ctb6');
res = tdSql.getData(0, 0)
tdSql.checkEqual(str(res), "2020-02-02 10:00:00")
#insert using stb tags
tdSql.execute('insert into ctb7 using stb tags("2020-02-02T02:00:00") values (now, 1)')
tdSql.query('select t0 from ctb7');
res = tdSql.getData(0, 0)
tdSql.checkEqual(str(res), "2020-02-02 02:00:00")
tdSql.execute('insert into ctb8 using stb tags("2020-02-02T02:00:00+0700") values (now, 1)')
tdSql.query('select t0 from ctb8');
res = tdSql.getData(0, 0)
tdSql.checkEqual(str(res), "2020-02-02 03:00:00")
tdSql.execute('insert into ctb9 using stb tags("2020-02-02T02:00:00+07:00") values (now, 1)')
tdSql.query('select t0 from ctb9');
res = tdSql.getData(0, 0)
tdSql.checkEqual(str(res), "2020-02-02 03:00:00")
tdSql.execute('insert into ctb10 using stb tags("2020-02-02T02:00:00-0800") values (now, 1)')
tdSql.query('select t0 from ctb10');
res = tdSql.getData(0, 0)
tdSql.checkEqual(str(res), "2020-02-02 18:00:00")
tdSql.execute('insert into ctb11 using stb tags("2020-02-02T02:00:00-08:00") values (now, 1)')
tdSql.query('select t0 from ctb11');
res = tdSql.getData(0, 0)
tdSql.checkEqual(str(res), "2020-02-02 18:00:00")
tdSql.execute('insert into ctb12 using stb tags("2020-02-02T02:00:00Z") values (now, 1)')
tdSql.query('select t0 from ctb12');
res = tdSql.getData(0, 0)
tdSql.checkEqual(str(res), "2020-02-02 10:00:00")
tdSql.execute('drop database db')
def stop(self):
tdSql.close()
tdLog.success("%s successfully executed" % __file__)
tdCases.addWindows(__file__, TDTestCase())
tdCases.addLinux(__file__, TDTestCase())
###################################################################
# Copyright (c) 2021 by TAOS Technologies, Inc.
# All rights reserved.
#
# This file is proprietary and confidential to TAOS Technologies.
# No part of this file may be reproduced, stored, transmitted,
# disclosed or used in any form or by any means other than as
# expressly provided by the written permission from Jianhui Tao
#
###################################################################
# -*- coding: utf-8 -*-
import sys
from util.log import *
from util.cases import *
from util.sql import *
class TDTestCase:
def caseDescription(self):
'''
case1<markwang>: [TD-13893] hyperloglog unique
'''
return
def init(self, conn, logSql):
tdLog.debug("start to execute %s" % __file__)
tdSql.init(conn.cursor(), logSql)
self._conn = conn
def run(self):
print("running {}".format(__file__))
tdSql.execute("drop database if exists hll")
tdSql.execute("create database if not exists hll")
tdSql.execute('use hll')
tdSql.execute('create table shll (ts timestamp, dbig bigint, dsmall smallint, dbool bool, dtiny tinyint unsigned, dfloat float, ddouble double, dnchar nchar(4093), dbinary binary(64), dtime timestamp) tags (tbinary nchar(4093), tint int)')
tdSql.execute('create table hll1 using shll tags ("t1", 1)')
tdSql.execute('create table hll2 using shll tags ("t2", 2)')
tdSql.execute('insert into hll1 values("2021-10-17 00:31:31", 1, -3276, true, 253, 3.32333, 4.984392323, "你好", "sddd", 333) ("2022-01-24 00:31:32", 1, -32767, false, 254, NULL, 4.982392323, "你好吗", "sdf",2323)')
tdSql.execute('insert into hll2 values("2021-10-15 00:31:33", 1, NULL, true, 23, 3.4, 4.982392323, "你好吗", "sdf", 333) ("2021-12-24 00:31:34", 2, 32767, NULL, NULL, NULL, 4.982392323, NULL, "sddd", NULL) ("2022-01-01 08:00:05", 19, 3276, true, 2, 3.323222, 4.92323, "试试", "sddd", 1645434434000)')
tdSql.execute('insert into hll2 values("2021-10-17 00:31:31", NULL, 32767, true, 123, 3.323232333, 4.2, NULL, NULL, NULL) ("2022-01-01 08:00:06", NULL, NULL, NULL, 35, 3.323232333, NULL, "试试", NULL, 1645434434000) ("2022-01-01 08:00:07", 9, 54, true, 25, 3.32333, NULL, "试试", NULL, 1645434434001)')
## test normal table
tdSql.query('select hyperloglog(ts) from hll2')
tdSql.checkRows(1)
tdSql.checkData(0, 0, 6)
tdSql.query('select hyperloglog(dbig) from hll2')
tdSql.checkRows(1)
tdSql.checkData(0, 0, 4)
tdSql.query('select hyperloglog(dsmall) from hll2')
tdSql.checkRows(1)
tdSql.checkData(0, 0, 3)
tdSql.query('select hyperloglog(dbool) from hll2')
tdSql.checkRows(1)
tdSql.checkData(0, 0, 1)
tdSql.query('select hyperloglog(dtiny) from hll2')
tdSql.checkRows(1)
tdSql.checkData(0, 0, 5)
tdSql.query('select hyperloglog(dfloat) from hll2')
tdSql.checkRows(1)
tdSql.checkData(0, 0, 4)
tdSql.query('select hyperloglog(ddouble) from hll2')
tdSql.checkRows(1)
tdSql.checkData(0, 0, 3)
tdSql.query('select hyperloglog(dnchar) from hll2')
tdSql.checkRows(1)
tdSql.checkData(0, 0, 2)
tdSql.query('select hyperloglog(dbinary) from hll2')
tdSql.checkRows(1)
tdSql.checkData(0, 0, 2)
## test super table
tdSql.query('select hyperloglog(dnchar) from shll')
tdSql.checkRows(1)
tdSql.checkData(0, 0, 3)
# test group by
#group by column
tdSql.query('select hyperloglog(dnchar) from shll group by dnchar')
tdSql.checkRows(4)
tdSql.checkData(0, 0, 0)
tdSql.checkData(1, 0, 1)
tdSql.checkData(2, 0, 1)
tdSql.checkData(3, 0, 1)
tdSql.query('select hyperloglog(dsmall) from shll group by dnchar')
tdSql.checkRows(4)
tdSql.checkData(0, 0, 1)
tdSql.checkData(1, 0, 1)
tdSql.checkData(2, 0, 1)
tdSql.checkData(3, 0, 2)
tdSql.query('select hyperloglog(dsmall) from hll2 group by dnchar')
tdSql.checkRows(3)
tdSql.checkData(0, 0, 1)
tdSql.checkData(1, 0, 0)
tdSql.checkData(2, 0, 2)
#group by tbname
tdSql.query('select hyperloglog(dsmall) from shll group by tbname')
tdSql.checkRows(2)
tdSql.checkData(0, 0, 2)
tdSql.checkData(1, 0, 3)
#group by tag
tdSql.query('select hyperloglog(dnchar) from shll group by tint')
tdSql.checkRows(2)
tdSql.checkData(0, 0, 2)
tdSql.checkData(1, 0, 2)
#test order by
#order by column [desc]
tdSql.query('select hyperloglog(dnchar) from shll group by dnchar order by dnchar desc')
tdSql.checkRows(4)
tdSql.checkData(0, 0, 1)
tdSql.checkData(1, 0, 1)
tdSql.checkData(2, 0, 1)
tdSql.checkData(3, 0, 0)
#order by tag
tdSql.query('select hyperloglog(dsmall) from shll group by tint order by tint desc')
tdSql.checkRows(2)
tdSql.checkData(0, 0, 3)
tdSql.checkData(1, 0, 2)
# error
tdSql.error("select hyperloglog(ts,1) from shll")
#interval
tdSql.query('select hyperloglog(dnchar) from shll interval(1s)')
tdSql.checkRows(7)
tdSql.checkData(0, 1, 1)
tdSql.checkData(1, 1, 1)
tdSql.checkData(2, 0, "2021-12-24 00:31:34")
tdSql.checkData(2, 1, 0)
tdSql.checkData(3, 1, 1)
tdSql.checkData(4, 1, 1)
tdSql.checkData(5, 1, 1)
tdSql.checkData(6, 1, 1)
tdSql.query('select hyperloglog(dnchar) from shll interval(1w)')
tdSql.checkRows(4)
tdSql.checkData(0, 1, 2)
tdSql.checkData(1, 1, 0)
tdSql.checkData(2, 1, 1)
tdSql.checkData(3, 1, 1)
#state_window
tdSql.query('select hyperloglog(dnchar) from hll2 state_window(dsmall)')
tdSql.checkRows(5)
#session
tdSql.query('select hyperloglog(dbinary) from hll2 session(ts,2w)')
tdSql.checkRows(2)
tdSql.checkData(0, 0, "2021-10-15 00:31:33")
tdSql.checkData(0, 1, 1)
tdSql.checkData(1, 0, "2021-12-24 00:31:34")
tdSql.checkData(1, 1, 1)
#where
tdSql.query('select hyperloglog(dbinary) from shll where dnchar="试试"')
tdSql.checkRows(1)
tdSql.checkData(0, 0, 1)
tdSql.query('select hyperloglog(dbinary) from shll where ts <= "2022-01-01 08:00:05"')
tdSql.checkRows(1)
tdSql.checkData(0, 0, 2)
#slimit/soffset
tdSql.query('select hyperloglog(dsmall) from shll group by dnchar slimit 2 soffset 2')
tdSql.checkRows(2)
tdSql.checkData(0, 0, 1)
tdSql.checkData(1, 0, 2)
#limit/offset
tdSql.query('select hyperloglog(dnchar) from shll interval(1s) limit 1,3')
tdSql.checkRows(3)
tdSql.checkData(0, 0, "2021-10-17 00:31:31")
tdSql.checkData(0, 1, 1)
tdSql.checkData(1, 1, 0)
tdSql.checkData(2, 1, 1)
#having
tdSql.query('select hyperloglog(dsmall) from shll group by dnchar having hyperloglog(dsmall)>1')
tdSql.checkRows(1)
tdSql.checkData(0, 0, 2)
#subquery
tdSql.query('select hyperloglog(dbinary) from (select dbinary from shll where dnchar = "试试")')
tdSql.checkRows(1)
tdSql.checkData(0, 0, 1)
#union
tdSql.query('select hyperloglog(dtiny) from hll1 union all select hyperloglog(dtiny) from hll2')
tdSql.checkRows(2)
tdSql.checkData(0, 0, 2)
tdSql.checkData(1, 0, 5)
#join
tdSql.execute('create table shll1 (ts timestamp, dbig bigint, dsmall smallint, dbool bool, dtiny tinyint unsigned, dfloat float, ddouble double, dnchar nchar(4093), dbinary binary(64), dtime timestamp) tags (tbinary nchar(4093), tint int)')
tdSql.execute('create table hll11 using shll1 tags ("t1", 1)')
tdSql.execute('insert into hll11 values("2021-10-17 00:31:31", 1, -3276, true, 253, 3.32333, 4.984392323, "你好", "sddd", 333) ("2022-01-24 00:31:32", 1, -32767, false, 254, NULL, 4.982392323, "你好吗", "sdf",2323)')
tdSql.query('select hyperloglog(shll1.ddouble) from shll, shll1 where shll.ts=shll1.ts and shll.tint=shll1.tint')
tdSql.checkRows(1)
tdSql.checkData(0, 0, 2)
def stop(self):
tdSql.close()
tdLog.success("%s successfully executed" % __file__)
tdCases.addWindows(__file__, TDTestCase())
tdCases.addLinux(__file__, TDTestCase())
此差异已折叠。
......@@ -714,6 +714,7 @@
5,,develop-test,python3 ./test.py -f 2-query/function_to_unixtimestamp.py
5,,develop-test,python3 ./test.py -f 2-query/time_window_keywords.py
5,,develop-test,python3 ./test.py -f 2-query/TD-13946.py
5,,develop-test,python3 ./test.py -f 2-query/query_window_keywords.py
4,,system-test,python3 test.py -f 4-taosAdapter/TD-12163.py
4,,system-test,python3 ./test.py -f 3-connectors/restful/restful_binddbname.py
4,,system-test,python3 ./test.py -f 2-query/TD-12614.py
......@@ -789,6 +790,7 @@
3,,pytest,python3 test.py -f tag_lite/binary.py
3,,pytest,python3 test.py -f query/filterAllIntTypes.py
3,,develop-test,python3 ./test.py -f 2-query/ts_hidden_column.py
3,,develop-test,python3 ./test.py -f 2-query/TD-5902.py
3,,script,eval sh -c \"if [ `uname -m` != aarch64 ]; then ./test.sh -f general/compute/scalar_triangle.sim; fi\"
3,,script,./test.sh -f general/compute/scalar_str_concat_len.sim
3,,develop-test,python3 ./test.py -f 2-query/function_tail.py
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册