Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
fd83970a
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
fd83970a
编写于
2月 11, 2022
作者:
S
shenglian zhou
浏览文件
操作
浏览文件
下载
差异文件
Merge remote-tracking branch 'origin/develop' into jiacy/fix/td-13457
上级
701dcbb0
57a997a0
变更
5
隐藏空白更改
内联
并排
Showing
5 changed file
with
425 addition
and
22 deletion
+425
-22
src/client/src/tscSQLParser.c
src/client/src/tscSQLParser.c
+250
-9
src/client/src/tscUtil.c
src/client/src/tscUtil.c
+2
-1
src/query/inc/qAggMain.h
src/query/inc/qAggMain.h
+4
-8
src/query/src/qAggMain.c
src/query/src/qAggMain.c
+167
-3
src/query/src/qUtil.c
src/query/src/qUtil.c
+2
-1
未找到文件。
src/client/src/tscSQLParser.c
浏览文件 @
fd83970a
...
...
@@ -2657,7 +2657,7 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
STableMetaInfo
*
pTableMetaInfo
=
NULL
;
int32_t
functionId
=
pItem
->
pNode
->
functionId
;
const
char
*
msg1
=
"
not support
column types"
;
const
char
*
msg1
=
"
unsupported
column types"
;
const
char
*
msg2
=
"invalid parameters"
;
const
char
*
msg3
=
"illegal column name"
;
const
char
*
msg4
=
"invalid table name"
;
...
...
@@ -2675,7 +2675,16 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
const
char
*
msg16
=
"elapsed duration should be greater than or equal to database precision"
;
const
char
*
msg17
=
"elapsed/twa should not be used in nested query if inner query has group by clause"
;
const
char
*
msg18
=
"the second parameter is not an integer"
;
const
char
*
msg19
=
"the second paramter of diff should be 0 or 1"
;
const
char
*
msg19
=
"histogram function requires four parameters"
;
const
char
*
msg20
=
"second parameter must be 'user_input', 'linear_bin' or 'log_bin'"
;
const
char
*
msg21
=
"third parameter must be in JSON format"
;
const
char
*
msg22
=
"invalid parameters for bin_desciption"
;
const
char
*
msg23
=
"parameter/bin out of range [-DBL_MAX, DBL_MAX]"
;
const
char
*
msg24
=
"width param cannot be 0"
;
const
char
*
msg25
=
"count param should be in range [1, 1000]"
;
const
char
*
msg26
=
"start param cannot be 0 with 'log_bin'"
;
const
char
*
msg27
=
"factor param cannot be negative or equal to 0/1"
;
const
char
*
msg28
=
"the second paramter of diff should be 0 or 1"
;
switch
(
functionId
)
{
...
...
@@ -2923,7 +2932,7 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
int64_t
ignoreNegative
=
GET_INT64_VAL
(
val
);
if
(
ignoreNegative
!=
0
&&
ignoreNegative
!=
1
)
{
return
invalidOperationMsg
(
tscGetErrorMsgPayload
(
pCmd
),
msg
19
);
return
invalidOperationMsg
(
tscGetErrorMsgPayload
(
pCmd
),
msg
28
);
}
}
tscExprAddParams
(
&
pExpr
->
base
,
val
,
TSDB_DATA_TYPE_BIGINT
,
sizeof
(
int64_t
));
...
...
@@ -2943,7 +2952,7 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
tscColumnListInsert
(
pQueryInfo
->
colList
,
ids
.
ids
[
0
].
columnIndex
,
pExpr
->
base
.
uid
,
pSchema
);
}
tscInsertPrimaryTsSourceColumn
(
pQueryInfo
,
pExpr
->
base
.
uid
);
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -3350,6 +3359,237 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
return
TSDB_CODE_SUCCESS
;
}
case
TSDB_FUNC_HISTOGRAM
:
{
// check params
if
(
taosArrayGetSize
(
pItem
->
pNode
->
Expr
.
paramList
)
!=
4
)
{
return
invalidOperationMsg
(
tscGetErrorMsgPayload
(
pCmd
),
msg19
);
}
tSqlExprItem
*
pParamElem
=
taosArrayGet
(
pItem
->
pNode
->
Expr
.
paramList
,
0
);
if
(
pParamElem
->
pNode
->
tokenId
!=
TK_ID
)
{
return
invalidOperationMsg
(
tscGetErrorMsgPayload
(
pCmd
),
msg2
);
}
SColumnIndex
index
=
COLUMN_INDEX_INITIALIZER
;
if
(
getColumnIndexByName
(
&
pParamElem
->
pNode
->
columnName
,
pQueryInfo
,
&
index
,
tscGetErrorMsgPayload
(
pCmd
))
!=
TSDB_CODE_SUCCESS
)
{
return
invalidOperationMsg
(
tscGetErrorMsgPayload
(
pCmd
),
msg3
);
}
if
(
index
.
columnIndex
==
TSDB_TBNAME_COLUMN_INDEX
)
{
return
invalidOperationMsg
(
tscGetErrorMsgPayload
(
pCmd
),
msg6
);
}
pTableMetaInfo
=
tscGetMetaInfo
(
pQueryInfo
,
index
.
tableIndex
);
SSchema
*
pSchema
=
tscGetTableColumnSchema
(
pTableMetaInfo
->
pTableMeta
,
index
.
columnIndex
);
if
(
!
IS_NUMERIC_TYPE
(
pSchema
->
type
))
{
return
invalidOperationMsg
(
tscGetErrorMsgPayload
(
pCmd
),
msg1
);
}
//bin_type param
if
(
pParamElem
[
1
].
pNode
->
tokenId
==
TK_ID
)
{
return
invalidOperationMsg
(
tscGetErrorMsgPayload
(
pCmd
),
msg2
);
}
tVariant
*
pVariant
=
&
pParamElem
[
1
].
pNode
->
value
;
if
(
pVariant
==
NULL
||
pVariant
->
nType
!=
TSDB_DATA_TYPE_BINARY
)
{
return
invalidOperationMsg
(
tscGetErrorMsgPayload
(
pCmd
),
msg2
);
}
#define USER_INPUT_BIN 0
#define LINEAR_BIN 1
#define LOG_BIN 2
int8_t
binType
;
if
(
strcasecmp
(
pVariant
->
pz
,
"user_input"
)
==
0
)
{
binType
=
USER_INPUT_BIN
;
}
else
if
(
strcasecmp
(
pVariant
->
pz
,
"linear_bin"
)
==
0
)
{
binType
=
LINEAR_BIN
;
}
else
if
(
strcasecmp
(
pVariant
->
pz
,
"log_bin"
)
==
0
)
{
binType
=
LOG_BIN
;
}
else
{
return
invalidOperationMsg
(
tscGetErrorMsgPayload
(
pCmd
),
msg20
);
}
//bin_description param in JSON format
if
(
pParamElem
[
2
].
pNode
->
tokenId
==
TK_ID
)
{
return
invalidOperationMsg
(
tscGetErrorMsgPayload
(
pCmd
),
msg2
);
}
pVariant
=
&
pParamElem
[
2
].
pNode
->
value
;
if
(
pVariant
==
NULL
&&
pVariant
->
nType
!=
TSDB_DATA_TYPE_BINARY
)
{
return
invalidOperationMsg
(
tscGetErrorMsgPayload
(
pCmd
),
msg2
);
}
cJSON
*
binDesc
=
cJSON_Parse
(
pVariant
->
pz
);
int32_t
counter
;
int32_t
numBins
;
int32_t
numOutput
;
double
*
intervals
;
if
(
cJSON_IsObject
(
binDesc
))
{
/* linaer/log bins */
int32_t
numOfParams
=
cJSON_GetArraySize
(
binDesc
);
int32_t
startIndex
;
if
(
numOfParams
!=
4
)
{
return
invalidOperationMsg
(
tscGetErrorMsgPayload
(
pCmd
),
msg22
);
}
cJSON
*
start
=
cJSON_GetObjectItem
(
binDesc
,
"start"
);
cJSON
*
factor
=
cJSON_GetObjectItem
(
binDesc
,
"factor"
);
cJSON
*
width
=
cJSON_GetObjectItem
(
binDesc
,
"width"
);
cJSON
*
count
=
cJSON_GetObjectItem
(
binDesc
,
"count"
);
cJSON
*
infinity
=
cJSON_GetObjectItem
(
binDesc
,
"infinity"
);
if
(
!
cJSON_IsNumber
(
start
)
||
!
cJSON_IsNumber
(
count
)
||
!
cJSON_IsBool
(
infinity
))
{
return
invalidOperationMsg
(
tscGetErrorMsgPayload
(
pCmd
),
msg22
);
}
if
(
count
->
valueint
<=
0
||
count
->
valueint
>
1000
)
{
return
invalidOperationMsg
(
tscGetErrorMsgPayload
(
pCmd
),
msg25
);
}
if
(
isinf
(
start
->
valuedouble
)
||
(
width
!=
NULL
&&
isinf
(
width
->
valuedouble
))
||
(
factor
!=
NULL
&&
isinf
(
factor
->
valuedouble
))
||
(
count
!=
NULL
&&
isinf
(
count
->
valuedouble
)))
{
return
invalidOperationMsg
(
tscGetErrorMsgPayload
(
pCmd
),
msg23
);
}
counter
=
(
int32_t
)
count
->
valueint
;
if
(
infinity
->
valueint
==
false
)
{
startIndex
=
0
;
numBins
=
counter
+
1
;
}
else
{
startIndex
=
1
;
numBins
=
counter
+
3
;
}
intervals
=
tcalloc
(
numBins
,
sizeof
(
double
));
if
(
cJSON_IsNumber
(
width
)
&&
factor
==
NULL
&&
binType
==
LINEAR_BIN
)
{
//linear bin process
if
(
width
->
valuedouble
==
0
)
{
tfree
(
intervals
);
return
invalidOperationMsg
(
tscGetErrorMsgPayload
(
pCmd
),
msg24
);
}
for
(
int
i
=
0
;
i
<
counter
+
1
;
++
i
)
{
intervals
[
startIndex
]
=
start
->
valuedouble
+
i
*
width
->
valuedouble
;
if
(
isinf
(
intervals
[
startIndex
]))
{
tfree
(
intervals
);
return
invalidOperationMsg
(
tscGetErrorMsgPayload
(
pCmd
),
msg23
);
}
startIndex
++
;
}
}
else
if
(
cJSON_IsNumber
(
factor
)
&&
width
==
NULL
&&
binType
==
LOG_BIN
)
{
//log bin process
if
(
start
->
valuedouble
==
0
)
{
tfree
(
intervals
);
return
invalidOperationMsg
(
tscGetErrorMsgPayload
(
pCmd
),
msg26
);
}
if
(
factor
->
valuedouble
<
0
||
factor
->
valuedouble
==
0
||
factor
->
valuedouble
==
1
)
{
tfree
(
intervals
);
return
invalidOperationMsg
(
tscGetErrorMsgPayload
(
pCmd
),
msg27
);
}
for
(
int
i
=
0
;
i
<
counter
+
1
;
++
i
)
{
intervals
[
startIndex
]
=
start
->
valuedouble
*
pow
(
factor
->
valuedouble
,
i
*
1
.
0
);
if
(
isinf
(
intervals
[
startIndex
]))
{
tfree
(
intervals
);
return
invalidOperationMsg
(
tscGetErrorMsgPayload
(
pCmd
),
msg23
);
}
startIndex
++
;
}
}
else
{
tfree
(
intervals
);
return
invalidOperationMsg
(
tscGetErrorMsgPayload
(
pCmd
),
msg22
);
}
if
(
infinity
->
valueint
==
true
)
{
intervals
[
0
]
=
-
DBL_MAX
;
intervals
[
numBins
-
1
]
=
DBL_MAX
;
if
(
isinf
(
intervals
[
0
])
||
isinf
(
intervals
[
numBins
-
1
]))
{
tfree
(
intervals
);
return
invalidOperationMsg
(
tscGetErrorMsgPayload
(
pCmd
),
msg23
);
}
//in case of desc bin orders, -inf/inf should be swapped
assert
(
numBins
>=
4
);
if
(
intervals
[
1
]
>
intervals
[
numBins
-
2
])
{
SWAP
(
intervals
[
0
],
intervals
[
numBins
-
1
],
double
);
}
}
}
else
if
(
cJSON_IsArray
(
binDesc
))
{
/* user input bins */
if
(
binType
!=
USER_INPUT_BIN
)
{
return
invalidOperationMsg
(
tscGetErrorMsgPayload
(
pCmd
),
msg22
);
}
counter
=
numBins
=
cJSON_GetArraySize
(
binDesc
);
intervals
=
tcalloc
(
numBins
,
sizeof
(
double
));
cJSON
*
bin
=
binDesc
->
child
;
if
(
bin
==
NULL
)
{
tfree
(
intervals
);
return
invalidOperationMsg
(
tscGetErrorMsgPayload
(
pCmd
),
msg22
);
}
int
i
=
0
;
while
(
bin
)
{
intervals
[
i
]
=
bin
->
valuedouble
;
if
(
!
cJSON_IsNumber
(
bin
))
{
tfree
(
intervals
);
return
invalidOperationMsg
(
tscGetErrorMsgPayload
(
pCmd
),
msg22
);
}
if
(
i
!=
0
&&
intervals
[
i
]
<=
intervals
[
i
-
1
])
{
tfree
(
intervals
);
return
invalidOperationMsg
(
tscGetErrorMsgPayload
(
pCmd
),
msg22
);
}
bin
=
bin
->
next
;
i
++
;
}
}
else
{
return
invalidOperationMsg
(
tscGetErrorMsgPayload
(
pCmd
),
msg21
);
}
int16_t
resultType
=
pSchema
->
type
;
int32_t
resultSize
=
pSchema
->
bytes
;
int32_t
interResult
=
0
;
getResultDataInfo
(
pSchema
->
type
,
pSchema
->
bytes
,
functionId
,
counter
,
&
resultType
,
&
resultSize
,
&
interResult
,
0
,
false
,
pUdfInfo
);
SExprInfo
*
pExpr
=
NULL
;
pExpr
=
tscExprAppend
(
pQueryInfo
,
functionId
,
&
index
,
resultType
,
resultSize
,
getNewResColId
(
pCmd
),
interResult
,
false
);
numOutput
=
numBins
-
1
;
tscExprAddParams
(
&
pExpr
->
base
,
(
char
*
)
&
numOutput
,
TSDB_DATA_TYPE_INT
,
sizeof
(
int32_t
));
tscExprAddParams
(
&
pExpr
->
base
,
(
char
*
)
intervals
,
TSDB_DATA_TYPE_BINARY
,
sizeof
(
double
)
*
numBins
);
tfree
(
intervals
);
//normalized param
char
val
[
8
]
=
{
0
};
if
(
pParamElem
[
3
].
pNode
->
tokenId
==
TK_ID
)
{
return
invalidOperationMsg
(
tscGetErrorMsgPayload
(
pCmd
),
msg2
);
}
pVariant
=
&
pParamElem
[
3
].
pNode
->
value
;
if
(
pVariant
==
NULL
||
pVariant
->
nType
!=
TSDB_DATA_TYPE_BIGINT
||
(
pVariant
->
i64
!=
0
&&
pVariant
->
i64
!=
1
))
{
return
invalidOperationMsg
(
tscGetErrorMsgPayload
(
pCmd
),
msg2
);
}
if
(
tVariantDump
(
pVariant
,
val
,
TSDB_DATA_TYPE_BIGINT
,
true
)
<
0
)
{
return
TSDB_CODE_TSC_INVALID_OPERATION
;
}
tscExprAddParams
(
&
pExpr
->
base
,
val
,
TSDB_DATA_TYPE_BIGINT
,
LONG_BYTES
);
memset
(
pExpr
->
base
.
aliasName
,
0
,
tListLen
(
pExpr
->
base
.
aliasName
));
getColumnName
(
pItem
,
pExpr
->
base
.
aliasName
,
pExpr
->
base
.
token
,
sizeof
(
pExpr
->
base
.
aliasName
)
-
1
);
// todo refactor: tscColumnListInsert part
SColumnList
ids
=
createColumnList
(
1
,
index
.
tableIndex
,
index
.
columnIndex
);
if
(
finalResult
)
{
insertResultField
(
pQueryInfo
,
colIndex
,
&
ids
,
resultSize
,
(
int8_t
)
resultType
,
pExpr
->
base
.
aliasName
,
pExpr
);
}
else
{
assert
(
ids
.
num
==
1
);
tscColumnListInsert
(
pQueryInfo
->
colList
,
ids
.
ids
[
0
].
columnIndex
,
pExpr
->
base
.
uid
,
pSchema
);
}
tscInsertPrimaryTsSourceColumn
(
pQueryInfo
,
pExpr
->
base
.
uid
);
return
TSDB_CODE_SUCCESS
;
}
default:
{
assert
(
!
TSDB_FUNC_IS_SCALAR
(
functionId
));
pUdfInfo
=
isValidUdf
(
pQueryInfo
->
pUdfInfo
,
pItem
->
pNode
->
Expr
.
operand
.
z
,
pItem
->
pNode
->
Expr
.
operand
.
n
);
...
...
@@ -3358,9 +3598,9 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
}
if
(
pItem
->
pNode
->
Expr
.
paramList
==
NULL
||
taosArrayGetSize
(
pItem
->
pNode
->
Expr
.
paramList
)
<=
0
)
{
return
invalidOperationMsg
(
tscGetErrorMsgPayload
(
pCmd
),
msg13
);
return
invalidOperationMsg
(
tscGetErrorMsgPayload
(
pCmd
),
msg13
);
}
tSqlExprItem
*
pParamElem
=
taosArrayGet
(
pItem
->
pNode
->
Expr
.
paramList
,
0
);;
if
(
pParamElem
->
pNode
->
tokenId
!=
TK_ID
)
{
return
invalidOperationMsg
(
tscGetErrorMsgPayload
(
pCmd
),
msg2
);
...
...
@@ -3722,7 +3962,8 @@ int32_t tscTansformFuncForSTableQuery(SQueryInfo* pQueryInfo) {
if
((
functionId
>=
TSDB_FUNC_SUM
&&
functionId
<=
TSDB_FUNC_TWA
)
||
(
functionId
>=
TSDB_FUNC_FIRST_DST
&&
functionId
<=
TSDB_FUNC_STDDEV_DST
)
||
(
functionId
>=
TSDB_FUNC_RATE
&&
functionId
<=
TSDB_FUNC_IRATE
)
||
(
functionId
==
TSDB_FUNC_SAMPLE
)
||
(
functionId
==
TSDB_FUNC_ELAPSED
))
{
(
functionId
==
TSDB_FUNC_SAMPLE
)
||
(
functionId
==
TSDB_FUNC_ELAPSED
)
||
(
functionId
==
TSDB_FUNC_HISTOGRAM
))
{
if
(
getResultDataInfo
(
pSrcSchema
->
type
,
pSrcSchema
->
bytes
,
functionId
,
(
int32_t
)
pExpr
->
base
.
param
[
0
].
i64
,
&
type
,
&
bytes
,
&
interBytes
,
0
,
true
,
NULL
)
!=
TSDB_CODE_SUCCESS
)
{
return
TSDB_CODE_TSC_INVALID_OPERATION
;
...
...
@@ -6198,7 +6439,7 @@ int32_t validateFillNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SSqlNode* pSqlNo
const
char
*
msg1
=
"value is expected"
;
const
char
*
msg2
=
"invalid fill option"
;
const
char
*
msg3
=
"top/bottom/sample not support fill"
;
const
char
*
msg3
=
"top/bottom/sample
/histogram
not support fill"
;
const
char
*
msg4
=
"illegal value or data overflow"
;
const
char
*
msg5
=
"fill only available for interval query"
;
const
char
*
msg7
=
"join query not supported fill operation"
;
...
...
@@ -6308,7 +6549,7 @@ int32_t validateFillNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SSqlNode* pSqlNo
for
(
int32_t
i
=
0
;
i
<
numOfExprs
;
++
i
)
{
SExprInfo
*
pExpr
=
tscExprGet
(
pQueryInfo
,
i
);
if
(
pExpr
->
base
.
functionId
==
TSDB_FUNC_TOP
||
pExpr
->
base
.
functionId
==
TSDB_FUNC_BOTTOM
||
pExpr
->
base
.
functionId
==
TSDB_FUNC_SAMPLE
)
{
||
pExpr
->
base
.
functionId
==
TSDB_FUNC_SAMPLE
||
pExpr
->
base
.
functionId
==
TSDB_FUNC_HISTOGRAM
)
{
return
invalidOperationMsg
(
tscGetErrorMsgPayload
(
pCmd
),
msg3
);
}
}
...
...
src/client/src/tscUtil.c
浏览文件 @
fd83970a
...
...
@@ -688,7 +688,8 @@ bool isSimpleAggregateRv(SQueryInfo* pQueryInfo) {
if
((
!
IS_MULTIOUTPUT
(
aAggs
[
functionId
].
status
))
||
(
functionId
==
TSDB_FUNC_TOP
||
functionId
==
TSDB_FUNC_BOTTOM
||
functionId
==
TSDB_FUNC_TS_COMP
||
functionId
==
TSDB_FUNC_SAMPLE
))
{
functionId
==
TSDB_FUNC_SAMPLE
||
functionId
==
TSDB_FUNC_HISTOGRAM
))
{
return
true
;
}
}
...
...
src/query/inc/qAggMain.h
浏览文件 @
fd83970a
...
...
@@ -76,14 +76,10 @@ extern "C" {
#define TSDB_FUNC_BLKINFO 36
#define TSDB_FUNC_ELAPSED 37
///////////////////////////////////////////
// the following functions is not implemented.
// after implementation, move them before TSDB_FUNC_BLKINFO. also make TSDB_FUNC_BLKINFO the maxium function index
// #define TSDB_FUNC_HISTOGRAM 40
// #define TSDB_FUNC_HLL 41
// #define TSDB_FUNC_MODE 42
#define TSDB_FUNC_ELAPSED 37
#define TSDB_FUNC_HISTOGRAM 38
#define TSDB_FUNC_MAX_NUM 39
#define TSDB_FUNCSTATE_SO 0x1u // single output
#define TSDB_FUNCSTATE_MO 0x2u // dynamic number of output, not multinumber of output e.g., TOP/BOTTOM
...
...
src/query/src/qAggMain.c
浏览文件 @
fd83970a
...
...
@@ -211,6 +211,18 @@ typedef struct {
};
}
SDiffFuncInfo
;
typedef
struct
{
double
lower
;
// >lower
double
upper
;
// <=upper
double
count
;
}
SHistogramFuncBin
;
typedef
struct
{
int32_t
numOfBins
;
int32_t
normalized
;
SHistogramFuncBin
*
orderedBins
;
}
SHistogramFuncInfo
;
int32_t
getResultDataInfo
(
int32_t
dataType
,
int32_t
dataBytes
,
int32_t
functionId
,
int32_t
param
,
int16_t
*
type
,
int32_t
*
bytes
,
int32_t
*
interBytes
,
int16_t
extLength
,
bool
isSuperTable
,
SUdfInfo
*
pUdfInfo
)
{
if
(
!
isValidDataType
(
dataType
))
{
...
...
@@ -378,6 +390,11 @@ int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionI
*
bytes
=
sizeof
(
SElapsedInfo
);
*
interBytes
=
*
bytes
;
return
TSDB_CODE_SUCCESS
;
}
else
if
(
functionId
==
TSDB_FUNC_HISTOGRAM
)
{
*
type
=
TSDB_DATA_TYPE_BINARY
;
*
bytes
=
512
;
*
interBytes
=
(
sizeof
(
SHistogramFuncInfo
)
+
param
*
sizeof
(
SHistogramFuncBin
));
return
TSDB_CODE_SUCCESS
;
}
}
...
...
@@ -482,6 +499,10 @@ int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionI
*
type
=
TSDB_DATA_TYPE_DOUBLE
;
*
bytes
=
tDataTypes
[
*
type
].
bytes
;
*
interBytes
=
sizeof
(
SElapsedInfo
);
}
else
if
(
functionId
==
TSDB_FUNC_HISTOGRAM
)
{
*
type
=
TSDB_DATA_TYPE_BINARY
;
*
bytes
=
512
;
*
interBytes
=
(
sizeof
(
SHistogramFuncInfo
)
+
param
*
sizeof
(
SHistogramFuncBin
));
}
else
{
return
TSDB_CODE_TSC_INVALID_OPERATION
;
}
...
...
@@ -503,7 +524,7 @@ int32_t isValidFunction(const char* name, int32_t len) {
}
}
for
(
int32_t
i
=
0
;
i
<
=
TSDB_FUNC_ELAPSED
;
++
i
)
{
for
(
int32_t
i
=
0
;
i
<
TSDB_FUNC_MAX_NUM
;
++
i
)
{
int32_t
nameLen
=
(
int32_t
)
strlen
(
aAggs
[
i
].
name
);
if
(
len
!=
nameLen
)
{
continue
;
...
...
@@ -4830,6 +4851,9 @@ static void sample_func_finalizer(SQLFunctionCtx *pCtx) {
doFinalizer
(
pCtx
);
}
//////////////////////////////////////////////////////////////////////////////////
// elapsed function
static
SElapsedInfo
*
getSElapsedInfo
(
SQLFunctionCtx
*
pCtx
)
{
if
(
pCtx
->
stableQuery
&&
pCtx
->
currentStage
!=
MERGE_STAGE
)
{
return
(
SElapsedInfo
*
)
pCtx
->
pOutput
;
...
...
@@ -4945,6 +4969,134 @@ static void elapsedFinalizer(SQLFunctionCtx *pCtx) {
doFinalizer
(
pCtx
);
}
//////////////////////////////////////////////////////////////////////////////////
// histogram function
static
SHistogramFuncInfo
*
getHistogramFuncOutputInfo
(
SQLFunctionCtx
*
pCtx
)
{
SResultRowCellInfo
*
pResInfo
=
GET_RES_INFO
(
pCtx
);
// only the first_stage stable is directly written data into final output buffer
if
(
pCtx
->
stableQuery
&&
pCtx
->
currentStage
!=
MERGE_STAGE
)
{
return
(
SHistogramFuncInfo
*
)
pCtx
->
pOutput
;
}
else
{
// during normal table query and super table at the secondary_stage, result is written to intermediate buffer
return
GET_ROWCELL_INTERBUF
(
pResInfo
);
}
}
static
bool
histogram_function_setup
(
SQLFunctionCtx
*
pCtx
,
SResultRowCellInfo
*
pResInfo
)
{
if
(
!
function_setup
(
pCtx
,
pResInfo
))
{
return
false
;
}
SHistogramFuncInfo
*
pRes
=
getHistogramFuncOutputInfo
(
pCtx
);
if
(
!
pRes
)
{
return
false
;
}
int32_t
numOfBins
=
(
int32_t
)
pCtx
->
param
[
0
].
i64
;
double
*
listBin
=
(
double
*
)
pCtx
->
param
[
1
].
pz
;
int32_t
normalized
=
(
int32_t
)
pCtx
->
param
[
2
].
i64
;
pRes
->
numOfBins
=
numOfBins
;
pRes
->
normalized
=
normalized
;
pRes
->
orderedBins
=
(
SHistogramFuncBin
*
)((
char
*
)
pRes
+
sizeof
(
SHistogramFuncInfo
));
for
(
int32_t
i
=
0
;
i
<
numOfBins
;
++
i
)
{
double
lower
=
listBin
[
i
]
<
listBin
[
i
+
1
]
?
listBin
[
i
]
:
listBin
[
i
+
1
];
double
upper
=
listBin
[
i
+
1
]
>
listBin
[
i
]
?
listBin
[
i
+
1
]
:
listBin
[
i
];
pRes
->
orderedBins
[
i
].
lower
=
lower
;
pRes
->
orderedBins
[
i
].
upper
=
upper
;
pRes
->
orderedBins
[
i
].
count
=
0
;
}
return
true
;
}
static
void
histogram_function
(
SQLFunctionCtx
*
pCtx
)
{
SResultRowCellInfo
*
pResInfo
=
GET_RES_INFO
(
pCtx
);
SHistogramFuncInfo
*
pRes
=
getHistogramFuncOutputInfo
(
pCtx
);
if
(
pRes
->
orderedBins
!=
(
SHistogramFuncBin
*
)((
char
*
)
pRes
+
sizeof
(
SHistogramFuncInfo
)))
{
pRes
->
orderedBins
=
(
SHistogramFuncBin
*
)((
char
*
)
pRes
+
sizeof
(
SHistogramFuncInfo
));
}
int32_t
notNullElems
=
0
;
int32_t
totalElems
=
0
;
for
(
int32_t
i
=
0
;
i
<
pCtx
->
size
;
++
i
)
{
char
*
data
=
GET_INPUT_DATA
(
pCtx
,
i
);
if
(
pCtx
->
hasNull
&&
isNull
(
data
,
pCtx
->
inputType
))
{
continue
;
}
notNullElems
++
;
double
v
;
GET_TYPED_DATA
(
v
,
double
,
pCtx
->
inputType
,
data
);
for
(
int32_t
b
=
0
;
b
<
pRes
->
numOfBins
;
++
b
)
{
if
(
v
>
pRes
->
orderedBins
[
b
].
lower
&&
v
<=
pRes
->
orderedBins
[
b
].
upper
)
{
pRes
->
orderedBins
[
b
].
count
++
;
totalElems
++
;
break
;
}
}
}
if
(
pRes
->
normalized
)
{
for
(
int32_t
b
=
0
;
b
<
pRes
->
numOfBins
;
++
b
)
{
if
(
totalElems
!=
0
)
{
pRes
->
orderedBins
[
b
].
count
=
pRes
->
orderedBins
[
b
].
count
/
(
double
)
totalElems
;
}
else
{
pRes
->
orderedBins
[
b
].
count
=
0
;
}
}
}
// treat the result as only one result
SET_VAL
(
pCtx
,
notNullElems
,
1
);
if
(
notNullElems
>
0
)
{
pResInfo
->
hasResult
=
DATA_SET_FLAG
;
}
}
static
void
histogram_func_merge
(
SQLFunctionCtx
*
pCtx
)
{
SHistogramFuncInfo
*
pInput
=
(
SHistogramFuncInfo
*
)
GET_INPUT_DATA_LIST
(
pCtx
);
pInput
->
orderedBins
=
(
SHistogramFuncBin
*
)((
char
*
)
pInput
+
sizeof
(
SHistogramFuncInfo
));
SHistogramFuncInfo
*
pRes
=
getHistogramFuncOutputInfo
(
pCtx
);
for
(
int32_t
i
=
0
;
i
<
pInput
->
numOfBins
;
++
i
)
{
pRes
->
orderedBins
[
i
].
count
+=
pInput
->
orderedBins
[
i
].
count
;
}
SResultRowCellInfo
*
pResInfo
=
GET_RES_INFO
(
pCtx
);
pResInfo
->
numOfRes
=
1
;
pResInfo
->
hasResult
=
DATA_SET_FLAG
;
}
static
void
histogram_func_finalizer
(
SQLFunctionCtx
*
pCtx
)
{
SResultRowCellInfo
*
pResInfo
=
GET_RES_INFO
(
pCtx
);
SHistogramFuncInfo
*
pRes
=
GET_ROWCELL_INTERBUF
(
pResInfo
);
if
(
!
pRes
)
{
return
;
}
for
(
int32_t
i
=
0
;
i
<
pRes
->
numOfBins
;
++
i
)
{
int
sz
;
if
(
!
pRes
->
normalized
)
{
int64_t
count
=
(
int64_t
)
pRes
->
orderedBins
[
i
].
count
;
sz
=
sprintf
(
pCtx
->
pOutput
+
VARSTR_HEADER_SIZE
,
"(%g:%g]:%"
PRId64
,
pRes
->
orderedBins
[
i
].
lower
,
pRes
->
orderedBins
[
i
].
upper
,
count
);
}
else
{
sz
=
sprintf
(
pCtx
->
pOutput
+
VARSTR_HEADER_SIZE
,
"(%g:%g]:%lf"
,
pRes
->
orderedBins
[
i
].
lower
,
pRes
->
orderedBins
[
i
].
upper
,
pRes
->
orderedBins
[
i
].
count
);
}
varDataSetLen
(
pCtx
->
pOutput
,
sz
);
pCtx
->
pOutput
+=
pCtx
->
outputBytes
;
}
pResInfo
->
numOfRes
=
pRes
->
numOfBins
;
pResInfo
->
hasResult
=
DATA_SET_FLAG
;
doFinalizer
(
pCtx
);
}
/////////////////////////////////////////////////////////////////////////////////////////////
/*
* function compatible list.
...
...
@@ -4965,8 +5117,8 @@ int32_t functionCompatList[] = {
1
,
1
,
1
,
1
,
-
1
,
1
,
1
,
1
,
5
,
1
,
1
,
// tid_tag, deriv, csum, mavg, sample,
6
,
8
,
-
1
,
-
1
,
-
1
,
// block_info,
elapsed
7
,
1
// block_info,
elapsed,histogram
7
,
1
,
-
1
};
SAggFunctionInfo
aAggs
[
40
]
=
{{
...
...
@@ -5427,5 +5579,17 @@ SAggFunctionInfo aAggs[40] = {{
elapsedFinalizer
,
elapsedMerge
,
elapsedRequired
,
},
{
//38
"histogram"
,
TSDB_FUNC_HISTOGRAM
,
TSDB_FUNC_HISTOGRAM
,
TSDB_FUNCSTATE_MO
|
TSDB_FUNCSTATE_STABLE
,
histogram_function_setup
,
histogram_function
,
histogram_func_finalizer
,
histogram_func_merge
,
dataBlockRequired
,
}
};
src/query/src/qUtil.c
浏览文件 @
fd83970a
...
...
@@ -37,7 +37,8 @@ int32_t getRowNumForMultioutput(SQueryAttr* pQueryAttr, bool topBottomQuery, boo
for
(
int16_t
i
=
0
;
i
<
pQueryAttr
->
numOfOutput
;
++
i
)
{
if
(
pQueryAttr
->
pExpr1
[
i
].
base
.
functionId
==
TSDB_FUNC_TOP
||
pQueryAttr
->
pExpr1
[
i
].
base
.
functionId
==
TSDB_FUNC_BOTTOM
||
pQueryAttr
->
pExpr1
[
i
].
base
.
functionId
==
TSDB_FUNC_SAMPLE
)
{
pQueryAttr
->
pExpr1
[
i
].
base
.
functionId
==
TSDB_FUNC_SAMPLE
||
pQueryAttr
->
pExpr1
[
i
].
base
.
functionId
==
TSDB_FUNC_HISTOGRAM
)
{
return
(
int32_t
)
pQueryAttr
->
pExpr1
[
i
].
base
.
param
[
0
].
i64
;
}
}
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录