Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
46a5ede3
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
46a5ede3
编写于
10月 18, 2022
作者:
G
Ganlin Zhao
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
fix format
上级
30fbd884
变更
1
隐藏空白更改
内联
并排
Showing
1 changed file
with
212 addition
and
212 deletion
+212
-212
src/query/src/qAggMain.c
src/query/src/qAggMain.c
+212
-212
未找到文件。
src/query/src/qAggMain.c
浏览文件 @
46a5ede3
...
...
@@ -1953,7 +1953,7 @@ static void stddev_function(SQLFunctionCtx *pCtx) {
pStd
->
avg
=
GET_DOUBLE_VAL
(
pCtx
->
pOutput
);
assert
((
isnan
(
pAvg
->
sum
)
&&
pAvg
->
num
==
0
)
||
(
pStd
->
num
==
pAvg
->
num
&&
pStd
->
avg
==
pAvg
->
sum
));
}
if
(
pStd
->
stage
==
0
)
{
// the first stage is to calculate average value
avg_function
(
pCtx
);
...
...
@@ -1962,7 +1962,7 @@ static void stddev_function(SQLFunctionCtx *pCtx) {
// if pStd->num == 0, there are no numbers in the first round check. No need to do the second round
double
*
retVal
=
&
pStd
->
res
;
double
avg
=
pStd
->
avg
;
void
*
pData
=
GET_INPUT_DATA_LIST
(
pCtx
);
int32_t
num
=
0
;
...
...
@@ -2016,14 +2016,14 @@ static void stddev_function(SQLFunctionCtx *pCtx) {
default:
qError
(
"stddev function not support data type:%d"
,
pCtx
->
inputType
);
}
SET_VAL
(
pCtx
,
1
,
1
);
}
}
static
void
stddev_finalizer
(
SQLFunctionCtx
*
pCtx
)
{
SStddevInfo
*
pStd
=
GET_ROWCELL_INTERBUF
(
GET_RES_INFO
(
pCtx
));
if
(
pStd
->
num
<=
0
)
{
setNull
(
pCtx
->
pOutput
,
pCtx
->
outputType
,
pCtx
->
outputBytes
);
}
else
{
...
...
@@ -2031,7 +2031,7 @@ static void stddev_finalizer(SQLFunctionCtx *pCtx) {
SET_DOUBLE_VAL
(
retValue
,
sqrt
(
pStd
->
res
/
pStd
->
num
));
SET_VAL
(
pCtx
,
1
,
1
);
}
doFinalizer
(
pCtx
);
}
...
...
@@ -2184,11 +2184,11 @@ static bool first_last_function_setup(SQLFunctionCtx *pCtx, SResultRowCellInfo*
if
(
!
function_setup
(
pCtx
,
pResInfo
))
{
return
false
;
}
// used to keep the timestamp for comparison
pCtx
->
param
[
1
].
nType
=
0
;
pCtx
->
param
[
1
].
i64
=
0
;
return
true
;
}
...
...
@@ -2217,7 +2217,7 @@ static void first_function(SQLFunctionCtx *pCtx) {
if
(
pCtx
->
hasNull
&&
isNull
(
data
,
pCtx
->
inputType
))
{
continue
;
}
memcpy
(
pCtx
->
pOutput
,
data
,
pCtx
->
inputBytes
);
if
(
pCtx
->
ptsList
!=
NULL
)
{
TSKEY
k
=
GET_TS_DATA
(
pCtx
,
i
);
...
...
@@ -2227,7 +2227,7 @@ static void first_function(SQLFunctionCtx *pCtx) {
SResultRowCellInfo
*
pInfo
=
GET_RES_INFO
(
pCtx
);
pInfo
->
hasResult
=
DATA_SET_FLAG
;
pInfo
->
complete
=
true
;
notNullElems
++
;
break
;
}
...
...
@@ -2258,14 +2258,14 @@ static void first_function(SQLFunctionCtx *pCtx) {
static
void
first_data_assign_impl
(
SQLFunctionCtx
*
pCtx
,
char
*
pData
,
int32_t
idx
)
{
int64_t
*
timestamp
=
GET_TS_LIST
(
pCtx
);
SFirstLastInfo
*
pInfo
=
(
SFirstLastInfo
*
)(
pCtx
->
pOutput
+
pCtx
->
inputBytes
);
if
(
pInfo
->
hasResult
!=
DATA_SET_FLAG
||
timestamp
[
idx
]
<
pInfo
->
ts
)
{
memcpy
(
pCtx
->
pOutput
,
pData
,
pCtx
->
inputBytes
);
pInfo
->
hasResult
=
DATA_SET_FLAG
;
pInfo
->
ts
=
timestamp
[
idx
];
DO_UPDATE_TAG_COLUMNS
(
pCtx
,
pInfo
->
ts
);
}
}
...
...
@@ -2283,7 +2283,7 @@ static void first_dist_function(SQLFunctionCtx *pCtx) {
if
(
pCtx
->
order
==
TSDB_ORDER_DESC
/* || pCtx->preAggVals.dataBlockLoaded == false*/
)
{
return
;
}
int32_t
notNullElems
=
0
;
// find the first not null value
...
...
@@ -2292,16 +2292,16 @@ static void first_dist_function(SQLFunctionCtx *pCtx) {
if
(
pCtx
->
hasNull
&&
isNull
(
data
,
pCtx
->
inputType
))
{
continue
;
}
first_data_assign_impl
(
pCtx
,
data
,
i
);
SResultRowCellInfo
*
pResInfo
=
GET_RES_INFO
(
pCtx
);
pResInfo
->
hasResult
=
DATA_SET_FLAG
;
notNullElems
++
;
break
;
}
SET_VAL
(
pCtx
,
notNullElems
,
1
);
}
...
...
@@ -2313,16 +2313,16 @@ static void first_dist_func_merge(SQLFunctionCtx *pCtx) {
if
(
pInput
->
hasResult
!=
DATA_SET_FLAG
)
{
return
;
}
// The param[1] is used to keep the initial value of max ts value
if
(
pCtx
->
param
[
1
].
nType
!=
pCtx
->
outputType
||
pCtx
->
param
[
1
].
i64
>
pInput
->
ts
)
{
memcpy
(
pCtx
->
pOutput
,
pData
,
pCtx
->
outputBytes
);
pCtx
->
param
[
1
].
i64
=
pInput
->
ts
;
pCtx
->
param
[
1
].
nType
=
pCtx
->
outputType
;
DO_UPDATE_TAG_COLUMNS
(
pCtx
,
pInput
->
ts
);
}
SET_VAL
(
pCtx
,
1
,
1
);
GET_RES_INFO
(
pCtx
)
->
hasResult
=
DATA_SET_FLAG
;
}
...
...
@@ -2396,18 +2396,18 @@ static void last_function(SQLFunctionCtx *pCtx) {
static
void
last_data_assign_impl
(
SQLFunctionCtx
*
pCtx
,
char
*
pData
,
int32_t
idx
)
{
int64_t
*
timestamp
=
GET_TS_LIST
(
pCtx
);
SFirstLastInfo
*
pInfo
=
(
SFirstLastInfo
*
)(
pCtx
->
pOutput
+
pCtx
->
inputBytes
);
if
(
pInfo
->
hasResult
!=
DATA_SET_FLAG
||
pInfo
->
ts
<
timestamp
[
idx
])
{
#if defined(_DEBUG_VIEW)
qDebug
(
"assign index:%d, ts:%"
PRId64
", val:%d, "
,
idx
,
timestamp
[
idx
],
*
(
int32_t
*
)
pData
);
#endif
memcpy
(
pCtx
->
pOutput
,
pData
,
pCtx
->
inputBytes
);
pInfo
->
hasResult
=
DATA_SET_FLAG
;
pInfo
->
ts
=
timestamp
[
idx
];
DO_UPDATE_TAG_COLUMNS
(
pCtx
,
pInfo
->
ts
);
}
}
...
...
@@ -2426,19 +2426,19 @@ static void last_dist_function(SQLFunctionCtx *pCtx) {
char
*
data
=
GET_INPUT_DATA
(
pCtx
,
i
);
if
(
pCtx
->
hasNull
&&
isNull
(
data
,
pCtx
->
inputType
))
{
if
(
!
pCtx
->
requireNull
)
{
continue
;
continue
;
}
}
last_data_assign_impl
(
pCtx
,
data
,
i
);
SResultRowCellInfo
*
pResInfo
=
GET_RES_INFO
(
pCtx
);
pResInfo
->
hasResult
=
DATA_SET_FLAG
;
notNullElems
++
;
break
;
}
SET_VAL
(
pCtx
,
notNullElems
,
1
);
}
...
...
@@ -2449,12 +2449,12 @@ static void last_dist_function(SQLFunctionCtx *pCtx) {
*/
static
void
last_dist_func_merge
(
SQLFunctionCtx
*
pCtx
)
{
char
*
pData
=
GET_INPUT_DATA_LIST
(
pCtx
);
SFirstLastInfo
*
pInput
=
(
SFirstLastInfo
*
)
(
pData
+
pCtx
->
outputBytes
);
if
(
pInput
->
hasResult
!=
DATA_SET_FLAG
)
{
return
;
}
/*
* param[1] used to keep the corresponding timestamp to decide if current result is
* the true last result
...
...
@@ -2463,10 +2463,10 @@ static void last_dist_func_merge(SQLFunctionCtx *pCtx) {
memcpy
(
pCtx
->
pOutput
,
pData
,
pCtx
->
outputBytes
);
pCtx
->
param
[
1
].
i64
=
pInput
->
ts
;
pCtx
->
param
[
1
].
nType
=
pCtx
->
outputType
;
DO_UPDATE_TAG_COLUMNS
(
pCtx
,
pInput
->
ts
);
}
SET_VAL
(
pCtx
,
1
,
1
);
GET_RES_INFO
(
pCtx
)
->
hasResult
=
DATA_SET_FLAG
;
}
...
...
@@ -2481,16 +2481,16 @@ static void last_row_function(SQLFunctionCtx *pCtx) {
// assign the last element in current data block
assignVal
(
pCtx
->
pOutput
,
pData
+
(
pCtx
->
size
-
1
)
*
pCtx
->
inputBytes
,
pCtx
->
inputBytes
,
pCtx
->
inputType
);
SResultRowCellInfo
*
pResInfo
=
GET_RES_INFO
(
pCtx
);
pResInfo
->
hasResult
=
DATA_SET_FLAG
;
// set the result to final result buffer in case of super table query
if
(
pCtx
->
stableQuery
)
{
SLastrowInfo
*
pInfo1
=
(
SLastrowInfo
*
)(
pCtx
->
pOutput
+
pCtx
->
inputBytes
);
pInfo1
->
ts
=
GET_TS_DATA
(
pCtx
,
pCtx
->
size
-
1
);
pInfo1
->
hasResult
=
DATA_SET_FLAG
;
DO_UPDATE_TAG_COLUMNS
(
pCtx
,
pInfo1
->
ts
);
}
else
{
TSKEY
ts
=
GET_TS_DATA
(
pCtx
,
pCtx
->
size
-
1
);
...
...
@@ -2507,7 +2507,7 @@ static void last_row_finalizer(SQLFunctionCtx *pCtx) {
setNull
(
pCtx
->
pOutput
,
pCtx
->
outputType
,
pCtx
->
outputBytes
);
return
;
}
GET_RES_INFO
(
pCtx
)
->
numOfRes
=
1
;
doFinalizer
(
pCtx
);
}
...
...
@@ -2518,7 +2518,7 @@ static void valuePairAssign(tValuePair *dst, int16_t type, const char *val, int6
dst
->
v
.
nType
=
type
;
dst
->
v
.
i64
=
*
(
int64_t
*
)
val
;
dst
->
timestamp
=
tsKey
;
int32_t
size
=
0
;
if
(
stage
==
MERGE_STAGE
)
{
memcpy
(
dst
->
pTags
,
pTags
,
(
size_t
)
pTagInfo
->
tagsLen
);
...
...
@@ -2529,7 +2529,7 @@ static void valuePairAssign(tValuePair *dst, int16_t type, const char *val, int6
ctx
->
tag
.
nType
=
TSDB_DATA_TYPE_BIGINT
;
ctx
->
tag
.
i64
=
tsKey
;
}
tVariantDump
(
&
ctx
->
tag
,
dst
->
pTags
+
size
,
ctx
->
tag
.
nType
,
true
);
size
+=
pTagInfo
->
pTagCtxList
[
i
]
->
outputBytes
;
}
...
...
@@ -2590,7 +2590,7 @@ static void do_top_function_add(STopBotInfo *pInfo, int32_t maxLen, void *pData,
SExtTagsInfo
*
pTagInfo
,
char
*
pTags
,
int16_t
stage
)
{
tVariant
val
=
{
0
};
tVariantCreateFromBinary
(
&
val
,
pData
,
tDataTypes
[
type
].
bytes
,
type
);
tValuePair
**
pList
=
pInfo
->
res
;
assert
(
pList
!=
NULL
);
...
...
@@ -2598,7 +2598,7 @@ static void do_top_function_add(STopBotInfo *pInfo, int32_t maxLen, void *pData,
valuePairAssign
(
pList
[
pInfo
->
num
],
type
,
(
const
char
*
)
&
val
.
i64
,
ts
,
pTags
,
pTagInfo
,
stage
);
taosheapsort
((
void
*
)
pList
,
sizeof
(
tValuePair
**
),
pInfo
->
num
+
1
,
(
const
void
*
)
&
type
,
topBotComparFn
,
(
const
void
*
)
&
pTagInfo
->
tagsLen
,
topBotSwapFn
,
0
);
pInfo
->
num
++
;
}
else
{
if
((
IS_SIGNED_NUMERIC_TYPE
(
type
)
&&
val
.
i64
>
pList
[
0
]
->
v
.
i64
)
||
...
...
@@ -2637,7 +2637,7 @@ static void do_bottom_function_add(STopBotInfo *pInfo, int32_t maxLen, void *pDa
static
int32_t
resAscComparFn
(
const
void
*
pLeft
,
const
void
*
pRight
)
{
tValuePair
*
pLeftElem
=
*
(
tValuePair
**
)
pLeft
;
tValuePair
*
pRightElem
=
*
(
tValuePair
**
)
pRight
;
if
(
pLeftElem
->
timestamp
==
pRightElem
->
timestamp
)
{
return
0
;
}
else
{
...
...
@@ -2650,7 +2650,7 @@ static int32_t resDescComparFn(const void *pLeft, const void *pRight) { return -
static
int32_t
resDataAscComparFn
(
const
void
*
pLeft
,
const
void
*
pRight
)
{
tValuePair
*
pLeftElem
=
*
(
tValuePair
**
)
pLeft
;
tValuePair
*
pRightElem
=
*
(
tValuePair
**
)
pRight
;
if
(
IS_FLOAT_TYPE
(
pLeftElem
->
v
.
nType
))
{
if
(
pLeftElem
->
v
.
dKey
==
pRightElem
->
v
.
dKey
)
{
return
0
;
...
...
@@ -2677,9 +2677,9 @@ static int32_t resDataDescComparFn(const void *pLeft, const void *pRight) { retu
static
void
copyTopBotRes
(
SQLFunctionCtx
*
pCtx
,
int32_t
type
)
{
SResultRowCellInfo
*
pResInfo
=
GET_RES_INFO
(
pCtx
);
STopBotInfo
*
pRes
=
GET_ROWCELL_INTERBUF
(
pResInfo
);
tValuePair
**
tvp
=
pRes
->
res
;
int32_t
step
=
QUERY_ASC_FORWARD_STEP
;
int32_t
len
=
(
int32_t
)(
GET_RES_INFO
(
pCtx
)
->
numOfRes
);
...
...
@@ -2735,13 +2735,13 @@ static void copyTopBotRes(SQLFunctionCtx *pCtx, int32_t type) {
return
;
}
}
// set the output timestamp of each record.
TSKEY
*
output
=
pCtx
->
ptsOutputBuf
;
for
(
int32_t
i
=
0
;
i
<
len
;
++
i
,
output
+=
step
)
{
*
output
=
tvp
[
i
]
->
timestamp
;
}
// set the corresponding tag data for each record
// todo check malloc failure
if
(
pCtx
->
tagInfo
.
numOfTagCols
==
0
)
{
...
...
@@ -2752,7 +2752,7 @@ static void copyTopBotRes(SQLFunctionCtx *pCtx, int32_t type) {
for
(
int32_t
i
=
0
;
i
<
pCtx
->
tagInfo
.
numOfTagCols
;
++
i
)
{
pData
[
i
]
=
pCtx
->
tagInfo
.
pTagCtxList
[
i
]
->
pOutput
;
}
for
(
int32_t
i
=
0
;
i
<
len
;
++
i
,
output
+=
step
)
{
int16_t
offset
=
0
;
for
(
int32_t
j
=
0
;
j
<
pCtx
->
tagInfo
.
numOfTagCols
;
++
j
)
{
...
...
@@ -2761,7 +2761,7 @@ static void copyTopBotRes(SQLFunctionCtx *pCtx, int32_t type) {
pData
[
j
]
+=
pCtx
->
tagInfo
.
pTagCtxList
[
j
]
->
outputBytes
;
}
}
tfree
(
pData
);
}
...
...
@@ -2792,18 +2792,18 @@ bool topbot_datablock_filter(SQLFunctionCtx *pCtx, const char *minval, const cha
}
STopBotInfo
*
pTopBotInfo
=
getOutputInfo
(
pCtx
);
// required number of results are not reached, continue load data block
if
(
pTopBotInfo
->
num
<
pCtx
->
param
[
0
].
i64
)
{
return
true
;
}
if
((
void
*
)
pTopBotInfo
->
res
[
0
]
!=
(
void
*
)((
char
*
)
pTopBotInfo
+
sizeof
(
STopBotInfo
)
+
POINTER_BYTES
*
pCtx
->
param
[
0
].
i64
))
{
buildTopBotStruct
(
pTopBotInfo
,
pCtx
);
}
tValuePair
**
pRes
=
(
tValuePair
**
)
pTopBotInfo
->
res
;
if
(
pCtx
->
functionId
==
TSDB_FUNC_TOP
)
{
switch
(
pCtx
->
inputType
)
{
case
TSDB_DATA_TYPE_TINYINT
:
...
...
@@ -2845,7 +2845,7 @@ static bool top_bottom_function_setup(SQLFunctionCtx *pCtx, SResultRowCellInfo*
if
(
!
function_setup
(
pCtx
,
pResInfo
))
{
return
false
;
}
STopBotInfo
*
pInfo
=
getOutputInfo
(
pCtx
);
buildTopBotStruct
(
pInfo
,
pCtx
);
return
true
;
...
...
@@ -2860,27 +2860,27 @@ static void top_function(SQLFunctionCtx *pCtx) {
if
((
void
*
)
pRes
->
res
[
0
]
!=
(
void
*
)((
char
*
)
pRes
+
sizeof
(
STopBotInfo
)
+
POINTER_BYTES
*
pCtx
->
param
[
0
].
i64
))
{
buildTopBotStruct
(
pRes
,
pCtx
);
}
for
(
int32_t
i
=
0
;
i
<
pCtx
->
size
;
++
i
)
{
char
*
data
=
GET_INPUT_DATA
(
pCtx
,
i
);
if
(
pCtx
->
hasNull
&&
isNull
(
data
,
pCtx
->
inputType
))
{
continue
;
}
notNullElems
++
;
// NOTE: Set the default timestamp if it is missing [todo refactor]
TSKEY
ts
=
(
pCtx
->
ptsList
!=
NULL
)
?
GET_TS_DATA
(
pCtx
,
i
)
:
0
;
do_top_function_add
(
pRes
,
(
int32_t
)
pCtx
->
param
[
0
].
i64
,
data
,
ts
,
pCtx
->
inputType
,
&
pCtx
->
tagInfo
,
NULL
,
0
);
}
if
(
!
pCtx
->
hasNull
)
{
assert
(
pCtx
->
size
==
notNullElems
);
}
// treat the result as only one result
SET_VAL
(
pCtx
,
notNullElems
,
1
);
if
(
notNullElems
>
0
)
{
SResultRowCellInfo
*
pResInfo
=
GET_RES_INFO
(
pCtx
);
pResInfo
->
hasResult
=
DATA_SET_FLAG
;
...
...
@@ -2889,21 +2889,21 @@ static void top_function(SQLFunctionCtx *pCtx) {
static
void
top_func_merge
(
SQLFunctionCtx
*
pCtx
)
{
STopBotInfo
*
pInput
=
(
STopBotInfo
*
)
GET_INPUT_DATA_LIST
(
pCtx
);
// construct the input data struct from binary data
buildTopBotStruct
(
pInput
,
pCtx
);
STopBotInfo
*
pOutput
=
getOutputInfo
(
pCtx
);
// the intermediate result is binary, we only use the output data type
for
(
int32_t
i
=
0
;
i
<
pInput
->
num
;
++
i
)
{
int16_t
type
=
(
pCtx
->
outputType
==
TSDB_DATA_TYPE_FLOAT
)
?
TSDB_DATA_TYPE_DOUBLE
:
pCtx
->
outputType
;
do_top_function_add
(
pOutput
,
(
int32_t
)
pCtx
->
param
[
0
].
i64
,
&
pInput
->
res
[
i
]
->
v
.
i64
,
pInput
->
res
[
i
]
->
timestamp
,
type
,
&
pCtx
->
tagInfo
,
pInput
->
res
[
i
]
->
pTags
,
pCtx
->
currentStage
);
}
SET_VAL
(
pCtx
,
pInput
->
num
,
pOutput
->
num
);
if
(
pOutput
->
num
>
0
)
{
SResultRowCellInfo
*
pResInfo
=
GET_RES_INFO
(
pCtx
);
pResInfo
->
hasResult
=
DATA_SET_FLAG
;
...
...
@@ -2912,9 +2912,9 @@ static void top_func_merge(SQLFunctionCtx *pCtx) {
static
void
bottom_function
(
SQLFunctionCtx
*
pCtx
)
{
int32_t
notNullElems
=
0
;
STopBotInfo
*
pRes
=
getOutputInfo
(
pCtx
);
if
((
void
*
)
pRes
->
res
[
0
]
!=
(
void
*
)((
char
*
)
pRes
+
sizeof
(
STopBotInfo
)
+
POINTER_BYTES
*
pCtx
->
param
[
0
].
i64
))
{
buildTopBotStruct
(
pRes
,
pCtx
);
}
...
...
@@ -2930,14 +2930,14 @@ static void bottom_function(SQLFunctionCtx *pCtx) {
TSKEY
ts
=
(
pCtx
->
ptsList
!=
NULL
)
?
GET_TS_DATA
(
pCtx
,
i
)
:
0
;
do_bottom_function_add
(
pRes
,
(
int32_t
)
pCtx
->
param
[
0
].
i64
,
data
,
ts
,
pCtx
->
inputType
,
&
pCtx
->
tagInfo
,
NULL
,
0
);
}
if
(
!
pCtx
->
hasNull
)
{
assert
(
pCtx
->
size
==
notNullElems
);
}
// treat the result as only one result
SET_VAL
(
pCtx
,
notNullElems
,
1
);
if
(
notNullElems
>
0
)
{
SResultRowCellInfo
*
pResInfo
=
GET_RES_INFO
(
pCtx
);
pResInfo
->
hasResult
=
DATA_SET_FLAG
;
...
...
@@ -2946,12 +2946,12 @@ static void bottom_function(SQLFunctionCtx *pCtx) {
static
void
bottom_func_merge
(
SQLFunctionCtx
*
pCtx
)
{
STopBotInfo
*
pInput
=
(
STopBotInfo
*
)
GET_INPUT_DATA_LIST
(
pCtx
);
// construct the input data struct from binary data
buildTopBotStruct
(
pInput
,
pCtx
);
STopBotInfo
*
pOutput
=
getOutputInfo
(
pCtx
);
// the intermediate result is binary, we only use the output data type
for
(
int32_t
i
=
0
;
i
<
pInput
->
num
;
++
i
)
{
int16_t
type
=
(
pCtx
->
outputType
==
TSDB_DATA_TYPE_FLOAT
)
?
TSDB_DATA_TYPE_DOUBLE
:
pCtx
->
outputType
;
...
...
@@ -2960,7 +2960,7 @@ static void bottom_func_merge(SQLFunctionCtx *pCtx) {
}
SET_VAL
(
pCtx
,
pInput
->
num
,
pOutput
->
num
);
if
(
pOutput
->
num
>
0
)
{
SResultRowCellInfo
*
pResInfo
=
GET_RES_INFO
(
pCtx
);
pResInfo
->
hasResult
=
DATA_SET_FLAG
;
...
...
@@ -2969,17 +2969,17 @@ static void bottom_func_merge(SQLFunctionCtx *pCtx) {
static
void
top_bottom_func_finalizer
(
SQLFunctionCtx
*
pCtx
)
{
SResultRowCellInfo
*
pResInfo
=
GET_RES_INFO
(
pCtx
);
// data in temporary list is less than the required number of results, not enough qualified number of results
STopBotInfo
*
pRes
=
GET_ROWCELL_INTERBUF
(
pResInfo
);
if
(
pRes
->
num
==
0
)
{
// no result
assert
(
pResInfo
->
hasResult
!=
DATA_SET_FLAG
);
// TODO:
}
GET_RES_INFO
(
pCtx
)
->
numOfRes
=
pRes
->
num
;
tValuePair
**
tvp
=
pRes
->
res
;
// user specify the order of output by sort the result according to timestamp
if
(
pCtx
->
param
[
2
].
i64
==
PRIMARYKEY_TIMESTAMP_COL_INDEX
)
{
__compar_fn_t
comparator
=
(
pCtx
->
param
[
3
].
i64
==
TSDB_ORDER_ASC
)
?
resAscComparFn
:
resDescComparFn
;
...
...
@@ -2988,10 +2988,10 @@ static void top_bottom_func_finalizer(SQLFunctionCtx *pCtx) {
__compar_fn_t
comparator
=
(
pCtx
->
param
[
3
].
i64
==
TSDB_ORDER_ASC
)
?
resDataAscComparFn
:
resDataDescComparFn
;
qsort
(
tvp
,
(
size_t
)
pResInfo
->
numOfRes
,
POINTER_BYTES
,
comparator
);
}
GET_TRUE_DATA_TYPE
();
copyTopBotRes
(
pCtx
,
type
);
doFinalizer
(
pCtx
);
}
...
...
@@ -3012,7 +3012,7 @@ static bool percentile_function_setup(SQLFunctionCtx *pCtx, SResultRowCellInfo*
static
void
percentile_function
(
SQLFunctionCtx
*
pCtx
)
{
int32_t
notNullElems
=
0
;
SResultRowCellInfo
*
pResInfo
=
GET_RES_INFO
(
pCtx
);
SPercentileInfo
*
pInfo
=
GET_ROWCELL_INTERBUF
(
pResInfo
);
...
...
@@ -3022,7 +3022,7 @@ static void percentile_function(SQLFunctionCtx *pCtx) {
// all data are null, set it completed
if
(
pInfo
->
numOfElems
==
0
)
{
pResInfo
->
complete
=
true
;
return
;
}
else
{
pInfo
->
pMemBucket
=
tMemBucketCreate
(
pCtx
->
inputBytes
,
pCtx
->
inputType
,
pInfo
->
minval
,
pInfo
->
maxval
);
...
...
@@ -3086,18 +3086,18 @@ static void percentile_function(SQLFunctionCtx *pCtx) {
if
(
pCtx
->
hasNull
&&
isNull
(
data
,
pCtx
->
inputType
))
{
continue
;
}
notNullElems
+=
1
;
tMemBucketPut
(
pInfo
->
pMemBucket
,
data
,
1
);
}
SET_VAL
(
pCtx
,
notNullElems
,
1
);
pResInfo
->
hasResult
=
DATA_SET_FLAG
;
}
static
void
percentile_finalizer
(
SQLFunctionCtx
*
pCtx
)
{
double
v
=
pCtx
->
param
[
0
].
nType
==
TSDB_DATA_TYPE_INT
?
pCtx
->
param
[
0
].
i64
:
pCtx
->
param
[
0
].
dKey
;
SResultRowCellInfo
*
pResInfo
=
GET_RES_INFO
(
pCtx
);
SPercentileInfo
*
ppInfo
=
(
SPercentileInfo
*
)
GET_ROWCELL_INTERBUF
(
pResInfo
);
...
...
@@ -3109,7 +3109,7 @@ static void percentile_finalizer(SQLFunctionCtx *pCtx) {
}
else
{
SET_DOUBLE_VAL
((
double
*
)
pCtx
->
pOutput
,
getPercentile
(
pMemBucket
,
v
));
}
tMemBucketDestroy
(
pMemBucket
);
doFinalizer
(
pCtx
);
}
...
...
@@ -3129,7 +3129,7 @@ static bool tdigest_setup(SQLFunctionCtx *pCtx, SResultRowCellInfo *pResultInfo)
if
(
!
function_setup
(
pCtx
,
pResultInfo
))
{
return
false
;
}
// new TDigest
SAPercentileInfo
*
pInfo
=
getOutputInfo
(
pCtx
);
char
*
tmp
=
(
char
*
)
pInfo
+
sizeof
(
SAPercentileInfo
);
...
...
@@ -3156,7 +3156,7 @@ static void tdigest_do(SQLFunctionCtx *pCtx) {
}
notNullElems
+=
1
;
double
v
=
0
;
// value
double
v
=
0
;
// value
long
long
w
=
1
;
// weigth
GET_TYPED_DATA
(
v
,
double
,
pCtx
->
inputType
,
data
);
tdigestAdd
(
pAPerc
->
pTDigest
,
v
,
w
);
...
...
@@ -3190,7 +3190,7 @@ static void tdigest_merge(SQLFunctionCtx *pCtx) {
}
else
{
tdigestMerge
(
pOutput
->
pTDigest
,
pInput
->
pTDigest
);
}
SResultRowCellInfo
*
pResInfo
=
GET_RES_INFO
(
pCtx
);
pResInfo
->
hasResult
=
DATA_SET_FLAG
;
SET_VAL
(
pCtx
,
1
,
1
);
...
...
@@ -3243,10 +3243,10 @@ static bool apercentile_function_setup(SQLFunctionCtx *pCtx, SResultRowCellInfo*
if
(
!
function_setup
(
pCtx
,
pResultInfo
))
{
return
false
;
}
SAPercentileInfo
*
pInfo
=
getOutputInfo
(
pCtx
);
buildHistogramInfo
(
pInfo
);
char
*
tmp
=
(
char
*
)
pInfo
+
sizeof
(
SAPercentileInfo
);
pInfo
->
pHisto
=
tHistogramCreateFrom
(
tmp
,
MAX_HISTOGRAM_BIN
);
return
true
;
...
...
@@ -3259,32 +3259,32 @@ static void apercentile_function(SQLFunctionCtx *pCtx) {
}
int32_t
notNullElems
=
0
;
SResultRowCellInfo
*
pResInfo
=
GET_RES_INFO
(
pCtx
);
SAPercentileInfo
*
pInfo
=
getOutputInfo
(
pCtx
);
buildHistogramInfo
(
pInfo
);
assert
(
pInfo
->
pHisto
->
elems
!=
NULL
);
for
(
int32_t
i
=
0
;
i
<
pCtx
->
size
;
++
i
)
{
char
*
data
=
GET_INPUT_DATA
(
pCtx
,
i
);
if
(
pCtx
->
hasNull
&&
isNull
(
data
,
pCtx
->
inputType
))
{
continue
;
}
notNullElems
+=
1
;
double
v
=
0
;
GET_TYPED_DATA
(
v
,
double
,
pCtx
->
inputType
,
data
);
tHistogramAdd
(
&
pInfo
->
pHisto
,
v
);
}
if
(
!
pCtx
->
hasNull
)
{
assert
(
pCtx
->
size
==
notNullElems
);
}
SET_VAL
(
pCtx
,
notNullElems
,
1
);
if
(
notNullElems
>
0
)
{
pResInfo
->
hasResult
=
DATA_SET_FLAG
;
}
...
...
@@ -3297,23 +3297,23 @@ static void apercentile_func_merge(SQLFunctionCtx *pCtx) {
}
SAPercentileInfo
*
pInput
=
(
SAPercentileInfo
*
)
GET_INPUT_DATA_LIST
(
pCtx
);
pInput
->
pHisto
=
(
SHistogramInfo
*
)
((
char
*
)
pInput
+
sizeof
(
SAPercentileInfo
));
pInput
->
pHisto
->
elems
=
(
SHistBin
*
)
((
char
*
)
pInput
->
pHisto
+
sizeof
(
SHistogramInfo
));
if
(
pInput
->
pHisto
->
numOfElems
<=
0
)
{
return
;
}
SAPercentileInfo
*
pOutput
=
getOutputInfo
(
pCtx
);
buildHistogramInfo
(
pOutput
);
SHistogramInfo
*
pHisto
=
pOutput
->
pHisto
;
if
(
pHisto
->
numOfElems
<=
0
)
{
memcpy
(
pHisto
,
pInput
->
pHisto
,
sizeof
(
SHistogramInfo
)
+
sizeof
(
SHistBin
)
*
(
MAX_HISTOGRAM_BIN
+
1
));
pHisto
->
elems
=
(
SHistBin
*
)
((
char
*
)
pHisto
+
sizeof
(
SHistogramInfo
));
}
else
{
//TODO(dengyihao): avoid memcpy
//TODO(dengyihao): avoid memcpy
pHisto
->
elems
=
(
SHistBin
*
)
((
char
*
)
pHisto
+
sizeof
(
SHistogramInfo
));
SHistogramInfo
*
pRes
=
tHistogramMerge
(
pHisto
,
pInput
->
pHisto
,
MAX_HISTOGRAM_BIN
);
memcpy
(
pHisto
,
pRes
,
sizeof
(
SHistogramInfo
)
+
sizeof
(
SHistBin
)
*
MAX_HISTOGRAM_BIN
);
...
...
@@ -3333,17 +3333,17 @@ static void apercentile_finalizer(SQLFunctionCtx *pCtx) {
}
double
v
=
(
pCtx
->
param
[
0
].
nType
==
TSDB_DATA_TYPE_INT
)
?
pCtx
->
param
[
0
].
i64
:
pCtx
->
param
[
0
].
dKey
;
SResultRowCellInfo
*
pResInfo
=
GET_RES_INFO
(
pCtx
);
SAPercentileInfo
*
pOutput
=
GET_ROWCELL_INTERBUF
(
pResInfo
);
if
(
pCtx
->
currentStage
==
MERGE_STAGE
)
{
if
(
pResInfo
->
hasResult
==
DATA_SET_FLAG
)
{
// check for null
assert
(
pOutput
->
pHisto
->
numOfElems
>
0
);
double
ratio
[]
=
{
v
};
double
*
res
=
tHistogramUniform
(
pOutput
->
pHisto
,
ratio
,
1
);
memcpy
(
pCtx
->
pOutput
,
res
,
sizeof
(
double
));
free
(
res
);
}
else
{
...
...
@@ -3353,7 +3353,7 @@ static void apercentile_finalizer(SQLFunctionCtx *pCtx) {
}
else
{
if
(
pOutput
->
pHisto
->
numOfElems
>
0
)
{
double
ratio
[]
=
{
v
};
double
*
res
=
tHistogramUniform
(
pOutput
->
pHisto
,
ratio
,
1
);
memcpy
(
pCtx
->
pOutput
,
res
,
sizeof
(
double
));
free
(
res
);
...
...
@@ -3362,7 +3362,7 @@ static void apercentile_finalizer(SQLFunctionCtx *pCtx) {
return
;
}
}
doFinalizer
(
pCtx
);
}
...
...
@@ -3373,7 +3373,7 @@ static bool leastsquares_function_setup(SQLFunctionCtx *pCtx, SResultRowCellInfo
}
SLeastsquaresInfo
*
pInfo
=
GET_ROWCELL_INTERBUF
(
pResInfo
);
// 2*3 matrix
pInfo
->
startVal
=
pCtx
->
param
[
0
].
dKey
;
return
true
;
...
...
@@ -3400,12 +3400,12 @@ static bool leastsquares_function_setup(SQLFunctionCtx *pCtx, SResultRowCellInfo
static
void
leastsquares_function
(
SQLFunctionCtx
*
pCtx
)
{
SResultRowCellInfo
*
pResInfo
=
GET_RES_INFO
(
pCtx
);
SLeastsquaresInfo
*
pInfo
=
GET_ROWCELL_INTERBUF
(
pResInfo
);
double
(
*
param
)[
3
]
=
pInfo
->
mat
;
double
x
=
pInfo
->
startVal
;
void
*
pData
=
GET_INPUT_DATA_LIST
(
pCtx
);
int32_t
numOfElem
=
0
;
switch
(
pCtx
->
inputType
)
{
case
TSDB_DATA_TYPE_INT
:
{
...
...
@@ -3415,12 +3415,12 @@ static void leastsquares_function(SQLFunctionCtx *pCtx) {
if
(
pCtx
->
hasNull
&&
isNull
((
const
char
*
)
p
,
pCtx
->
inputType
))
{
continue
;
}
param
[
0
][
0
]
+=
x
*
x
;
param
[
0
][
1
]
+=
x
;
param
[
0
][
2
]
+=
x
*
p
[
i
];
param
[
1
][
2
]
+=
p
[
i
];
x
+=
pCtx
->
param
[
1
].
dKey
;
numOfElem
++
;
}
...
...
@@ -3472,14 +3472,14 @@ static void leastsquares_function(SQLFunctionCtx *pCtx) {
break
;
}
}
pInfo
->
startVal
=
x
;
pInfo
->
num
+=
numOfElem
;
if
(
pInfo
->
num
>
0
)
{
pResInfo
->
hasResult
=
DATA_SET_FLAG
;
}
SET_VAL
(
pCtx
,
numOfElem
,
1
);
}
...
...
@@ -3487,30 +3487,30 @@ static void leastsquares_finalizer(SQLFunctionCtx *pCtx) {
// no data in query
SResultRowCellInfo
*
pResInfo
=
GET_RES_INFO
(
pCtx
);
SLeastsquaresInfo
*
pInfo
=
GET_ROWCELL_INTERBUF
(
pResInfo
);
if
(
pInfo
->
num
==
0
)
{
setNull
(
pCtx
->
pOutput
,
pCtx
->
outputType
,
pCtx
->
outputBytes
);
return
;
}
double
(
*
param
)[
3
]
=
pInfo
->
mat
;
param
[
1
][
1
]
=
(
double
)
pInfo
->
num
;
param
[
1
][
0
]
=
param
[
0
][
1
];
param
[
0
][
0
]
-=
param
[
1
][
0
]
*
(
param
[
0
][
1
]
/
param
[
1
][
1
]);
param
[
0
][
2
]
-=
param
[
1
][
2
]
*
(
param
[
0
][
1
]
/
param
[
1
][
1
]);
param
[
0
][
1
]
=
0
;
param
[
1
][
2
]
-=
param
[
0
][
2
]
*
(
param
[
1
][
0
]
/
param
[
0
][
0
]);
param
[
1
][
0
]
=
0
;
param
[
0
][
2
]
/=
param
[
0
][
0
];
param
[
1
][
2
]
/=
param
[
1
][
1
];
int32_t
maxOutputSize
=
TSDB_AVG_FUNCTION_INTER_BUFFER_SIZE
-
VARSTR_HEADER_SIZE
;
size_t
n
=
snprintf
(
varDataVal
(
pCtx
->
pOutput
),
maxOutputSize
,
"{slop:%.6lf, intercept:%.6lf}"
,
param
[
0
][
2
],
param
[
1
][
2
]);
varDataSetLen
(
pCtx
->
pOutput
,
n
);
doFinalizer
(
pCtx
);
}
...
...
@@ -3563,7 +3563,7 @@ static void col_project_function(SQLFunctionCtx *pCtx) {
*/
static
void
tag_project_function
(
SQLFunctionCtx
*
pCtx
)
{
INC_INIT_VAL
(
pCtx
,
pCtx
->
size
);
assert
(
pCtx
->
inputBytes
==
pCtx
->
outputBytes
);
tVariantDump
(
&
pCtx
->
tag
,
pCtx
->
pOutput
,
pCtx
->
outputType
,
true
);
...
...
@@ -3598,7 +3598,7 @@ static void tag_function(SQLFunctionCtx *pCtx) {
static
void
copy_function
(
SQLFunctionCtx
*
pCtx
)
{
SET_VAL
(
pCtx
,
pCtx
->
size
,
1
);
char
*
pData
=
GET_INPUT_DATA_LIST
(
pCtx
);
assignVal
(
pCtx
->
pOutput
,
pData
,
pCtx
->
inputBytes
,
pCtx
->
inputType
);
}
...
...
@@ -3669,7 +3669,7 @@ static void row_copy_function(SQLFunctionCtx *pCtx) {
static
void
full_copy_function
(
SQLFunctionCtx
*
pCtx
)
{
copy_function
(
pCtx
);
for
(
int
t
=
0
;
t
<
pCtx
->
tagInfo
.
numOfTagCols
;
++
t
)
{
SQLFunctionCtx
*
tagCtx
=
pCtx
->
tagInfo
.
pTagCtxList
[
t
];
if
(
tagCtx
->
functionId
==
TSDB_FUNC_TAG_DUMMY
)
{
...
...
@@ -3683,7 +3683,7 @@ static bool diff_function_setup(SQLFunctionCtx *pCtx, SResultRowCellInfo* pResIn
if
(
!
function_setup
(
pCtx
,
pResInfo
))
{
return
false
;
}
SDiffFuncInfo
*
pDiffInfo
=
GET_ROWCELL_INTERBUF
(
pResInfo
);
pDiffInfo
->
valueAssigned
=
false
;
pDiffInfo
->
i64Prev
=
0
;
...
...
@@ -3998,10 +3998,10 @@ static void diff_function(SQLFunctionCtx *pCtx) {
continue
;
}
if
(
pDiffInfo
->
valueAssigned
)
{
if
(
pDiffInfo
->
valueAssigned
)
{
float
diff
=
(
float
)(
pData
[
i
]
-
pDiffInfo
->
d64Prev
);
if
(
diff
>=
0
||
!
pDiffInfo
->
ignoreNegative
)
{
*
pOutput
=
diff
;
*
pOutput
=
diff
;
*
pTimestamp
=
(
tsList
!=
NULL
)
?
tsList
[
i
]
:
0
;
pOutput
+=
1
;
pTimestamp
+=
1
;
...
...
@@ -4085,7 +4085,7 @@ static void diff_function(SQLFunctionCtx *pCtx) {
char
*
getScalarExprColumnData
(
void
*
param
,
const
char
*
name
,
int32_t
colId
)
{
SScalarExprSupport
*
pSupport
=
(
SScalarExprSupport
*
)
param
;
int32_t
idx
=
-
1
;
for
(
int32_t
i
=
0
;
i
<
pSupport
->
numOfCols
;
++
i
)
{
if
(
colId
==
pSupport
->
colList
[
i
].
colId
)
{
...
...
@@ -4093,7 +4093,7 @@ char *getScalarExprColumnData(void *param, const char* name, int32_t colId) {
break
;
}
}
assert
(
idx
>=
0
);
return
pSupport
->
data
[
idx
]
+
pSupport
->
offset
*
pSupport
->
colList
[
idx
].
bytes
;
}
...
...
@@ -4128,9 +4128,9 @@ static bool spread_function_setup(SQLFunctionCtx *pCtx, SResultRowCellInfo* pRes
if
(
!
function_setup
(
pCtx
,
pResInfo
))
{
return
false
;
}
SSpreadInfo
*
pInfo
=
GET_ROWCELL_INTERBUF
(
pResInfo
);
// this is the server-side setup function in client-side, the secondary merge do not need this procedure
if
(
pCtx
->
currentStage
==
MERGE_STAGE
)
{
pCtx
->
param
[
0
].
dKey
=
DBL_MAX
;
...
...
@@ -4139,21 +4139,21 @@ static bool spread_function_setup(SQLFunctionCtx *pCtx, SResultRowCellInfo* pRes
pInfo
->
min
=
DBL_MAX
;
pInfo
->
max
=
-
DBL_MAX
;
}
return
true
;
}
static
void
spread_function
(
SQLFunctionCtx
*
pCtx
)
{
SResultRowCellInfo
*
pResInfo
=
GET_RES_INFO
(
pCtx
);
SSpreadInfo
*
pInfo
=
GET_ROWCELL_INTERBUF
(
pResInfo
);
int32_t
numOfElems
=
0
;
// todo : opt with pre-calculated result
// column missing cause the hasNull to be true
if
(
pCtx
->
preAggVals
.
isSet
)
{
numOfElems
=
pCtx
->
size
-
pCtx
->
preAggVals
.
statis
.
numOfNull
;
// all data are null in current data block, ignore current data block
if
(
numOfElems
==
0
)
{
goto
_spread_over
;
...
...
@@ -4172,18 +4172,18 @@ static void spread_function(SQLFunctionCtx *pCtx) {
if
(
pInfo
->
min
>
GET_DOUBLE_VAL
((
const
char
*
)
&
(
pCtx
->
preAggVals
.
statis
.
min
)))
{
pInfo
->
min
=
GET_DOUBLE_VAL
((
const
char
*
)
&
(
pCtx
->
preAggVals
.
statis
.
min
));
}
if
(
pInfo
->
max
<
GET_DOUBLE_VAL
((
const
char
*
)
&
(
pCtx
->
preAggVals
.
statis
.
max
)))
{
pInfo
->
max
=
GET_DOUBLE_VAL
((
const
char
*
)
&
(
pCtx
->
preAggVals
.
statis
.
max
));
}
}
goto
_spread_over
;
}
void
*
pData
=
GET_INPUT_DATA_LIST
(
pCtx
);
numOfElems
=
0
;
if
(
pCtx
->
inputType
==
TSDB_DATA_TYPE_TINYINT
)
{
LIST_MINMAX_N
(
pCtx
,
pInfo
->
min
,
pInfo
->
max
,
pCtx
->
size
,
pData
,
int8_t
,
pCtx
->
inputType
,
numOfElems
);
}
else
if
(
pCtx
->
inputType
==
TSDB_DATA_TYPE_SMALLINT
)
{
...
...
@@ -4205,19 +4205,19 @@ static void spread_function(SQLFunctionCtx *pCtx) {
}
else
if
(
pCtx
->
inputType
==
TSDB_DATA_TYPE_UBIGINT
)
{
LIST_MINMAX_N
(
pCtx
,
pInfo
->
min
,
pInfo
->
max
,
pCtx
->
size
,
pData
,
uint64_t
,
pCtx
->
inputType
,
numOfElems
);
}
if
(
!
pCtx
->
hasNull
)
{
assert
(
pCtx
->
size
==
numOfElems
);
}
_spread_over:
SET_VAL
(
pCtx
,
numOfElems
,
1
);
if
(
numOfElems
>
0
)
{
pResInfo
->
hasResult
=
DATA_SET_FLAG
;
pInfo
->
hasResult
=
DATA_SET_FLAG
;
}
// keep the data into the final output buffer for super table query since this execution may be the last one
if
(
pCtx
->
stableQuery
)
{
memcpy
(
pCtx
->
pOutput
,
GET_ROWCELL_INTERBUF
(
pResInfo
),
sizeof
(
SSpreadInfo
));
...
...
@@ -4233,15 +4233,15 @@ void spread_func_merge(SQLFunctionCtx *pCtx) {
if
(
pData
->
hasResult
!=
DATA_SET_FLAG
)
{
return
;
}
if
(
pCtx
->
param
[
0
].
dKey
>
pData
->
min
)
{
pCtx
->
param
[
0
].
dKey
=
pData
->
min
;
}
if
(
pCtx
->
param
[
3
].
dKey
<
pData
->
max
)
{
pCtx
->
param
[
3
].
dKey
=
pData
->
max
;
}
GET_RES_INFO
(
pCtx
)
->
hasResult
=
DATA_SET_FLAG
;
}
...
...
@@ -4254,25 +4254,25 @@ void spread_function_finalizer(SQLFunctionCtx *pCtx) {
if
(
pCtx
->
currentStage
==
MERGE_STAGE
)
{
assert
(
pCtx
->
inputType
==
TSDB_DATA_TYPE_BINARY
);
if
(
pResInfo
->
hasResult
!=
DATA_SET_FLAG
)
{
setNull
(
pCtx
->
pOutput
,
pCtx
->
outputType
,
pCtx
->
outputBytes
);
return
;
}
SET_DOUBLE_VAL
((
double
*
)
pCtx
->
pOutput
,
pCtx
->
param
[
3
].
dKey
-
pCtx
->
param
[
0
].
dKey
);
}
else
{
assert
(
IS_NUMERIC_TYPE
(
pCtx
->
inputType
)
||
(
pCtx
->
inputType
==
TSDB_DATA_TYPE_TIMESTAMP
));
SSpreadInfo
*
pInfo
=
GET_ROWCELL_INTERBUF
(
GET_RES_INFO
(
pCtx
));
if
(
pInfo
->
hasResult
!=
DATA_SET_FLAG
)
{
setNull
(
pCtx
->
pOutput
,
pCtx
->
outputType
,
pCtx
->
outputBytes
);
return
;
}
SET_DOUBLE_VAL
((
double
*
)
pCtx
->
pOutput
,
pInfo
->
max
-
pInfo
->
min
);
}
GET_RES_INFO
(
pCtx
)
->
numOfRes
=
1
;
// todo add test case
doFinalizer
(
pCtx
);
}
...
...
@@ -4355,7 +4355,7 @@ static int32_t twa_function_impl(SQLFunctionCtx* pCtx, int32_t idx, int32_t size
SPoint1
st
;
st
.
key
=
tsList
[
i
];
st
.
val
=
val
[
i
];
#endif
#endif
pInfo
->
dOutput
+=
twa_get_area
(
pInfo
->
p
,
st
);
pInfo
->
p
=
st
;
}
...
...
@@ -4367,14 +4367,14 @@ static int32_t twa_function_impl(SQLFunctionCtx* pCtx, int32_t idx, int32_t size
if
(
pCtx
->
hasNull
&&
isNull
((
const
char
*
)
&
val
[
i
],
pCtx
->
inputType
))
{
continue
;
}
#ifndef _TD_NINGSI_60
SPoint1
st
=
{.
key
=
tsList
[
i
],
.
val
=
val
[
i
]};
#else
SPoint1
st
;
st
.
key
=
tsList
[
i
];
st
.
val
=
val
[
i
];
#endif
#endif
pInfo
->
dOutput
+=
twa_get_area
(
pInfo
->
p
,
st
);
pInfo
->
p
=
st
;
}
...
...
@@ -4386,14 +4386,14 @@ static int32_t twa_function_impl(SQLFunctionCtx* pCtx, int32_t idx, int32_t size
if
(
pCtx
->
hasNull
&&
isNull
((
const
char
*
)
&
val
[
i
],
pCtx
->
inputType
))
{
continue
;
}
#ifndef _TD_NINGSI_60
SPoint1
st
=
{.
key
=
tsList
[
i
],
.
val
=
val
[
i
]};
#else
SPoint1
st
;
st
.
key
=
tsList
[
i
];
st
.
val
=
val
[
i
];
#endif
#endif
pInfo
->
dOutput
+=
twa_get_area
(
pInfo
->
p
,
st
);
pInfo
->
p
=
st
;
}
...
...
@@ -4405,14 +4405,14 @@ static int32_t twa_function_impl(SQLFunctionCtx* pCtx, int32_t idx, int32_t size
if
(
pCtx
->
hasNull
&&
isNull
((
const
char
*
)
&
val
[
i
],
pCtx
->
inputType
))
{
continue
;
}
#ifndef _TD_NINGSI_60
SPoint1
st
=
{.
key
=
tsList
[
i
],
.
val
=
(
double
)
val
[
i
]};
#else
SPoint1
st
;
st
.
key
=
tsList
[
i
];
st
.
val
=
(
double
)
val
[
i
];
#endif
#endif
pInfo
->
dOutput
+=
twa_get_area
(
pInfo
->
p
,
st
);
pInfo
->
p
=
st
;
}
...
...
@@ -4424,14 +4424,14 @@ static int32_t twa_function_impl(SQLFunctionCtx* pCtx, int32_t idx, int32_t size
if
(
pCtx
->
hasNull
&&
isNull
((
const
char
*
)
&
val
[
i
],
pCtx
->
inputType
))
{
continue
;
}
#ifndef _TD_NINGSI_60
SPoint1
st
=
{.
key
=
tsList
[
i
],
.
val
=
val
[
i
]};
#else
SPoint1
st
;
st
.
key
=
tsList
[
i
];
st
.
val
=
(
double
)
val
[
i
];
#endif
#endif
pInfo
->
dOutput
+=
twa_get_area
(
pInfo
->
p
,
st
);
pInfo
->
p
=
st
;
}
...
...
@@ -4443,14 +4443,14 @@ static int32_t twa_function_impl(SQLFunctionCtx* pCtx, int32_t idx, int32_t size
if
(
pCtx
->
hasNull
&&
isNull
((
const
char
*
)
&
val
[
i
],
pCtx
->
inputType
))
{
continue
;
}
#ifndef _TD_NINGSI_60
SPoint1
st
=
{.
key
=
tsList
[
i
],
.
val
=
val
[
i
]};
#else
SPoint1
st
;
st
.
key
=
tsList
[
i
];
st
.
val
=
val
[
i
];
#endif
#endif
pInfo
->
dOutput
+=
twa_get_area
(
pInfo
->
p
,
st
);
pInfo
->
p
=
st
;
}
...
...
@@ -4469,7 +4469,7 @@ static int32_t twa_function_impl(SQLFunctionCtx* pCtx, int32_t idx, int32_t size
SPoint1
st
;
st
.
key
=
tsList
[
i
];
st
.
val
=
val
[
i
];
#endif
#endif
pInfo
->
dOutput
+=
twa_get_area
(
pInfo
->
p
,
st
);
pInfo
->
p
=
st
;
}
...
...
@@ -4488,7 +4488,7 @@ static int32_t twa_function_impl(SQLFunctionCtx* pCtx, int32_t idx, int32_t size
SPoint1
st
;
st
.
key
=
tsList
[
i
];
st
.
val
=
val
[
i
];
#endif
#endif
pInfo
->
dOutput
+=
twa_get_area
(
pInfo
->
p
,
st
);
pInfo
->
p
=
st
;
}
...
...
@@ -4507,7 +4507,7 @@ static int32_t twa_function_impl(SQLFunctionCtx* pCtx, int32_t idx, int32_t size
SPoint1
st
;
st
.
key
=
tsList
[
i
];
st
.
val
=
val
[
i
];
#endif
#endif
pInfo
->
dOutput
+=
twa_get_area
(
pInfo
->
p
,
st
);
pInfo
->
p
=
st
;
}
...
...
@@ -4519,14 +4519,14 @@ static int32_t twa_function_impl(SQLFunctionCtx* pCtx, int32_t idx, int32_t size
if
(
pCtx
->
hasNull
&&
isNull
((
const
char
*
)
&
val
[
i
],
pCtx
->
inputType
))
{
continue
;
}
#ifndef _TD_NINGSI_60
SPoint1
st
=
{.
key
=
tsList
[
i
],
.
val
=
(
double
)
val
[
i
]};
#else
SPoint1
st
;
st
.
key
=
tsList
[
i
];
st
.
val
=
(
double
)
val
[
i
];
#endif
#endif
pInfo
->
dOutput
+=
twa_get_area
(
pInfo
->
p
,
st
);
pInfo
->
p
=
st
;
}
...
...
@@ -4550,7 +4550,7 @@ static void twa_function(SQLFunctionCtx *pCtx) {
SResultRowCellInfo
*
pResInfo
=
GET_RES_INFO
(
pCtx
);
STwaInfo
*
pInfo
=
GET_ROWCELL_INTERBUF
(
pResInfo
);
// skip null value
int32_t
step
=
GET_FORWARD_DIRECTION_FACTOR
(
pCtx
->
order
);
int32_t
i
=
(
pCtx
->
order
==
TSDB_ORDER_ASC
)
?
0
:
(
pCtx
->
size
-
1
);
...
...
@@ -4564,11 +4564,11 @@ static void twa_function(SQLFunctionCtx *pCtx) {
}
SET_VAL
(
pCtx
,
notNullElems
,
1
);
if
(
notNullElems
>
0
)
{
pResInfo
->
hasResult
=
DATA_SET_FLAG
;
}
if
(
pCtx
->
stableQuery
)
{
memcpy
(
pCtx
->
pOutput
,
pInfo
,
sizeof
(
STwaInfo
));
}
...
...
@@ -4582,14 +4582,14 @@ static void twa_function(SQLFunctionCtx *pCtx) {
void
twa_function_copy
(
SQLFunctionCtx
*
pCtx
)
{
assert
(
pCtx
->
inputType
==
TSDB_DATA_TYPE_BINARY
);
SResultRowCellInfo
*
pResInfo
=
GET_RES_INFO
(
pCtx
);
memcpy
(
GET_ROWCELL_INTERBUF
(
pResInfo
),
pCtx
->
pInput
,
(
size_t
)
pCtx
->
inputBytes
);
pResInfo
->
hasResult
=
((
STwaInfo
*
)
pCtx
->
pInput
)
->
hasResult
;
}
void
twa_function_finalizer
(
SQLFunctionCtx
*
pCtx
)
{
SResultRowCellInfo
*
pResInfo
=
GET_RES_INFO
(
pCtx
);
STwaInfo
*
pInfo
=
(
STwaInfo
*
)
GET_ROWCELL_INTERBUF
(
pResInfo
);
if
(
pInfo
->
hasResult
!=
DATA_SET_FLAG
)
{
setNull
(
pCtx
->
pOutput
,
TSDB_DATA_TYPE_DOUBLE
,
sizeof
(
double
));
...
...
@@ -4602,7 +4602,7 @@ void twa_function_finalizer(SQLFunctionCtx *pCtx) {
}
else
{
SET_DOUBLE_VAL
((
double
*
)
pCtx
->
pOutput
,
pInfo
->
dOutput
/
(
pInfo
->
win
.
ekey
-
pInfo
->
win
.
skey
));
}
GET_RES_INFO
(
pCtx
)
->
numOfRes
=
1
;
doFinalizer
(
pCtx
);
}
...
...
@@ -4613,27 +4613,27 @@ static void interp_function(SQLFunctionCtx *pCtx) {
if
(
pCtx
->
start
.
key
==
pCtx
->
startTs
)
{
assert
(
pCtx
->
start
.
key
!=
INT64_MIN
);
COPY_TYPED_DATA
(
pCtx
->
pOutput
,
pCtx
->
inputType
,
&
pCtx
->
start
.
val
);
goto
interp_success_exit
;
goto
interp_success_exit
;
}
else
if
(
pCtx
->
end
.
key
==
pCtx
->
startTs
&&
pCtx
->
end
.
key
!=
INT64_MIN
&&
fillType
==
TSDB_FILL_NEXT
)
{
COPY_TYPED_DATA
(
pCtx
->
pOutput
,
pCtx
->
inputType
,
&
pCtx
->
end
.
val
);
goto
interp_success_exit
;
goto
interp_success_exit
;
}
switch
(
fillType
)
{
case
TSDB_FILL_NULL
:
setNull
(
pCtx
->
pOutput
,
pCtx
->
outputType
,
pCtx
->
outputBytes
);
break
;
case
TSDB_FILL_SET_VALUE
:
tVariantDump
(
&
pCtx
->
param
[
1
],
pCtx
->
pOutput
,
pCtx
->
inputType
,
true
);
break
;
case
TSDB_FILL_LINEAR
:
if
(
pCtx
->
start
.
key
==
INT64_MIN
||
pCtx
->
start
.
key
>
pCtx
->
startTs
if
(
pCtx
->
start
.
key
==
INT64_MIN
||
pCtx
->
start
.
key
>
pCtx
->
startTs
||
pCtx
->
end
.
key
==
INT64_MIN
||
pCtx
->
end
.
key
<
pCtx
->
startTs
)
{
goto
interp_exit
;
}
...
...
@@ -4641,7 +4641,7 @@ static void interp_function(SQLFunctionCtx *pCtx) {
double
v1
=
-
1
,
v2
=
-
1
;
GET_TYPED_DATA
(
v1
,
double
,
pCtx
->
inputType
,
&
pCtx
->
start
.
val
);
GET_TYPED_DATA
(
v2
,
double
,
pCtx
->
inputType
,
&
pCtx
->
end
.
val
);
SPoint
point1
=
{.
key
=
pCtx
->
start
.
key
,
.
val
=
&
v1
};
SPoint
point2
=
{.
key
=
pCtx
->
end
.
key
,
.
val
=
&
v2
};
SPoint
point
=
{.
key
=
pCtx
->
startTs
,
.
val
=
pCtx
->
pOutput
};
...
...
@@ -4654,7 +4654,7 @@ static void interp_function(SQLFunctionCtx *pCtx) {
taosGetLinearInterpolationVal
(
&
point
,
pCtx
->
outputType
,
&
point1
,
&
point2
,
TSDB_DATA_TYPE_DOUBLE
,
&
exceedMax
,
&
exceedMin
);
if
(
exceedMax
||
exceedMin
)
{
__compar_fn_t
func
=
getComparFunc
((
int32_t
)
pCtx
->
inputType
,
0
);
if
(
func
(
&
pCtx
->
start
.
val
,
&
pCtx
->
end
.
val
)
<=
0
)
{
if
(
func
(
&
pCtx
->
start
.
val
,
&
pCtx
->
end
.
val
)
<=
0
)
{
COPY_TYPED_DATA
(
pCtx
->
pOutput
,
pCtx
->
inputType
,
exceedMax
?
&
pCtx
->
start
.
val
:
&
pCtx
->
end
.
val
);
}
else
{
COPY_TYPED_DATA
(
pCtx
->
pOutput
,
pCtx
->
inputType
,
exceedMax
?
&
pCtx
->
end
.
val
:
&
pCtx
->
start
.
val
);
...
...
@@ -4675,7 +4675,7 @@ static void interp_function(SQLFunctionCtx *pCtx) {
if
(
pCtx
->
end
.
key
==
INT64_MIN
||
pCtx
->
end
.
key
<
pCtx
->
startTs
)
{
goto
interp_exit
;
}
COPY_TYPED_DATA
(
pCtx
->
pOutput
,
pCtx
->
inputType
,
&
pCtx
->
end
.
val
);
break
;
...
...
@@ -4685,7 +4685,7 @@ static void interp_function(SQLFunctionCtx *pCtx) {
}
interp_success_exit:
interp_success_exit:
*
(
TSKEY
*
)
pCtx
->
ptsOutputBuf
=
pCtx
->
startTs
;
...
...
@@ -4714,9 +4714,9 @@ static bool ts_comp_function_setup(SQLFunctionCtx *pCtx, SResultRowCellInfo* pRe
static
void
ts_comp_function
(
SQLFunctionCtx
*
pCtx
)
{
SResultRowCellInfo
*
pResInfo
=
GET_RES_INFO
(
pCtx
);
STSBuf
*
pTSbuf
=
((
STSCompInfo
*
)(
GET_ROWCELL_INTERBUF
(
pResInfo
)))
->
pTSBuf
;
const
char
*
input
=
GET_INPUT_DATA_LIST
(
pCtx
);
// primary ts must be existed, so no need to check its existance
if
(
pCtx
->
order
==
TSDB_ORDER_ASC
)
{
tsBufAppend
(
pTSbuf
,
(
int32_t
)
pCtx
->
param
[
0
].
i64
,
&
pCtx
->
tag
,
input
,
pCtx
->
size
*
TSDB_KEYSIZE
);
...
...
@@ -4726,17 +4726,17 @@ static void ts_comp_function(SQLFunctionCtx *pCtx) {
tsBufAppend
(
pTSbuf
,
(
int32_t
)
pCtx
->
param
[
0
].
i64
,
&
pCtx
->
tag
,
d
,
(
int32_t
)
TSDB_KEYSIZE
);
}
}
SET_VAL
(
pCtx
,
pCtx
->
size
,
1
);
pResInfo
->
hasResult
=
DATA_SET_FLAG
;
}
static
void
ts_comp_finalize
(
SQLFunctionCtx
*
pCtx
)
{
SResultRowCellInfo
*
pResInfo
=
GET_RES_INFO
(
pCtx
);
STSCompInfo
*
pInfo
=
GET_ROWCELL_INTERBUF
(
pResInfo
);
STSBuf
*
pTSbuf
=
pInfo
->
pTSBuf
;
tsBufFlush
(
pTSbuf
);
qDebug
(
"total timestamp :%"
PRId64
,
pTSbuf
->
numOfTotal
);
...
...
@@ -4779,7 +4779,7 @@ static double do_calc_rate(const SRateInfo* pRateInfo, double tickPerSec) {
return
0
;
}
}
int64_t
duration
=
pRateInfo
->
lastKey
-
pRateInfo
->
firstKey
;
if
(
duration
==
0
)
{
return
0
;
...
...
@@ -4792,7 +4792,7 @@ static bool rate_function_setup(SQLFunctionCtx *pCtx, SResultRowCellInfo* pResIn
if
(
!
function_setup
(
pCtx
,
pResInfo
))
{
return
false
;
}
SRateInfo
*
pInfo
=
GET_ROWCELL_INTERBUF
(
pResInfo
);
pInfo
->
correctionValue
=
0
;
pInfo
->
firstKey
=
INT64_MIN
;
...
...
@@ -4807,51 +4807,51 @@ static bool rate_function_setup(SQLFunctionCtx *pCtx, SResultRowCellInfo* pResIn
static
void
rate_function
(
SQLFunctionCtx
*
pCtx
)
{
SResultRowCellInfo
*
pResInfo
=
GET_RES_INFO
(
pCtx
);
int32_t
notNullElems
=
0
;
SRateInfo
*
pRateInfo
=
(
SRateInfo
*
)
GET_ROWCELL_INTERBUF
(
pResInfo
);
TSKEY
*
primaryKey
=
GET_TS_LIST
(
pCtx
);
qDebug
(
"%p rate_function() size:%d, hasNull:%d"
,
pCtx
,
pCtx
->
size
,
pCtx
->
hasNull
);
for
(
int32_t
i
=
0
;
i
<
pCtx
->
size
;
++
i
)
{
char
*
pData
=
GET_INPUT_DATA
(
pCtx
,
i
);
if
(
pCtx
->
hasNull
&&
isNull
(
pData
,
pCtx
->
inputType
))
{
qDebug
(
"%p rate_function() index of null data:%d"
,
pCtx
,
i
);
continue
;
}
notNullElems
++
;
double
v
=
0
;
GET_TYPED_DATA
(
v
,
double
,
pCtx
->
inputType
,
pData
);
if
((
INT64_MIN
==
pRateInfo
->
firstValue
)
||
(
INT64_MIN
==
pRateInfo
->
firstKey
))
{
pRateInfo
->
firstValue
=
v
;
pRateInfo
->
firstKey
=
primaryKey
[
i
];
}
if
(
INT64_MIN
==
pRateInfo
->
lastValue
)
{
pRateInfo
->
lastValue
=
v
;
}
else
if
(
v
<
pRateInfo
->
lastValue
)
{
pRateInfo
->
correctionValue
+=
pRateInfo
->
lastValue
;
}
pRateInfo
->
lastValue
=
v
;
pRateInfo
->
lastKey
=
primaryKey
[
i
];
}
if
(
!
pCtx
->
hasNull
)
{
assert
(
pCtx
->
size
==
notNullElems
);
}
SET_VAL
(
pCtx
,
notNullElems
,
1
);
if
(
notNullElems
>
0
)
{
pRateInfo
->
hasResult
=
DATA_SET_FLAG
;
pResInfo
->
hasResult
=
DATA_SET_FLAG
;
}
// keep the data into the final output buffer for super table query since this execution may be the last one
if
(
pCtx
->
stableQuery
)
{
memcpy
(
pCtx
->
pOutput
,
GET_ROWCELL_INTERBUF
(
pResInfo
),
sizeof
(
SRateInfo
));
...
...
@@ -4860,7 +4860,7 @@ static void rate_function(SQLFunctionCtx *pCtx) {
static
void
rate_func_copy
(
SQLFunctionCtx
*
pCtx
)
{
assert
(
pCtx
->
inputType
==
TSDB_DATA_TYPE_BINARY
);
SResultRowCellInfo
*
pResInfo
=
GET_RES_INFO
(
pCtx
);
memcpy
(
GET_ROWCELL_INTERBUF
(
pResInfo
),
pCtx
->
pInput
,
(
size_t
)
pCtx
->
inputBytes
);
pResInfo
->
hasResult
=
((
SRateInfo
*
)
pCtx
->
pInput
)
->
hasResult
;
...
...
@@ -4874,13 +4874,13 @@ static void rate_finalizer(SQLFunctionCtx *pCtx) {
setNull
(
pCtx
->
pOutput
,
TSDB_DATA_TYPE_DOUBLE
,
sizeof
(
double
));
return
;
}
SET_DOUBLE_VAL
((
double
*
)
pCtx
->
pOutput
,
do_calc_rate
(
pRateInfo
,
(
double
)
TSDB_TICK_PER_SECOND
(
pCtx
->
param
[
0
].
i64
)));
// cannot set the numOfIteratedElems again since it is set during previous iteration
pResInfo
->
numOfRes
=
1
;
pResInfo
->
hasResult
=
DATA_SET_FLAG
;
doFinalizer
(
pCtx
);
}
...
...
@@ -4896,9 +4896,9 @@ static void irate_function(SQLFunctionCtx *pCtx) {
if
(
pCtx
->
hasNull
&&
isNull
(
pData
,
pCtx
->
inputType
))
{
continue
;
}
notNullElems
++
;
double
v
=
0
;
GET_TYPED_DATA
(
v
,
double
,
pCtx
->
inputType
,
pData
);
...
...
@@ -4916,24 +4916,24 @@ static void irate_function(SQLFunctionCtx *pCtx) {
pRateInfo
->
lastValue
=
v
;
pRateInfo
->
lastKey
=
primaryKey
[
i
];
continue
;
}
if
((
INT64_MIN
==
pRateInfo
->
firstKey
)
||
primaryKey
[
i
]
>
pRateInfo
->
firstKey
)
{
pRateInfo
->
firstValue
=
v
;
pRateInfo
->
firstKey
=
primaryKey
[
i
];
break
;
}
}
SET_VAL
(
pCtx
,
notNullElems
,
1
);
if
(
notNullElems
>
0
)
{
pRateInfo
->
hasResult
=
DATA_SET_FLAG
;
pResInfo
->
hasResult
=
DATA_SET_FLAG
;
}
// keep the data into the final output buffer for super table query since this execution may be the last one
if
(
pCtx
->
stableQuery
)
{
memcpy
(
pCtx
->
pOutput
,
GET_ROWCELL_INTERBUF
(
pResInfo
),
sizeof
(
SRateInfo
));
...
...
@@ -5509,7 +5509,7 @@ static void elapsedFunction(SQLFunctionCtx *pCtx) {
elapsedOver:
SET_VAL
(
pCtx
,
pCtx
->
size
,
1
);
if
(
pCtx
->
size
>
0
)
{
GET_RES_INFO
(
pCtx
)
->
hasResult
=
DATA_SET_FLAG
;
pInfo
->
hasResult
=
DATA_SET_FLAG
;
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录