Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
31c77715
T
TDengine
项目概览
taosdata
/
TDengine
大约 2 年 前同步成功
通知
1192
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
31c77715
编写于
2月 21, 2022
作者:
wmmhello
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
add mode function code
上级
31358b0f
变更
9
隐藏空白更改
内联
并排
Showing
9 changed file
with
212 addition
and
119 deletion
+212
-119
src/client/src/tscGlobalmerge.c
src/client/src/tscGlobalmerge.c
+3
-5
src/client/src/tscSQLParser.c
src/client/src/tscSQLParser.c
+6
-3
src/inc/taoserror.h
src/inc/taoserror.h
+1
-1
src/query/inc/qAggMain.h
src/query/inc/qAggMain.h
+4
-2
src/query/inc/qExecutor.h
src/query/inc/qExecutor.h
+1
-0
src/query/inc/qResultbuf.h
src/query/inc/qResultbuf.h
+2
-0
src/query/src/qAggMain.c
src/query/src/qAggMain.c
+182
-97
src/query/src/qExecutor.c
src/query/src/qExecutor.c
+12
-10
src/util/src/terror.c
src/util/src/terror.c
+1
-1
未找到文件。
src/client/src/tscGlobalmerge.c
浏览文件 @
31c77715
...
@@ -615,11 +615,9 @@ static void doMergeResultImpl(SOperatorInfo* pInfo, SQLFunctionCtx *pCtx, int32_
...
@@ -615,11 +615,9 @@ static void doMergeResultImpl(SOperatorInfo* pInfo, SQLFunctionCtx *pCtx, int32_
aAggs
[
functionId
].
mergeFunc
(
&
pCtx
[
j
]);
aAggs
[
functionId
].
mergeFunc
(
&
pCtx
[
j
]);
}
}
if
(
functionId
==
TSDB_FUNC_UNIQUE
&&
if
(
GET_RES_INFO
(
&
(
pCtx
[
j
]))
->
numOfRes
==
-
1
){
(
GET_RES_INFO
(
&
(
pCtx
[
j
]))
->
numOfRes
>
MAX_UNIQUE_RESULT_ROWS
||
GET_RES_INFO
(
&
(
pCtx
[
j
]))
->
numOfRes
==
-
1
)){
tscError
(
"result num is too large."
);
tscError
(
"Unique result num is too large. num: %d, limit: %d"
,
longjmp
(
pInfo
->
pRuntimeEnv
->
env
,
TSDB_CODE_QRY_RESULT_TOO_LARGE
);
GET_RES_INFO
(
&
(
pCtx
[
j
]))
->
numOfRes
,
MAX_UNIQUE_RESULT_ROWS
);
longjmp
(
pInfo
->
pRuntimeEnv
->
env
,
TSDB_CODE_QRY_UNIQUE_RESULT_TOO_LARGE
);
}
}
}
}
}
}
...
...
src/client/src/tscSQLParser.c
浏览文件 @
31c77715
...
@@ -2693,7 +2693,7 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
...
@@ -2693,7 +2693,7 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
const
char
*
msg26
=
"start param cannot be 0 with 'log_bin'"
;
const
char
*
msg26
=
"start param cannot be 0 with 'log_bin'"
;
const
char
*
msg27
=
"factor param cannot be negative or equal to 0/1"
;
const
char
*
msg27
=
"factor param cannot be negative or equal to 0/1"
;
const
char
*
msg28
=
"the second paramter of diff should be 0 or 1"
;
const
char
*
msg28
=
"the second paramter of diff should be 0 or 1"
;
const
char
*
msg29
=
"key timestamp column cannot be used to unique function"
;
const
char
*
msg29
=
"key timestamp column cannot be used to unique
/mode
function"
;
switch
(
functionId
)
{
switch
(
functionId
)
{
case
TSDB_FUNC_COUNT
:
{
case
TSDB_FUNC_COUNT
:
{
...
@@ -2791,7 +2791,8 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
...
@@ -2791,7 +2791,8 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
case
TSDB_FUNC_CSUM
:
case
TSDB_FUNC_CSUM
:
case
TSDB_FUNC_STDDEV
:
case
TSDB_FUNC_STDDEV
:
case
TSDB_FUNC_LEASTSQR
:
case
TSDB_FUNC_LEASTSQR
:
case
TSDB_FUNC_ELAPSED
:
{
case
TSDB_FUNC_ELAPSED
:
case
TSDB_FUNC_MODE
:
{
// 1. valid the number of parameters
// 1. valid the number of parameters
int32_t
numOfParams
=
int32_t
numOfParams
=
(
pItem
->
pNode
->
Expr
.
paramList
==
NULL
)
?
0
:
(
int32_t
)
taosArrayGetSize
(
pItem
->
pNode
->
Expr
.
paramList
);
(
pItem
->
pNode
->
Expr
.
paramList
==
NULL
)
?
0
:
(
int32_t
)
taosArrayGetSize
(
pItem
->
pNode
->
Expr
.
paramList
);
...
@@ -2852,7 +2853,9 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
...
@@ -2852,7 +2853,9 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
// 2. check if sql function can be applied on this column data type
// 2. check if sql function can be applied on this column data type
SSchema
*
pSchema
=
tscGetTableColumnSchema
(
pTableMetaInfo
->
pTableMeta
,
index
.
columnIndex
);
SSchema
*
pSchema
=
tscGetTableColumnSchema
(
pTableMetaInfo
->
pTableMeta
,
index
.
columnIndex
);
if
(
!
IS_NUMERIC_TYPE
(
pSchema
->
type
)
&&
(
functionId
!=
TSDB_FUNC_ELAPSED
))
{
if
(
functionId
==
TSDB_FUNC_MODE
&&
pColumnSchema
->
colId
==
PRIMARYKEY_TIMESTAMP_COL_INDEX
){
return
invalidOperationMsg
(
tscGetErrorMsgPayload
(
pCmd
),
msg29
);
}
else
if
(
!
IS_NUMERIC_TYPE
(
pSchema
->
type
)
&&
(
functionId
!=
TSDB_FUNC_ELAPSED
))
{
return
invalidOperationMsg
(
tscGetErrorMsgPayload
(
pCmd
),
msg1
);
return
invalidOperationMsg
(
tscGetErrorMsgPayload
(
pCmd
),
msg1
);
}
else
if
(
IS_UNSIGNED_NUMERIC_TYPE
(
pSchema
->
type
)
&&
}
else
if
(
IS_UNSIGNED_NUMERIC_TYPE
(
pSchema
->
type
)
&&
(
functionId
==
TSDB_FUNC_DIFF
||
functionId
==
TSDB_FUNC_DERIVATIVE
))
{
(
functionId
==
TSDB_FUNC_DIFF
||
functionId
==
TSDB_FUNC_DERIVATIVE
))
{
...
...
src/inc/taoserror.h
浏览文件 @
31c77715
...
@@ -293,7 +293,7 @@ int32_t* taosGetErrno();
...
@@ -293,7 +293,7 @@ int32_t* taosGetErrno();
#define TSDB_CODE_QRY_SYS_ERROR TAOS_DEF_ERROR_CODE(0, 0x070D) //"System error")
#define TSDB_CODE_QRY_SYS_ERROR TAOS_DEF_ERROR_CODE(0, 0x070D) //"System error")
#define TSDB_CODE_QRY_INVALID_TIME_CONDITION TAOS_DEF_ERROR_CODE(0, 0x070E) //"invalid time condition")
#define TSDB_CODE_QRY_INVALID_TIME_CONDITION TAOS_DEF_ERROR_CODE(0, 0x070E) //"invalid time condition")
#define TSDB_CODE_QRY_INVALID_SCHEMA_VERSION TAOS_DEF_ERROR_CODE(0, 0x0710) //"invalid schema version")
#define TSDB_CODE_QRY_INVALID_SCHEMA_VERSION TAOS_DEF_ERROR_CODE(0, 0x0710) //"invalid schema version")
#define TSDB_CODE_QRY_
UNIQUE_RESULT_TOO_LARGE TAOS_DEF_ERROR_CODE(0, 0x0711) //"unique
result num is too large")
#define TSDB_CODE_QRY_
RESULT_TOO_LARGE TAOS_DEF_ERROR_CODE(0, 0x0711) //"
result num is too large")
// grant
// grant
#define TSDB_CODE_GRANT_EXPIRED TAOS_DEF_ERROR_CODE(0, 0x0800) //"License expired"
#define TSDB_CODE_GRANT_EXPIRED TAOS_DEF_ERROR_CODE(0, 0x0800) //"License expired"
...
...
src/query/inc/qAggMain.h
浏览文件 @
31c77715
...
@@ -79,8 +79,9 @@ extern "C" {
...
@@ -79,8 +79,9 @@ extern "C" {
#define TSDB_FUNC_ELAPSED 37
#define TSDB_FUNC_ELAPSED 37
#define TSDB_FUNC_HISTOGRAM 38
#define TSDB_FUNC_HISTOGRAM 38
#define TSDB_FUNC_UNIQUE 39
#define TSDB_FUNC_UNIQUE 39
#define TSDB_FUNC_MODE 40
#define TSDB_FUNC_MAX_NUM 4
0
#define TSDB_FUNC_MAX_NUM 4
1
#define TSDB_FUNCSTATE_SO 0x1u // single output
#define TSDB_FUNCSTATE_SO 0x1u // single output
#define TSDB_FUNCSTATE_MO 0x2u // dynamic number of output, not multinumber of output e.g., TOP/BOTTOM
#define TSDB_FUNCSTATE_MO 0x2u // dynamic number of output, not multinumber of output e.g., TOP/BOTTOM
...
@@ -148,7 +149,7 @@ typedef struct SResultRowCellInfo {
...
@@ -148,7 +149,7 @@ typedef struct SResultRowCellInfo {
int8_t
hasResult
;
// result generated, not NULL value
int8_t
hasResult
;
// result generated, not NULL value
bool
initialized
;
// output buffer has been initialized
bool
initialized
;
// output buffer has been initialized
bool
complete
;
// query has completed
bool
complete
;
// query has completed
uint32_t
numOfRes
;
// num of output result in current buffer
int32_t
numOfRes
;
// num of output result in current buffer
}
SResultRowCellInfo
;
}
SResultRowCellInfo
;
typedef
struct
SPoint1
{
typedef
struct
SPoint1
{
...
@@ -203,6 +204,7 @@ typedef struct SQLFunctionCtx {
...
@@ -203,6 +204,7 @@ typedef struct SQLFunctionCtx {
SPoint1
end
;
SPoint1
end
;
SHashObj
**
pUniqueSet
;
// for unique function
SHashObj
**
pUniqueSet
;
// for unique function
SHashObj
**
pModeSet
;
// for mode function
}
SQLFunctionCtx
;
}
SQLFunctionCtx
;
typedef
struct
SAggFunctionInfo
{
typedef
struct
SAggFunctionInfo
{
...
...
src/query/inc/qExecutor.h
浏览文件 @
31c77715
...
@@ -91,6 +91,7 @@ typedef struct SResultRow {
...
@@ -91,6 +91,7 @@ typedef struct SResultRow {
STimeWindow
win
;
STimeWindow
win
;
char
*
key
;
// start key of current result row
char
*
key
;
// start key of current result row
SHashObj
*
uniqueHash
;
// for unique function
SHashObj
*
uniqueHash
;
// for unique function
SHashObj
*
modeHash
;
// for unique function
}
SResultRow
;
}
SResultRow
;
typedef
struct
SResultRowCell
{
typedef
struct
SResultRowCell
{
...
...
src/query/inc/qResultbuf.h
浏览文件 @
31c77715
...
@@ -80,6 +80,8 @@ typedef struct SDiskbasedResultBuf {
...
@@ -80,6 +80,8 @@ typedef struct SDiskbasedResultBuf {
#define PAGE_INFO_INITIALIZER (SPageDiskInfo){-1, -1}
#define PAGE_INFO_INITIALIZER (SPageDiskInfo){-1, -1}
#define MAX_UNIQUE_RESULT_ROWS (1000)
#define MAX_UNIQUE_RESULT_ROWS (1000)
#define MAX_UNIQUE_RESULT_SIZE (1024*1024*1)
#define MAX_UNIQUE_RESULT_SIZE (1024*1024*1)
#define MAX_MODE_INNER_RESULT_ROWS (1000000)
#define MAX_MODE_INNER_RESULT_SIZE (1024*1024*10)
/**
/**
* create disk-based result buffer
* create disk-based result buffer
* @param pResultBuf
* @param pResultBuf
...
...
src/query/src/qAggMain.c
浏览文件 @
31c77715
...
@@ -233,6 +233,16 @@ typedef struct {
...
@@ -233,6 +233,16 @@ typedef struct {
char
res
[];
char
res
[];
}
SUniqueFuncInfo
;
}
SUniqueFuncInfo
;
typedef
struct
{
int64_t
count
;
char
data
[];
}
ModeUnit
;
typedef
struct
{
int32_t
num
;
char
res
[];
}
SModeFuncInfo
;
int32_t
getResultDataInfo
(
int32_t
dataType
,
int32_t
dataBytes
,
int32_t
functionId
,
int32_t
param
,
int16_t
*
type
,
int32_t
getResultDataInfo
(
int32_t
dataType
,
int32_t
dataBytes
,
int32_t
functionId
,
int32_t
param
,
int16_t
*
type
,
int32_t
*
bytes
,
int32_t
*
interBytes
,
int16_t
extLength
,
bool
isSuperTable
,
SUdfInfo
*
pUdfInfo
)
{
int32_t
*
bytes
,
int32_t
*
interBytes
,
int16_t
extLength
,
bool
isSuperTable
,
SUdfInfo
*
pUdfInfo
)
{
if
(
!
isValidDataType
(
dataType
))
{
if
(
!
isValidDataType
(
dataType
))
{
...
@@ -369,13 +379,25 @@ int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionI
...
@@ -369,13 +379,25 @@ int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionI
int64_t
size
=
sizeof
(
UniqueUnit
)
+
dataBytes
+
extLength
;
int64_t
size
=
sizeof
(
UniqueUnit
)
+
dataBytes
+
extLength
;
size
*=
param
;
size
*=
param
;
size
+=
sizeof
(
SUniqueFuncInfo
);
size
+=
sizeof
(
SUniqueFuncInfo
);
if
(
size
>
MAX_UNIQUE_RESULT_SIZE
){
if
(
size
>
MAX_UNIQUE_RESULT_SIZE
)
{
size
=
MAX_UNIQUE_RESULT_SIZE
;
size
=
MAX_UNIQUE_RESULT_SIZE
;
}
}
*
bytes
=
size
;
*
bytes
=
size
;
*
interBytes
=
*
bytes
;
*
interBytes
=
*
bytes
;
return
TSDB_CODE_SUCCESS
;
return
TSDB_CODE_SUCCESS
;
}
else
if
(
functionId
==
TSDB_FUNC_MODE
)
{
*
type
=
TSDB_DATA_TYPE_BINARY
;
int64_t
size
=
sizeof
(
ModeUnit
)
+
dataBytes
;
size
*=
MAX_MODE_INNER_RESULT_ROWS
;
size
+=
sizeof
(
SModeFuncInfo
);
if
(
size
>
MAX_MODE_INNER_RESULT_SIZE
){
size
=
MAX_MODE_INNER_RESULT_SIZE
;
}
*
bytes
=
size
;
*
interBytes
=
*
bytes
;
return
TSDB_CODE_SUCCESS
;
}
else
if
(
functionId
==
TSDB_FUNC_SAMPLE
)
{
}
else
if
(
functionId
==
TSDB_FUNC_SAMPLE
)
{
*
type
=
TSDB_DATA_TYPE_BINARY
;
*
type
=
TSDB_DATA_TYPE_BINARY
;
*
bytes
=
(
sizeof
(
SSampleFuncInfo
)
+
dataBytes
*
param
+
sizeof
(
int64_t
)
*
param
+
extLength
*
param
);
*
bytes
=
(
sizeof
(
SSampleFuncInfo
)
+
dataBytes
*
param
+
sizeof
(
int64_t
)
*
param
+
extLength
*
param
);
...
@@ -513,7 +535,18 @@ int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionI
...
@@ -513,7 +535,18 @@ int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionI
size
=
MAX_UNIQUE_RESULT_SIZE
;
size
=
MAX_UNIQUE_RESULT_SIZE
;
}
}
*
interBytes
=
(
int32_t
)
size
;
*
interBytes
=
(
int32_t
)
size
;
}
else
if
(
functionId
==
TSDB_FUNC_SAMPLE
)
{
}
else
if
(
functionId
==
TSDB_FUNC_MODE
)
{
*
type
=
(
int16_t
)
dataType
;
*
bytes
=
dataBytes
;
int64_t
size
=
sizeof
(
ModeUnit
)
+
dataBytes
;
size
*=
MAX_MODE_INNER_RESULT_ROWS
;
size
+=
sizeof
(
SModeFuncInfo
);
if
(
size
>
MAX_MODE_INNER_RESULT_SIZE
){
size
=
MAX_MODE_INNER_RESULT_SIZE
;
}
*
interBytes
=
size
;
return
TSDB_CODE_SUCCESS
;
}
else
if
(
functionId
==
TSDB_FUNC_SAMPLE
)
{
*
type
=
(
int16_t
)
dataType
;
*
type
=
(
int16_t
)
dataType
;
*
bytes
=
dataBytes
;
*
bytes
=
dataBytes
;
size_t
size
=
sizeof
(
SSampleFuncInfo
)
+
dataBytes
*
param
+
sizeof
(
int64_t
)
*
param
+
extLength
*
param
;
size_t
size
=
sizeof
(
SSampleFuncInfo
)
+
dataBytes
*
param
+
sizeof
(
int64_t
)
*
param
+
extLength
*
param
;
...
@@ -2245,20 +2278,12 @@ static void copyTopBotRes(SQLFunctionCtx *pCtx, int32_t type) {
...
@@ -2245,20 +2278,12 @@ static void copyTopBotRes(SQLFunctionCtx *pCtx, int32_t type) {
tfree
(
pData
);
tfree
(
pData
);
}
}
/*
static
void
*
getOutputInfo
(
SQLFunctionCtx
*
pCtx
)
{
* Parameters values:
* 1. param[0]: maximum allowable results
* 2. param[1]: order by type (time or value)
* 3. param[2]: asc/desc order
*
* top/bottom use the intermediate result buffer to keep the intermediate result
*/
static
STopBotInfo
*
getTopBotOutputInfo
(
SQLFunctionCtx
*
pCtx
)
{
SResultRowCellInfo
*
pResInfo
=
GET_RES_INFO
(
pCtx
);
SResultRowCellInfo
*
pResInfo
=
GET_RES_INFO
(
pCtx
);
// only the first_stage_merge is directly written data into final output buffer
// only the first_stage_merge is directly written data into final output buffer
if
(
pCtx
->
stableQuery
&&
pCtx
->
currentStage
!=
MERGE_STAGE
)
{
if
(
pCtx
->
stableQuery
&&
pCtx
->
currentStage
!=
MERGE_STAGE
)
{
return
(
STopBotInfo
*
)
pCtx
->
pOutput
;
return
pCtx
->
pOutput
;
}
else
{
// during normal table query and super table at the secondary_stage, result is written to intermediate buffer
}
else
{
// during normal table query and super table at the secondary_stage, result is written to intermediate buffer
return
GET_ROWCELL_INTERBUF
(
pResInfo
);
return
GET_ROWCELL_INTERBUF
(
pResInfo
);
}
}
...
@@ -2291,7 +2316,7 @@ bool topbot_datablock_filter(SQLFunctionCtx *pCtx, const char *minval, const cha
...
@@ -2291,7 +2316,7 @@ bool topbot_datablock_filter(SQLFunctionCtx *pCtx, const char *minval, const cha
return
true
;
return
true
;
}
}
STopBotInfo
*
pTopBotInfo
=
get
TopBot
OutputInfo
(
pCtx
);
STopBotInfo
*
pTopBotInfo
=
getOutputInfo
(
pCtx
);
// required number of results are not reached, continue load data block
// required number of results are not reached, continue load data block
if
(
pTopBotInfo
->
num
<
pCtx
->
param
[
0
].
i64
)
{
if
(
pTopBotInfo
->
num
<
pCtx
->
param
[
0
].
i64
)
{
...
@@ -2346,7 +2371,7 @@ static bool top_bottom_function_setup(SQLFunctionCtx *pCtx, SResultRowCellInfo*
...
@@ -2346,7 +2371,7 @@ static bool top_bottom_function_setup(SQLFunctionCtx *pCtx, SResultRowCellInfo*
return
false
;
return
false
;
}
}
STopBotInfo
*
pInfo
=
get
TopBot
OutputInfo
(
pCtx
);
STopBotInfo
*
pInfo
=
getOutputInfo
(
pCtx
);
buildTopBotStruct
(
pInfo
,
pCtx
);
buildTopBotStruct
(
pInfo
,
pCtx
);
return
true
;
return
true
;
}
}
...
@@ -2354,7 +2379,7 @@ static bool top_bottom_function_setup(SQLFunctionCtx *pCtx, SResultRowCellInfo*
...
@@ -2354,7 +2379,7 @@ static bool top_bottom_function_setup(SQLFunctionCtx *pCtx, SResultRowCellInfo*
static
void
top_function
(
SQLFunctionCtx
*
pCtx
)
{
static
void
top_function
(
SQLFunctionCtx
*
pCtx
)
{
int32_t
notNullElems
=
0
;
int32_t
notNullElems
=
0
;
STopBotInfo
*
pRes
=
get
TopBot
OutputInfo
(
pCtx
);
STopBotInfo
*
pRes
=
getOutputInfo
(
pCtx
);
assert
(
pRes
->
num
>=
0
);
assert
(
pRes
->
num
>=
0
);
if
((
void
*
)
pRes
->
res
[
0
]
!=
(
void
*
)((
char
*
)
pRes
+
sizeof
(
STopBotInfo
)
+
POINTER_BYTES
*
pCtx
->
param
[
0
].
i64
))
{
if
((
void
*
)
pRes
->
res
[
0
]
!=
(
void
*
)((
char
*
)
pRes
+
sizeof
(
STopBotInfo
)
+
POINTER_BYTES
*
pCtx
->
param
[
0
].
i64
))
{
...
@@ -2393,7 +2418,7 @@ static void top_func_merge(SQLFunctionCtx *pCtx) {
...
@@ -2393,7 +2418,7 @@ static void top_func_merge(SQLFunctionCtx *pCtx) {
// construct the input data struct from binary data
// construct the input data struct from binary data
buildTopBotStruct
(
pInput
,
pCtx
);
buildTopBotStruct
(
pInput
,
pCtx
);
STopBotInfo
*
pOutput
=
get
TopBot
OutputInfo
(
pCtx
);
STopBotInfo
*
pOutput
=
getOutputInfo
(
pCtx
);
// the intermediate result is binary, we only use the output data type
// the intermediate result is binary, we only use the output data type
for
(
int32_t
i
=
0
;
i
<
pInput
->
num
;
++
i
)
{
for
(
int32_t
i
=
0
;
i
<
pInput
->
num
;
++
i
)
{
...
@@ -2413,7 +2438,7 @@ static void top_func_merge(SQLFunctionCtx *pCtx) {
...
@@ -2413,7 +2438,7 @@ static void top_func_merge(SQLFunctionCtx *pCtx) {
static
void
bottom_function
(
SQLFunctionCtx
*
pCtx
)
{
static
void
bottom_function
(
SQLFunctionCtx
*
pCtx
)
{
int32_t
notNullElems
=
0
;
int32_t
notNullElems
=
0
;
STopBotInfo
*
pRes
=
get
TopBot
OutputInfo
(
pCtx
);
STopBotInfo
*
pRes
=
getOutputInfo
(
pCtx
);
if
((
void
*
)
pRes
->
res
[
0
]
!=
(
void
*
)((
char
*
)
pRes
+
sizeof
(
STopBotInfo
)
+
POINTER_BYTES
*
pCtx
->
param
[
0
].
i64
))
{
if
((
void
*
)
pRes
->
res
[
0
]
!=
(
void
*
)((
char
*
)
pRes
+
sizeof
(
STopBotInfo
)
+
POINTER_BYTES
*
pCtx
->
param
[
0
].
i64
))
{
buildTopBotStruct
(
pRes
,
pCtx
);
buildTopBotStruct
(
pRes
,
pCtx
);
...
@@ -2450,7 +2475,7 @@ static void bottom_func_merge(SQLFunctionCtx *pCtx) {
...
@@ -2450,7 +2475,7 @@ static void bottom_func_merge(SQLFunctionCtx *pCtx) {
// construct the input data struct from binary data
// construct the input data struct from binary data
buildTopBotStruct
(
pInput
,
pCtx
);
buildTopBotStruct
(
pInput
,
pCtx
);
STopBotInfo
*
pOutput
=
get
TopBot
OutputInfo
(
pCtx
);
STopBotInfo
*
pOutput
=
getOutputInfo
(
pCtx
);
// the intermediate result is binary, we only use the output data type
// the intermediate result is binary, we only use the output data type
for
(
int32_t
i
=
0
;
i
<
pInput
->
num
;
++
i
)
{
for
(
int32_t
i
=
0
;
i
<
pInput
->
num
;
++
i
)
{
...
@@ -2619,18 +2644,6 @@ static void buildHistogramInfo(SAPercentileInfo* pInfo) {
...
@@ -2619,18 +2644,6 @@ static void buildHistogramInfo(SAPercentileInfo* pInfo) {
pInfo
->
pHisto
->
elems
=
(
SHistBin
*
)
((
char
*
)
pInfo
->
pHisto
+
sizeof
(
SHistogramInfo
));
pInfo
->
pHisto
->
elems
=
(
SHistBin
*
)
((
char
*
)
pInfo
->
pHisto
+
sizeof
(
SHistogramInfo
));
}
}
static
SAPercentileInfo
*
getAPerctInfo
(
SQLFunctionCtx
*
pCtx
)
{
SResultRowCellInfo
*
pResInfo
=
GET_RES_INFO
(
pCtx
);
SAPercentileInfo
*
pInfo
=
NULL
;
if
(
pCtx
->
stableQuery
&&
pCtx
->
currentStage
!=
MERGE_STAGE
)
{
pInfo
=
(
SAPercentileInfo
*
)
pCtx
->
pOutput
;
}
else
{
pInfo
=
GET_ROWCELL_INTERBUF
(
pResInfo
);
}
return
pInfo
;
}
//
//
// ----------------- tdigest -------------------
// ----------------- tdigest -------------------
//
//
...
@@ -2642,7 +2655,7 @@ static bool tdigest_setup(SQLFunctionCtx *pCtx, SResultRowCellInfo *pResultInfo)
...
@@ -2642,7 +2655,7 @@ static bool tdigest_setup(SQLFunctionCtx *pCtx, SResultRowCellInfo *pResultInfo)
}
}
// new TDigest
// new TDigest
SAPercentileInfo
*
pInfo
=
get
APerc
tInfo
(
pCtx
);
SAPercentileInfo
*
pInfo
=
get
Outpu
tInfo
(
pCtx
);
char
*
tmp
=
(
char
*
)
pInfo
+
sizeof
(
SAPercentileInfo
);
char
*
tmp
=
(
char
*
)
pInfo
+
sizeof
(
SAPercentileInfo
);
pInfo
->
pTDigest
=
tdigestNewFrom
(
tmp
,
COMPRESSION
);
pInfo
->
pTDigest
=
tdigestNewFrom
(
tmp
,
COMPRESSION
);
return
true
;
return
true
;
...
@@ -2652,7 +2665,7 @@ static void tdigest_do(SQLFunctionCtx *pCtx) {
...
@@ -2652,7 +2665,7 @@ static void tdigest_do(SQLFunctionCtx *pCtx) {
int32_t
notNullElems
=
0
;
int32_t
notNullElems
=
0
;
SResultRowCellInfo
*
pResInfo
=
GET_RES_INFO
(
pCtx
);
SResultRowCellInfo
*
pResInfo
=
GET_RES_INFO
(
pCtx
);
SAPercentileInfo
*
pAPerc
=
get
APerc
tInfo
(
pCtx
);
SAPercentileInfo
*
pAPerc
=
get
Outpu
tInfo
(
pCtx
);
assert
(
pAPerc
->
pTDigest
!=
NULL
);
assert
(
pAPerc
->
pTDigest
!=
NULL
);
if
(
pAPerc
->
pTDigest
==
NULL
)
{
if
(
pAPerc
->
pTDigest
==
NULL
)
{
...
@@ -2694,7 +2707,7 @@ static void tdigest_merge(SQLFunctionCtx *pCtx) {
...
@@ -2694,7 +2707,7 @@ static void tdigest_merge(SQLFunctionCtx *pCtx) {
return
;
return
;
}
}
SAPercentileInfo
*
pOutput
=
get
APerc
tInfo
(
pCtx
);
SAPercentileInfo
*
pOutput
=
get
Outpu
tInfo
(
pCtx
);
if
(
pOutput
->
pTDigest
->
num_centroids
==
0
)
{
if
(
pOutput
->
pTDigest
->
num_centroids
==
0
)
{
memcpy
(
pOutput
->
pTDigest
,
pInput
->
pTDigest
,
(
size_t
)
TDIGEST_SIZE
(
COMPRESSION
));
memcpy
(
pOutput
->
pTDigest
,
pInput
->
pTDigest
,
(
size_t
)
TDIGEST_SIZE
(
COMPRESSION
));
tdigestAutoFill
(
pOutput
->
pTDigest
,
COMPRESSION
);
tdigestAutoFill
(
pOutput
->
pTDigest
,
COMPRESSION
);
...
@@ -2711,7 +2724,7 @@ static void tdigest_finalizer(SQLFunctionCtx *pCtx) {
...
@@ -2711,7 +2724,7 @@ static void tdigest_finalizer(SQLFunctionCtx *pCtx) {
double
q
=
(
pCtx
->
param
[
0
].
nType
==
TSDB_DATA_TYPE_INT
)
?
pCtx
->
param
[
0
].
i64
:
pCtx
->
param
[
0
].
dKey
;
double
q
=
(
pCtx
->
param
[
0
].
nType
==
TSDB_DATA_TYPE_INT
)
?
pCtx
->
param
[
0
].
i64
:
pCtx
->
param
[
0
].
dKey
;
SResultRowCellInfo
*
pResInfo
=
GET_RES_INFO
(
pCtx
);
SResultRowCellInfo
*
pResInfo
=
GET_RES_INFO
(
pCtx
);
SAPercentileInfo
*
pAPerc
=
get
APerc
tInfo
(
pCtx
);
SAPercentileInfo
*
pAPerc
=
get
Outpu
tInfo
(
pCtx
);
if
(
pCtx
->
currentStage
==
MERGE_STAGE
)
{
if
(
pCtx
->
currentStage
==
MERGE_STAGE
)
{
if
(
pResInfo
->
hasResult
==
DATA_SET_FLAG
)
{
// check for null
if
(
pResInfo
->
hasResult
==
DATA_SET_FLAG
)
{
// check for null
...
@@ -2755,7 +2768,7 @@ static bool apercentile_function_setup(SQLFunctionCtx *pCtx, SResultRowCellInfo*
...
@@ -2755,7 +2768,7 @@ static bool apercentile_function_setup(SQLFunctionCtx *pCtx, SResultRowCellInfo*
return
false
;
return
false
;
}
}
SAPercentileInfo
*
pInfo
=
get
APerc
tInfo
(
pCtx
);
SAPercentileInfo
*
pInfo
=
get
Outpu
tInfo
(
pCtx
);
buildHistogramInfo
(
pInfo
);
buildHistogramInfo
(
pInfo
);
char
*
tmp
=
(
char
*
)
pInfo
+
sizeof
(
SAPercentileInfo
);
char
*
tmp
=
(
char
*
)
pInfo
+
sizeof
(
SAPercentileInfo
);
...
@@ -2772,7 +2785,7 @@ static void apercentile_function(SQLFunctionCtx *pCtx) {
...
@@ -2772,7 +2785,7 @@ static void apercentile_function(SQLFunctionCtx *pCtx) {
int32_t
notNullElems
=
0
;
int32_t
notNullElems
=
0
;
SResultRowCellInfo
*
pResInfo
=
GET_RES_INFO
(
pCtx
);
SResultRowCellInfo
*
pResInfo
=
GET_RES_INFO
(
pCtx
);
SAPercentileInfo
*
pInfo
=
get
APerc
tInfo
(
pCtx
);
SAPercentileInfo
*
pInfo
=
get
Outpu
tInfo
(
pCtx
);
buildHistogramInfo
(
pInfo
);
buildHistogramInfo
(
pInfo
);
assert
(
pInfo
->
pHisto
->
elems
!=
NULL
);
assert
(
pInfo
->
pHisto
->
elems
!=
NULL
);
...
@@ -2816,7 +2829,7 @@ static void apercentile_func_merge(SQLFunctionCtx *pCtx) {
...
@@ -2816,7 +2829,7 @@ static void apercentile_func_merge(SQLFunctionCtx *pCtx) {
return
;
return
;
}
}
SAPercentileInfo
*
pOutput
=
get
APerc
tInfo
(
pCtx
);
SAPercentileInfo
*
pOutput
=
get
Outpu
tInfo
(
pCtx
);
buildHistogramInfo
(
pOutput
);
buildHistogramInfo
(
pOutput
);
SHistogramInfo
*
pHisto
=
pOutput
->
pHisto
;
SHistogramInfo
*
pHisto
=
pOutput
->
pHisto
;
...
@@ -4710,17 +4723,6 @@ static void mavg_function(SQLFunctionCtx *pCtx) {
...
@@ -4710,17 +4723,6 @@ static void mavg_function(SQLFunctionCtx *pCtx) {
//////////////////////////////////////////////////////////////////////////////////
//////////////////////////////////////////////////////////////////////////////////
// Sample function with reservoir sampling algorithm
// Sample function with reservoir sampling algorithm
static
SSampleFuncInfo
*
getSampleFuncOutputInfo
(
SQLFunctionCtx
*
pCtx
)
{
SResultRowCellInfo
*
pResInfo
=
GET_RES_INFO
(
pCtx
);
// only the first_stage stable is directly written data into final output buffer
if
(
pCtx
->
stableQuery
&&
pCtx
->
currentStage
!=
MERGE_STAGE
)
{
return
(
SSampleFuncInfo
*
)
pCtx
->
pOutput
;
}
else
{
// during normal table query and super table at the secondary_stage, result is written to intermediate buffer
return
GET_ROWCELL_INTERBUF
(
pResInfo
);
}
}
static
void
assignResultSample
(
SQLFunctionCtx
*
pCtx
,
SSampleFuncInfo
*
pInfo
,
int32_t
index
,
int64_t
ts
,
void
*
pData
,
uint16_t
type
,
int16_t
bytes
,
char
*
inputTags
)
{
static
void
assignResultSample
(
SQLFunctionCtx
*
pCtx
,
SSampleFuncInfo
*
pInfo
,
int32_t
index
,
int64_t
ts
,
void
*
pData
,
uint16_t
type
,
int16_t
bytes
,
char
*
inputTags
)
{
assignVal
(
pInfo
->
values
+
index
*
bytes
,
pData
,
bytes
,
type
);
assignVal
(
pInfo
->
values
+
index
*
bytes
,
pData
,
bytes
,
type
);
*
(
pInfo
->
timeStamps
+
index
)
=
ts
;
*
(
pInfo
->
timeStamps
+
index
)
=
ts
;
...
@@ -4800,7 +4802,7 @@ static bool sample_function_setup(SQLFunctionCtx *pCtx, SResultRowCellInfo* pRes
...
@@ -4800,7 +4802,7 @@ static bool sample_function_setup(SQLFunctionCtx *pCtx, SResultRowCellInfo* pRes
srand
(
taosSafeRand
());
srand
(
taosSafeRand
());
SSampleFuncInfo
*
pRes
=
get
SampleFunc
OutputInfo
(
pCtx
);
SSampleFuncInfo
*
pRes
=
getOutputInfo
(
pCtx
);
pRes
->
totalPoints
=
0
;
pRes
->
totalPoints
=
0
;
pRes
->
numSampled
=
0
;
pRes
->
numSampled
=
0
;
pRes
->
values
=
((
char
*
)
pRes
+
sizeof
(
SSampleFuncInfo
));
pRes
->
values
=
((
char
*
)
pRes
+
sizeof
(
SSampleFuncInfo
));
...
@@ -4814,7 +4816,7 @@ static void sample_function(SQLFunctionCtx *pCtx) {
...
@@ -4814,7 +4816,7 @@ static void sample_function(SQLFunctionCtx *pCtx) {
int32_t
notNullElems
=
0
;
int32_t
notNullElems
=
0
;
SResultRowCellInfo
*
pResInfo
=
GET_RES_INFO
(
pCtx
);
SResultRowCellInfo
*
pResInfo
=
GET_RES_INFO
(
pCtx
);
SSampleFuncInfo
*
pRes
=
get
SampleFunc
OutputInfo
(
pCtx
);
SSampleFuncInfo
*
pRes
=
getOutputInfo
(
pCtx
);
if
(
pRes
->
values
!=
((
char
*
)
pRes
+
sizeof
(
SSampleFuncInfo
)))
{
if
(
pRes
->
values
!=
((
char
*
)
pRes
+
sizeof
(
SSampleFuncInfo
)))
{
pRes
->
values
=
((
char
*
)
pRes
+
sizeof
(
SSampleFuncInfo
));
pRes
->
values
=
((
char
*
)
pRes
+
sizeof
(
SSampleFuncInfo
));
...
@@ -4852,7 +4854,7 @@ static void sample_func_merge(SQLFunctionCtx *pCtx) {
...
@@ -4852,7 +4854,7 @@ static void sample_func_merge(SQLFunctionCtx *pCtx) {
pInput
->
timeStamps
=
(
int64_t
*
)((
char
*
)
pInput
->
values
+
pInput
->
colBytes
*
pCtx
->
param
[
0
].
i64
);
pInput
->
timeStamps
=
(
int64_t
*
)((
char
*
)
pInput
->
values
+
pInput
->
colBytes
*
pCtx
->
param
[
0
].
i64
);
pInput
->
taglists
=
(
char
*
)
pInput
->
timeStamps
+
sizeof
(
int64_t
)
*
pCtx
->
param
[
0
].
i64
;
pInput
->
taglists
=
(
char
*
)
pInput
->
timeStamps
+
sizeof
(
int64_t
)
*
pCtx
->
param
[
0
].
i64
;
SSampleFuncInfo
*
pOutput
=
get
SampleFunc
OutputInfo
(
pCtx
);
SSampleFuncInfo
*
pOutput
=
getOutputInfo
(
pCtx
);
pOutput
->
totalPoints
=
pInput
->
totalPoints
;
pOutput
->
totalPoints
=
pInput
->
totalPoints
;
pOutput
->
numSampled
=
pInput
->
numSampled
;
pOutput
->
numSampled
=
pInput
->
numSampled
;
for
(
int32_t
i
=
0
;
i
<
pInput
->
numSampled
;
++
i
)
{
for
(
int32_t
i
=
0
;
i
<
pInput
->
numSampled
;
++
i
)
{
...
@@ -4886,20 +4888,12 @@ static void sample_func_finalizer(SQLFunctionCtx *pCtx) {
...
@@ -4886,20 +4888,12 @@ static void sample_func_finalizer(SQLFunctionCtx *pCtx) {
//////////////////////////////////////////////////////////////////////////////////
//////////////////////////////////////////////////////////////////////////////////
// elapsed function
// elapsed function
static
SElapsedInfo
*
getSElapsedInfo
(
SQLFunctionCtx
*
pCtx
)
{
if
(
pCtx
->
stableQuery
&&
pCtx
->
currentStage
!=
MERGE_STAGE
)
{
return
(
SElapsedInfo
*
)
pCtx
->
pOutput
;
}
else
{
return
GET_ROWCELL_INTERBUF
(
GET_RES_INFO
(
pCtx
));
}
}
static
bool
elapsedSetup
(
SQLFunctionCtx
*
pCtx
,
SResultRowCellInfo
*
pResInfo
)
{
static
bool
elapsedSetup
(
SQLFunctionCtx
*
pCtx
,
SResultRowCellInfo
*
pResInfo
)
{
if
(
!
function_setup
(
pCtx
,
pResInfo
))
{
if
(
!
function_setup
(
pCtx
,
pResInfo
))
{
return
false
;
return
false
;
}
}
SElapsedInfo
*
pInfo
=
get
SElapsed
Info
(
pCtx
);
SElapsedInfo
*
pInfo
=
get
Output
Info
(
pCtx
);
pInfo
->
min
=
MAX_TS_KEY
;
pInfo
->
min
=
MAX_TS_KEY
;
pInfo
->
max
=
0
;
pInfo
->
max
=
0
;
pInfo
->
hasResult
=
0
;
pInfo
->
hasResult
=
0
;
...
@@ -4912,7 +4906,7 @@ static int32_t elapsedRequired(SQLFunctionCtx *pCtx, STimeWindow* w, int32_t col
...
@@ -4912,7 +4906,7 @@ static int32_t elapsedRequired(SQLFunctionCtx *pCtx, STimeWindow* w, int32_t col
}
}
static
void
elapsedFunction
(
SQLFunctionCtx
*
pCtx
)
{
static
void
elapsedFunction
(
SQLFunctionCtx
*
pCtx
)
{
SElapsedInfo
*
pInfo
=
get
SElapsed
Info
(
pCtx
);
SElapsedInfo
*
pInfo
=
get
Output
Info
(
pCtx
);
if
(
pCtx
->
preAggVals
.
isSet
)
{
if
(
pCtx
->
preAggVals
.
isSet
)
{
if
(
pInfo
->
min
==
MAX_TS_KEY
)
{
if
(
pInfo
->
min
==
MAX_TS_KEY
)
{
pInfo
->
min
=
pCtx
->
preAggVals
.
statis
.
min
;
pInfo
->
min
=
pCtx
->
preAggVals
.
statis
.
min
;
...
@@ -4979,7 +4973,7 @@ elapsedOver:
...
@@ -4979,7 +4973,7 @@ elapsedOver:
}
}
static
void
elapsedMerge
(
SQLFunctionCtx
*
pCtx
)
{
static
void
elapsedMerge
(
SQLFunctionCtx
*
pCtx
)
{
SElapsedInfo
*
pInfo
=
get
SElapsed
Info
(
pCtx
);
SElapsedInfo
*
pInfo
=
get
Output
Info
(
pCtx
);
memcpy
(
pInfo
,
pCtx
->
pInput
,
(
size_t
)
pCtx
->
inputBytes
);
memcpy
(
pInfo
,
pCtx
->
pInput
,
(
size_t
)
pCtx
->
inputBytes
);
GET_RES_INFO
(
pCtx
)
->
hasResult
=
pInfo
->
hasResult
;
GET_RES_INFO
(
pCtx
)
->
hasResult
=
pInfo
->
hasResult
;
}
}
...
@@ -5002,25 +4996,12 @@ static void elapsedFinalizer(SQLFunctionCtx *pCtx) {
...
@@ -5002,25 +4996,12 @@ static void elapsedFinalizer(SQLFunctionCtx *pCtx) {
}
}
//////////////////////////////////////////////////////////////////////////////////
//////////////////////////////////////////////////////////////////////////////////
// histogram function
static
SHistogramFuncInfo
*
getHistogramFuncOutputInfo
(
SQLFunctionCtx
*
pCtx
)
{
SResultRowCellInfo
*
pResInfo
=
GET_RES_INFO
(
pCtx
);
// only the first_stage stable is directly written data into final output buffer
if
(
pCtx
->
stableQuery
&&
pCtx
->
currentStage
!=
MERGE_STAGE
)
{
return
(
SHistogramFuncInfo
*
)
pCtx
->
pOutput
;
}
else
{
// during normal table query and super table at the secondary_stage, result is written to intermediate buffer
return
GET_ROWCELL_INTERBUF
(
pResInfo
);
}
}
static
bool
histogram_function_setup
(
SQLFunctionCtx
*
pCtx
,
SResultRowCellInfo
*
pResInfo
)
{
static
bool
histogram_function_setup
(
SQLFunctionCtx
*
pCtx
,
SResultRowCellInfo
*
pResInfo
)
{
if
(
!
function_setup
(
pCtx
,
pResInfo
))
{
if
(
!
function_setup
(
pCtx
,
pResInfo
))
{
return
false
;
return
false
;
}
}
SHistogramFuncInfo
*
pRes
=
get
HistogramFunc
OutputInfo
(
pCtx
);
SHistogramFuncInfo
*
pRes
=
getOutputInfo
(
pCtx
);
if
(
!
pRes
)
{
if
(
!
pRes
)
{
return
false
;
return
false
;
}
}
...
@@ -5044,7 +5025,7 @@ static bool histogram_function_setup(SQLFunctionCtx *pCtx, SResultRowCellInfo* p
...
@@ -5044,7 +5025,7 @@ static bool histogram_function_setup(SQLFunctionCtx *pCtx, SResultRowCellInfo* p
static
void
histogram_function
(
SQLFunctionCtx
*
pCtx
)
{
static
void
histogram_function
(
SQLFunctionCtx
*
pCtx
)
{
SResultRowCellInfo
*
pResInfo
=
GET_RES_INFO
(
pCtx
);
SResultRowCellInfo
*
pResInfo
=
GET_RES_INFO
(
pCtx
);
SHistogramFuncInfo
*
pRes
=
get
HistogramFunc
OutputInfo
(
pCtx
);
SHistogramFuncInfo
*
pRes
=
getOutputInfo
(
pCtx
);
if
(
pRes
->
orderedBins
!=
(
SHistogramFuncBin
*
)((
char
*
)
pRes
+
sizeof
(
SHistogramFuncInfo
)))
{
if
(
pRes
->
orderedBins
!=
(
SHistogramFuncBin
*
)((
char
*
)
pRes
+
sizeof
(
SHistogramFuncInfo
)))
{
pRes
->
orderedBins
=
(
SHistogramFuncBin
*
)((
char
*
)
pRes
+
sizeof
(
SHistogramFuncInfo
));
pRes
->
orderedBins
=
(
SHistogramFuncBin
*
)((
char
*
)
pRes
+
sizeof
(
SHistogramFuncInfo
));
...
@@ -5092,7 +5073,7 @@ static void histogram_func_merge(SQLFunctionCtx *pCtx) {
...
@@ -5092,7 +5073,7 @@ static void histogram_func_merge(SQLFunctionCtx *pCtx) {
SHistogramFuncInfo
*
pInput
=
(
SHistogramFuncInfo
*
)
GET_INPUT_DATA_LIST
(
pCtx
);
SHistogramFuncInfo
*
pInput
=
(
SHistogramFuncInfo
*
)
GET_INPUT_DATA_LIST
(
pCtx
);
pInput
->
orderedBins
=
(
SHistogramFuncBin
*
)((
char
*
)
pInput
+
sizeof
(
SHistogramFuncInfo
));
pInput
->
orderedBins
=
(
SHistogramFuncBin
*
)((
char
*
)
pInput
+
sizeof
(
SHistogramFuncInfo
));
SHistogramFuncInfo
*
pRes
=
get
HistogramFunc
OutputInfo
(
pCtx
);
SHistogramFuncInfo
*
pRes
=
getOutputInfo
(
pCtx
);
for
(
int32_t
i
=
0
;
i
<
pInput
->
numOfBins
;
++
i
)
{
for
(
int32_t
i
=
0
;
i
<
pInput
->
numOfBins
;
++
i
)
{
pRes
->
orderedBins
[
i
].
count
+=
pInput
->
orderedBins
[
i
].
count
;
pRes
->
orderedBins
[
i
].
count
+=
pInput
->
orderedBins
[
i
].
count
;
}
}
...
@@ -5129,18 +5110,6 @@ static void histogram_func_finalizer(SQLFunctionCtx *pCtx) {
...
@@ -5129,18 +5110,6 @@ static void histogram_func_finalizer(SQLFunctionCtx *pCtx) {
doFinalizer
(
pCtx
);
doFinalizer
(
pCtx
);
}
}
// unique use the intermediate result buffer to keep the intermediate result
static
SUniqueFuncInfo
*
getUniqueOutputInfo
(
SQLFunctionCtx
*
pCtx
)
{
SResultRowCellInfo
*
pResInfo
=
GET_RES_INFO
(
pCtx
);
// only the first_stage_merge is directly written data into final output buffer
if
(
pCtx
->
stableQuery
&&
pCtx
->
currentStage
!=
MERGE_STAGE
)
{
return
(
SUniqueFuncInfo
*
)
pCtx
->
pOutput
;
}
else
{
// during normal table query and super table at the secondary_stage, result is written to intermediate buffer
return
GET_ROWCELL_INTERBUF
(
pResInfo
);
}
}
// unique
// unique
static
void
copyUniqueRes
(
SQLFunctionCtx
*
pCtx
,
int32_t
bytes
)
{
static
void
copyUniqueRes
(
SQLFunctionCtx
*
pCtx
,
int32_t
bytes
)
{
SResultRowCellInfo
*
pResInfo
=
GET_RES_INFO
(
pCtx
);
SResultRowCellInfo
*
pResInfo
=
GET_RES_INFO
(
pCtx
);
...
@@ -5238,7 +5207,7 @@ static void do_unique_function(SQLFunctionCtx *pCtx, SUniqueFuncInfo *pInfo, TSK
...
@@ -5238,7 +5207,7 @@ static void do_unique_function(SQLFunctionCtx *pCtx, SUniqueFuncInfo *pInfo, TSK
}
}
static
void
unique_function
(
SQLFunctionCtx
*
pCtx
)
{
static
void
unique_function
(
SQLFunctionCtx
*
pCtx
)
{
SUniqueFuncInfo
*
pInfo
=
get
Unique
OutputInfo
(
pCtx
);
SUniqueFuncInfo
*
pInfo
=
getOutputInfo
(
pCtx
);
for
(
int32_t
i
=
0
;
i
<
pCtx
->
size
;
i
++
)
{
for
(
int32_t
i
=
0
;
i
<
pCtx
->
size
;
i
++
)
{
char
*
pData
=
GET_INPUT_DATA
(
pCtx
,
i
);
char
*
pData
=
GET_INPUT_DATA
(
pCtx
,
i
);
...
@@ -5248,18 +5217,19 @@ static void unique_function(SQLFunctionCtx *pCtx) {
...
@@ -5248,18 +5217,19 @@ static void unique_function(SQLFunctionCtx *pCtx) {
}
}
do_unique_function
(
pCtx
,
pInfo
,
k
,
pData
,
NULL
,
pCtx
->
inputBytes
,
pCtx
->
inputType
);
do_unique_function
(
pCtx
,
pInfo
,
k
,
pData
,
NULL
,
pCtx
->
inputBytes
,
pCtx
->
inputType
);
if
(
sizeof
(
SUniqueFuncInfo
)
+
pInfo
->
num
*
(
sizeof
(
UniqueUnit
)
+
pCtx
->
inputBytes
+
pCtx
->
tagInfo
.
tagsLen
)
>=
MAX_UNIQUE_RESULT_SIZE
){
if
(
sizeof
(
SUniqueFuncInfo
)
+
pInfo
->
num
*
(
sizeof
(
UniqueUnit
)
+
pCtx
->
inputBytes
+
pCtx
->
tagInfo
.
tagsLen
)
>=
MAX_UNIQUE_RESULT_SIZE
||
(
pInfo
->
num
>
MAX_UNIQUE_RESULT_ROWS
)){
GET_RES_INFO
(
pCtx
)
->
numOfRes
=
-
1
;
// mark out of memory
GET_RES_INFO
(
pCtx
)
->
numOfRes
=
-
1
;
// mark out of memory
return
;
return
;
}
}
}
}
GET_RES_INFO
(
pCtx
)
->
numOfRes
=
1
;
// GET_RES_INFO(pCtx)->numOfRes = pInfo->num
;
}
}
static
void
unique_function_merge
(
SQLFunctionCtx
*
pCtx
)
{
static
void
unique_function_merge
(
SQLFunctionCtx
*
pCtx
)
{
SUniqueFuncInfo
*
pInput
=
(
SUniqueFuncInfo
*
)
GET_INPUT_DATA_LIST
(
pCtx
);
SUniqueFuncInfo
*
pInput
=
(
SUniqueFuncInfo
*
)
GET_INPUT_DATA_LIST
(
pCtx
);
SUniqueFuncInfo
*
pOutput
=
get
Unique
OutputInfo
(
pCtx
);
SUniqueFuncInfo
*
pOutput
=
getOutputInfo
(
pCtx
);
size_t
size
=
sizeof
(
UniqueUnit
)
+
pCtx
->
outputBytes
+
pCtx
->
tagInfo
.
tagsLen
;
size_t
size
=
sizeof
(
UniqueUnit
)
+
pCtx
->
outputBytes
+
pCtx
->
tagInfo
.
tagsLen
;
for
(
int32_t
i
=
0
;
i
<
pInput
->
num
;
++
i
)
{
for
(
int32_t
i
=
0
;
i
<
pInput
->
num
;
++
i
)
{
char
*
tmp
=
pInput
->
res
+
i
*
size
;
char
*
tmp
=
pInput
->
res
+
i
*
size
;
...
@@ -5268,13 +5238,14 @@ static void unique_function_merge(SQLFunctionCtx *pCtx) {
...
@@ -5268,13 +5238,14 @@ static void unique_function_merge(SQLFunctionCtx *pCtx) {
char
*
tags
=
tmp
+
sizeof
(
UniqueUnit
)
+
pCtx
->
outputBytes
;
char
*
tags
=
tmp
+
sizeof
(
UniqueUnit
)
+
pCtx
->
outputBytes
;
do_unique_function
(
pCtx
,
pOutput
,
timestamp
,
data
,
tags
,
pCtx
->
outputBytes
,
pCtx
->
outputType
);
do_unique_function
(
pCtx
,
pOutput
,
timestamp
,
data
,
tags
,
pCtx
->
outputBytes
,
pCtx
->
outputType
);
if
(
sizeof
(
SUniqueFuncInfo
)
+
pOutput
->
num
*
(
sizeof
(
UniqueUnit
)
+
pCtx
->
outputBytes
+
pCtx
->
tagInfo
.
tagsLen
)
>=
MAX_UNIQUE_RESULT_SIZE
){
if
(
sizeof
(
SUniqueFuncInfo
)
+
pOutput
->
num
*
(
sizeof
(
UniqueUnit
)
+
pCtx
->
outputBytes
+
pCtx
->
tagInfo
.
tagsLen
)
>=
MAX_UNIQUE_RESULT_SIZE
||
(
pOutput
->
num
>
MAX_UNIQUE_RESULT_ROWS
)){
GET_RES_INFO
(
pCtx
)
->
numOfRes
=
-
1
;
// mark out of memory
GET_RES_INFO
(
pCtx
)
->
numOfRes
=
-
1
;
// mark out of memory
return
;
return
;
}
}
}
}
GET_RES_INFO
(
pCtx
)
->
numOfRes
=
pOutput
->
num
;
//
GET_RES_INFO(pCtx)->numOfRes = pOutput->num;
}
}
typedef
struct
{
typedef
struct
{
...
@@ -5288,7 +5259,7 @@ static int32_t uniqueCompareFn(const void *p1, const void *p2, const void *param
...
@@ -5288,7 +5259,7 @@ static int32_t uniqueCompareFn(const void *p1, const void *p2, const void *param
}
}
static
void
unique_func_finalizer
(
SQLFunctionCtx
*
pCtx
)
{
static
void
unique_func_finalizer
(
SQLFunctionCtx
*
pCtx
)
{
SUniqueFuncInfo
*
pInfo
=
getUniqueOutputInfo
(
pCtx
);
SUniqueFuncInfo
*
pInfo
=
GET_ROWCELL_INTERBUF
(
GET_RES_INFO
(
pCtx
)
);
GET_RES_INFO
(
pCtx
)
->
numOfRes
=
pInfo
->
num
;
GET_RES_INFO
(
pCtx
)
->
numOfRes
=
pInfo
->
num
;
int32_t
bytes
=
0
;
int32_t
bytes
=
0
;
...
@@ -5317,6 +5288,108 @@ static void unique_func_finalizer(SQLFunctionCtx *pCtx) {
...
@@ -5317,6 +5288,108 @@ static void unique_func_finalizer(SQLFunctionCtx *pCtx) {
doFinalizer
(
pCtx
);
doFinalizer
(
pCtx
);
}
}
static
bool
mode_function_setup
(
SQLFunctionCtx
*
pCtx
,
SResultRowCellInfo
*
pResInfo
)
{
if
(
!
function_setup
(
pCtx
,
pResInfo
))
{
return
false
;
}
if
(
*
pCtx
->
pModeSet
!=
NULL
){
taosHashClear
(
*
pCtx
->
pModeSet
);
}
else
{
*
pCtx
->
pModeSet
=
taosHashInit
(
64
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BINARY
),
true
,
HASH_NO_LOCK
);
}
return
true
;
}
static
void
do_mode_function
(
SQLFunctionCtx
*
pCtx
,
SModeFuncInfo
*
pInfo
,
char
*
pData
,
int32_t
bytes
,
int16_t
type
){
int32_t
hashKeyBytes
=
bytes
;
if
(
IS_VAR_DATA_TYPE
(
type
)){
// for var data, we can not use bytes, because there are dirty data in the back of var data
hashKeyBytes
=
varDataTLen
(
pData
);
}
ModeUnit
**
mode
=
taosHashGet
(
*
pCtx
->
pModeSet
,
pData
,
hashKeyBytes
);
if
(
mode
==
NULL
)
{
size_t
size
=
sizeof
(
ModeUnit
)
+
bytes
;
char
*
tmp
=
pInfo
->
res
+
pInfo
->
num
*
size
;
((
ModeUnit
*
)
tmp
)
->
count
=
1
;
char
*
data
=
tmp
+
sizeof
(
ModeUnit
);
memcpy
(
data
,
pData
,
bytes
);
taosHashPut
(
*
pCtx
->
pModeSet
,
pData
,
hashKeyBytes
,
&
tmp
,
sizeof
(
ModeUnit
*
));
pInfo
->
num
++
;
}
else
{
(
*
mode
)
->
count
++
;
}
}
static
void
mode_function
(
SQLFunctionCtx
*
pCtx
)
{
SModeFuncInfo
*
pInfo
=
getOutputInfo
(
pCtx
);
for
(
int32_t
i
=
0
;
i
<
pCtx
->
size
;
i
++
)
{
char
*
pData
=
GET_INPUT_DATA
(
pCtx
,
i
);
do_mode_function
(
pCtx
,
pInfo
,
pData
,
pCtx
->
inputBytes
,
pCtx
->
inputType
);
if
(
sizeof
(
SModeFuncInfo
)
+
pInfo
->
num
*
(
sizeof
(
ModeUnit
)
+
pCtx
->
inputBytes
)
>=
MAX_MODE_INNER_RESULT_SIZE
){
GET_RES_INFO
(
pCtx
)
->
numOfRes
=
-
1
;
// mark out of memory
return
;
}
}
}
static
void
mode_function_merge
(
SQLFunctionCtx
*
pCtx
)
{
SModeFuncInfo
*
pInput
=
(
SModeFuncInfo
*
)
GET_INPUT_DATA_LIST
(
pCtx
);
SModeFuncInfo
*
pOutput
=
getOutputInfo
(
pCtx
);
size_t
size
=
sizeof
(
ModeUnit
)
+
pCtx
->
outputBytes
;
for
(
int32_t
i
=
0
;
i
<
pInput
->
num
;
++
i
)
{
char
*
tmp
=
pInput
->
res
+
i
*
size
;
char
*
data
=
tmp
+
sizeof
(
ModeUnit
);
do_mode_function
(
pCtx
,
pOutput
,
data
,
pCtx
->
outputBytes
,
pCtx
->
outputType
);
if
(
sizeof
(
SModeFuncInfo
)
+
pOutput
->
num
*
(
sizeof
(
ModeUnit
)
+
pCtx
->
outputBytes
)
>=
MAX_MODE_INNER_RESULT_SIZE
){
GET_RES_INFO
(
pCtx
)
->
numOfRes
=
-
1
;
// mark out of memory
return
;
}
}
}
static
void
mode_func_finalizer
(
SQLFunctionCtx
*
pCtx
)
{
int32_t
bytes
=
0
;
if
(
pCtx
->
currentStage
==
MERGE_STAGE
)
{
bytes
=
pCtx
->
outputBytes
;
assert
(
pCtx
->
inputType
==
TSDB_DATA_TYPE_BINARY
);
}
else
{
bytes
=
pCtx
->
inputBytes
;
}
SResultRowCellInfo
*
pResInfo
=
GET_RES_INFO
(
pCtx
);
SModeFuncInfo
*
pRes
=
GET_ROWCELL_INTERBUF
(
pResInfo
);
size_t
size
=
sizeof
(
ModeUnit
)
+
bytes
;
char
*
tvp
=
pRes
->
res
;
char
*
result
=
NULL
;
int64_t
maxCount
=
0
;
for
(
int32_t
i
=
0
;
i
<
pRes
->
num
;
++
i
)
{
int64_t
count
=
((
ModeUnit
*
)
tvp
)
->
count
;
if
(
count
>
maxCount
){
maxCount
=
count
;
result
=
tvp
;
}
else
if
(
count
==
maxCount
){
result
=
NULL
;
}
tvp
+=
size
;
}
if
(
result
){
memcpy
(
pCtx
->
pOutput
,
result
+
sizeof
(
ModeUnit
),
bytes
);
pResInfo
->
numOfRes
=
1
;
}
else
{
pResInfo
->
numOfRes
=
0
;
}
doFinalizer
(
pCtx
);
}
/////////////////////////////////////////////////////////////////////////////////////////////
/////////////////////////////////////////////////////////////////////////////////////////////
/*
/*
* function compatible list.
* function compatible list.
...
@@ -5823,5 +5896,17 @@ SAggFunctionInfo aAggs[TSDB_FUNC_MAX_NUM] = {{
...
@@ -5823,5 +5896,17 @@ SAggFunctionInfo aAggs[TSDB_FUNC_MAX_NUM] = {{
unique_func_finalizer
,
unique_func_finalizer
,
unique_function_merge
,
unique_function_merge
,
dataBlockRequired
,
dataBlockRequired
,
},
{
// 40
"mode"
,
TSDB_FUNC_MODE
,
TSDB_FUNC_MODE
,
TSDB_FUNCSTATE_SO
,
mode_function_setup
,
mode_function
,
mode_func_finalizer
,
mode_function_merge
,
dataBlockRequired
,
}
}
};
};
src/query/src/qExecutor.c
浏览文件 @
31c77715
...
@@ -1009,11 +1009,9 @@ static void doApplyFunctions(SQueryRuntimeEnv* pRuntimeEnv, SQLFunctionCtx* pCtx
...
@@ -1009,11 +1009,9 @@ static void doApplyFunctions(SQueryRuntimeEnv* pRuntimeEnv, SQLFunctionCtx* pCtx
}
}
}
}
if
(
functionId
==
TSDB_FUNC_UNIQUE
&&
if
(
GET_RES_INFO
(
&
(
pCtx
[
k
]))
->
numOfRes
==
-
1
){
(
GET_RES_INFO
(
&
(
pCtx
[
k
]))
->
numOfRes
>
MAX_UNIQUE_RESULT_ROWS
||
GET_RES_INFO
(
&
(
pCtx
[
k
]))
->
numOfRes
==
-
1
)){
qError
(
"result num is too large."
);
qError
(
"Unique result num is too large. num: %d, limit: %d"
,
longjmp
(
pRuntimeEnv
->
env
,
TSDB_CODE_QRY_RESULT_TOO_LARGE
);
GET_RES_INFO
(
&
(
pCtx
[
k
]))
->
numOfRes
,
MAX_UNIQUE_RESULT_ROWS
);
longjmp
(
pRuntimeEnv
->
env
,
TSDB_CODE_QRY_UNIQUE_RESULT_TOO_LARGE
);
}
}
// restore it
// restore it
...
@@ -1276,11 +1274,9 @@ static void doAggregateImpl(SOperatorInfo* pOperator, TSKEY startTs, SQLFunction
...
@@ -1276,11 +1274,9 @@ static void doAggregateImpl(SOperatorInfo* pOperator, TSKEY startTs, SQLFunction
assert
(
0
);
assert
(
0
);
}
}
if
(
functionId
==
TSDB_FUNC_UNIQUE
&&
if
(
GET_RES_INFO
(
&
(
pCtx
[
k
]))
->
numOfRes
==
-
1
){
(
GET_RES_INFO
(
&
(
pCtx
[
k
]))
->
numOfRes
>
MAX_UNIQUE_RESULT_ROWS
||
GET_RES_INFO
(
&
(
pCtx
[
k
]))
->
numOfRes
==
-
1
)){
qError
(
"Mode inner result num is too large"
);
qError
(
"Unique result num is too large. num: %d, limit: %d"
,
longjmp
(
pRuntimeEnv
->
env
,
TSDB_CODE_QRY_RESULT_TOO_LARGE
);
GET_RES_INFO
(
&
(
pCtx
[
k
]))
->
numOfRes
,
MAX_UNIQUE_RESULT_ROWS
);
longjmp
(
pRuntimeEnv
->
env
,
TSDB_CODE_QRY_UNIQUE_RESULT_TOO_LARGE
);
}
}
}
}
}
}
...
@@ -3690,6 +3686,8 @@ void setDefaultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SOptrBasicInfo *pInfo, i
...
@@ -3690,6 +3686,8 @@ void setDefaultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SOptrBasicInfo *pInfo, i
pCtx
[
i
].
resultInfo
=
pCellInfo
;
pCtx
[
i
].
resultInfo
=
pCellInfo
;
if
(
pCtx
[
i
].
functionId
==
TSDB_FUNC_UNIQUE
)
{
if
(
pCtx
[
i
].
functionId
==
TSDB_FUNC_UNIQUE
)
{
pCtx
[
i
].
pUniqueSet
=
&
pRow
->
uniqueHash
;
pCtx
[
i
].
pUniqueSet
=
&
pRow
->
uniqueHash
;
}
else
if
(
pCtx
[
i
].
functionId
==
TSDB_FUNC_MODE
)
{
pCtx
[
i
].
pUniqueSet
=
&
pRow
->
modeHash
;
}
}
pCtx
[
i
].
pOutput
=
pData
->
pData
;
pCtx
[
i
].
pOutput
=
pData
->
pData
;
pCtx
[
i
].
currentStage
=
stage
;
pCtx
[
i
].
currentStage
=
stage
;
...
@@ -4027,6 +4025,8 @@ void setResultRowOutputBufInitCtx(SQueryRuntimeEnv *pRuntimeEnv, SResultRow *pRe
...
@@ -4027,6 +4025,8 @@ void setResultRowOutputBufInitCtx(SQueryRuntimeEnv *pRuntimeEnv, SResultRow *pRe
pCtx
[
i
].
resultInfo
=
getResultCell
(
pResult
,
i
,
rowCellInfoOffset
);
pCtx
[
i
].
resultInfo
=
getResultCell
(
pResult
,
i
,
rowCellInfoOffset
);
if
(
pCtx
[
i
].
functionId
==
TSDB_FUNC_UNIQUE
){
if
(
pCtx
[
i
].
functionId
==
TSDB_FUNC_UNIQUE
){
pCtx
[
i
].
pUniqueSet
=
&
pResult
->
uniqueHash
;
pCtx
[
i
].
pUniqueSet
=
&
pResult
->
uniqueHash
;
}
else
if
(
pCtx
[
i
].
functionId
==
TSDB_FUNC_MODE
){
pCtx
[
i
].
pUniqueSet
=
&
pResult
->
modeHash
;
}
}
SResultRowCellInfo
*
pResInfo
=
pCtx
[
i
].
resultInfo
;
SResultRowCellInfo
*
pResInfo
=
pCtx
[
i
].
resultInfo
;
...
@@ -4123,6 +4123,8 @@ void setResultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SResultRow *pResult, SQLF
...
@@ -4123,6 +4123,8 @@ void setResultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SResultRow *pResult, SQLF
pCtx
[
i
].
resultInfo
=
getResultCell
(
pResult
,
i
,
rowCellInfoOffset
);
pCtx
[
i
].
resultInfo
=
getResultCell
(
pResult
,
i
,
rowCellInfoOffset
);
if
(
pCtx
[
i
].
functionId
==
TSDB_FUNC_UNIQUE
)
{
if
(
pCtx
[
i
].
functionId
==
TSDB_FUNC_UNIQUE
)
{
pCtx
[
i
].
pUniqueSet
=
&
pResult
->
uniqueHash
;
pCtx
[
i
].
pUniqueSet
=
&
pResult
->
uniqueHash
;
}
else
if
(
pCtx
[
i
].
functionId
==
TSDB_FUNC_MODE
)
{
pCtx
[
i
].
pUniqueSet
=
&
pResult
->
modeHash
;
}
}
}
}
}
}
...
...
src/util/src/terror.c
浏览文件 @
31c77715
...
@@ -299,7 +299,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_QRY_NOT_ENOUGH_BUFFER, "Query buffer limit ha
...
@@ -299,7 +299,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_QRY_NOT_ENOUGH_BUFFER, "Query buffer limit ha
TAOS_DEFINE_ERROR
(
TSDB_CODE_QRY_INCONSISTAN
,
"File inconsistance in replica"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_QRY_INCONSISTAN
,
"File inconsistance in replica"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_QRY_INVALID_TIME_CONDITION
,
"One valid time range condition expected"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_QRY_INVALID_TIME_CONDITION
,
"One valid time range condition expected"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_QRY_SYS_ERROR
,
"System error"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_QRY_SYS_ERROR
,
"System error"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_QRY_
UNIQUE_RESULT_TOO_LARGE
,
"Unique
result num is too large"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_QRY_
RESULT_TOO_LARGE
,
"
result num is too large"
)
// grant
// grant
TAOS_DEFINE_ERROR
(
TSDB_CODE_GRANT_EXPIRED
,
"License expired"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_GRANT_EXPIRED
,
"License expired"
)
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录