Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
慢慢CG
TDengine
提交
6d468fe0
T
TDengine
项目概览
慢慢CG
/
TDengine
与 Fork 源项目一致
Fork自
taosdata / TDengine
通知
1
Star
0
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
6d468fe0
编写于
9月 10, 2021
作者:
A
AlexDuan
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
only reserve modify code, reset other no changed code
上级
ffc9558e
变更
1
显示空白变更内容
内联
并排
Showing
1 changed file
with
1085 addition
and
1118 deletion
+1085
-1118
src/query/src/qAggMain.c
src/query/src/qAggMain.c
+1085
-1118
未找到文件。
src/query/src/qAggMain.c
浏览文件 @
6d468fe0
...
...
@@ -16,25 +16,24 @@
#include "os.h"
#include "taosdef.h"
#include "taosmsg.h"
#include "tdigest.h"
#include "texpr.h"
#include "tglobal.h"
#include "tsdb.h"
#include "tdigest.h"
#include "ttype.h"
#include "tsdb.h"
#include "tglobal.h"
#include "qAggMain.h"
#include "qFill.h"
#include "qHistogram.h"
#include "qPercentile.h"
#include "qTsbuf.h"
#include "qUdf.h"
#include "queryLog.h"
#include "qUdf.h"
#define GET_INPUT_DATA_LIST(x) ((char *)((x)->pInput))
#define GET_INPUT_DATA(x, y) (GET_INPUT_DATA_LIST(x) + (y) * (x)->inputBytes)
#define GET_TS_LIST(x)
((TSKEY
*)((x)->ptsList))
#define GET_TS_LIST(x)
((TSKEY
*)((x)->ptsList))
#define GET_TS_DATA(x, y) (GET_TS_LIST(x)[(y)])
#define GET_TRUE_DATA_TYPE() \
...
...
@@ -167,19 +166,19 @@ typedef struct SRateInfo {
typedef
struct
SDerivInfo
{
double
prevValue
;
// previous value
TSKEY
prevTs
;
// previous timestamp
bool
ignoreNegative
;
// ignore the negative value
bool
ignoreNegative
;
// ignore the negative value
int64_t
tsWindow
;
// time window for derivative
bool
valueSet
;
// the value has been set already
}
SDerivInfo
;
int32_t
getResultDataInfo
(
int32_t
dataType
,
int32_t
dataBytes
,
int32_t
functionId
,
int32_t
param
,
int16_t
*
type
,
int16_t
*
bytes
,
int32_t
*
interBytes
,
int16_t
extLength
,
bool
isSuperTable
,
SUdfInfo
*
pUdfInfo
)
{
int16_t
*
bytes
,
int32_t
*
interBytes
,
int16_t
extLength
,
bool
isSuperTable
,
SUdfInfo
*
pUdfInfo
)
{
if
(
!
isValidDataType
(
dataType
))
{
qError
(
"Illegal data type %d or data type length %d"
,
dataType
,
dataBytes
);
return
TSDB_CODE_TSC_INVALID_OPERATION
;
}
if
(
functionId
==
TSDB_FUNC_TS
||
functionId
==
TSDB_FUNC_TS_DUMMY
||
functionId
==
TSDB_FUNC_TAG_DUMMY
||
functionId
==
TSDB_FUNC_DIFF
||
functionId
==
TSDB_FUNC_PRJ
||
functionId
==
TSDB_FUNC_TAGPRJ
||
functionId
==
TSDB_FUNC_TAG
||
functionId
==
TSDB_FUNC_INTERP
)
{
...
...
@@ -198,8 +197,7 @@ int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionI
// (uid, tid) + VGID + TAGSIZE + VARSTR_HEADER_SIZE
if
(
functionId
==
TSDB_FUNC_TID_TAG
)
{
// todo use struct
*
type
=
TSDB_DATA_TYPE_BINARY
;
*
bytes
=
(
int16_t
)(
dataBytes
+
sizeof
(
int16_t
)
+
sizeof
(
int64_t
)
+
sizeof
(
int32_t
)
+
sizeof
(
int32_t
)
+
VARSTR_HEADER_SIZE
);
*
bytes
=
(
int16_t
)(
dataBytes
+
sizeof
(
int16_t
)
+
sizeof
(
int64_t
)
+
sizeof
(
int32_t
)
+
sizeof
(
int32_t
)
+
VARSTR_HEADER_SIZE
);
*
interBytes
=
0
;
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -326,7 +324,8 @@ int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionI
}
else
if
(
functionId
==
TSDB_FUNC_APERCT
)
{
*
type
=
TSDB_DATA_TYPE_DOUBLE
;
*
bytes
=
sizeof
(
double
);
*
interBytes
=
sizeof
(
SAPercentileInfo
)
+
sizeof
(
SHistogramInfo
)
+
sizeof
(
SHistBin
)
*
(
MAX_HISTOGRAM_BIN
+
1
);
*
interBytes
=
sizeof
(
SAPercentileInfo
)
+
sizeof
(
SHistogramInfo
)
+
sizeof
(
SHistBin
)
*
(
MAX_HISTOGRAM_BIN
+
1
);
return
TSDB_CODE_SUCCESS
;
}
else
if
(
functionId
==
TSDB_FUNC_TWA
)
{
*
type
=
TSDB_DATA_TYPE_DOUBLE
;
...
...
@@ -409,9 +408,9 @@ int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionI
}
// TODO use hash table
int32_t
isValidFunction
(
const
char
*
name
,
int32_t
len
)
{
for
(
int32_t
i
=
0
;
i
<=
TSDB_FUNC_BLKINFO
;
++
i
)
{
int32_t
nameLen
=
(
int32_t
)
strlen
(
aAggs
[
i
].
name
);
int32_t
isValidFunction
(
const
char
*
name
,
int32_t
len
)
{
for
(
int32_t
i
=
0
;
i
<=
TSDB_FUNC_BLKINFO
;
++
i
)
{
int32_t
nameLen
=
(
int32_t
)
strlen
(
aAggs
[
i
].
name
);
if
(
len
!=
nameLen
)
{
continue
;
}
...
...
@@ -424,7 +423,7 @@ int32_t isValidFunction(const char *name, int32_t len) {
return
-
1
;
}
static
bool
function_setup
(
SQLFunctionCtx
*
pCtx
,
SResultRowCellInfo
*
pResultInfo
)
{
static
bool
function_setup
(
SQLFunctionCtx
*
pCtx
,
SResultRowCellInfo
*
pResultInfo
)
{
if
(
pResultInfo
->
initialized
)
{
return
false
;
}
...
...
@@ -475,8 +474,7 @@ static void count_function(SQLFunctionCtx *pCtx) {
numOfElem
+=
1
;
}
}
else
{
// when counting on the primary time stamp column and no statistics data is presented, use the size value
// directly.
//when counting on the primary time stamp column and no statistics data is presented, use the size value directly.
numOfElem
=
pCtx
->
size
;
}
}
...
...
@@ -507,7 +505,7 @@ static void count_func_merge(SQLFunctionCtx *pCtx) {
* @param filterCols
* @return
*/
int32_t
countRequired
(
SQLFunctionCtx
*
pCtx
,
STimeWindow
*
w
,
int32_t
colId
)
{
int32_t
countRequired
(
SQLFunctionCtx
*
pCtx
,
STimeWindow
*
w
,
int32_t
colId
)
{
if
(
colId
==
PRIMARYKEY_TIMESTAMP_COL_INDEX
)
{
return
BLK_DATA_NO_NEEDED
;
}
else
{
...
...
@@ -515,7 +513,9 @@ int32_t countRequired(SQLFunctionCtx *pCtx, STimeWindow *w, int32_t colId) {
}
}
int32_t
noDataRequired
(
SQLFunctionCtx
*
pCtx
,
STimeWindow
*
w
,
int32_t
colId
)
{
return
BLK_DATA_NO_NEEDED
;
}
int32_t
noDataRequired
(
SQLFunctionCtx
*
pCtx
,
STimeWindow
*
w
,
int32_t
colId
)
{
return
BLK_DATA_NO_NEEDED
;
}
#define LIST_ADD_N_DOUBLE_FLOAT(x, ctx, p, t, numOfElem, tsdbType) \
do { \
t *d = (t *)(p); \
...
...
@@ -523,10 +523,10 @@ int32_t noDataRequired(SQLFunctionCtx *pCtx, STimeWindow *w, int32_t colId) { re
if (((ctx)->hasNull) && isNull((char *)&(d)[i], tsdbType)) { \
continue; \
}; \
SET_DOUBLE_VAL(&(x)
, GET_DOUBLE_VAL(&(x)) + GET_FLOAT_VAL(&(d)[i]));
\
SET_DOUBLE_VAL(&(x)
, GET_DOUBLE_VAL(&(x)) + GET_FLOAT_VAL(&(d)[i]));
\
(numOfElem)++; \
} \
} while
(0)
} while(0)
#define LIST_ADD_N_DOUBLE(x, ctx, p, t, numOfElem, tsdbType) \
do { \
t *d = (t *)(p); \
...
...
@@ -534,10 +534,10 @@ int32_t noDataRequired(SQLFunctionCtx *pCtx, STimeWindow *w, int32_t colId) { re
if (((ctx)->hasNull) && isNull((char *)&(d)[i], tsdbType)) { \
continue; \
}; \
SET_DOUBLE_VAL(&(x)
, (x) + (d)[i]);
\
SET_DOUBLE_VAL(&(x)
, (x) + (d)[i]);
\
(numOfElem)++; \
} \
} while
(0)
} while(0)
#define LIST_ADD_N(x, ctx, p, t, numOfElem, tsdbType) \
do { \
...
...
@@ -549,7 +549,7 @@ int32_t noDataRequired(SQLFunctionCtx *pCtx, STimeWindow *w, int32_t colId) { re
(x) += (d)[i]; \
(numOfElem)++; \
} \
} while
(0)
} while(0)
#define UPDATE_DATA(ctx, left, right, num, sign, k) \
do { \
...
...
@@ -574,7 +574,7 @@ int32_t noDataRequired(SQLFunctionCtx *pCtx, STimeWindow *w, int32_t colId) { re
if ((ctx)->hasNull && isNull((char *)&(list)[i], tsdbType)) { \
continue; \
} \
TSKEY key = (ctx)->ptsList != NULL
? GET_TS_DATA(ctx, i) : 0;
\
TSKEY key = (ctx)->ptsList != NULL
? GET_TS_DATA(ctx, i):0;
\
UPDATE_DATA(ctx, val, (list)[i], num, sign, key); \
}
...
...
@@ -600,8 +600,8 @@ static void do_sum(SQLFunctionCtx *pCtx) {
uint64_t
*
retVal
=
(
uint64_t
*
)
pCtx
->
pOutput
;
*
retVal
+=
(
uint64_t
)
pCtx
->
preAggVals
.
statis
.
sum
;
}
else
if
(
IS_FLOAT_TYPE
(
pCtx
->
inputType
))
{
double
*
retVal
=
(
double
*
)
pCtx
->
pOutput
;
SET_DOUBLE_VAL
(
retVal
,
*
retVal
+
GET_DOUBLE_VAL
((
const
char
*
)
&
(
pCtx
->
preAggVals
.
statis
.
sum
)));
double
*
retVal
=
(
double
*
)
pCtx
->
pOutput
;
SET_DOUBLE_VAL
(
retVal
,
*
retVal
+
GET_DOUBLE_VAL
((
const
char
*
)
&
(
pCtx
->
preAggVals
.
statis
.
sum
)));
}
}
else
{
// computing based on the true data block
void
*
pData
=
GET_INPUT_DATA_LIST
(
pCtx
);
...
...
@@ -678,7 +678,7 @@ static void sum_func_merge(SQLFunctionCtx *pCtx) {
if
(
IS_SIGNED_NUMERIC_TYPE
(
type
))
{
*
(
int64_t
*
)
pCtx
->
pOutput
+=
pInput
->
isum
;
}
else
if
(
IS_UNSIGNED_NUMERIC_TYPE
(
type
))
{
*
(
uint64_t
*
)
pCtx
->
pOutput
+=
pInput
->
usum
;
*
(
uint64_t
*
)
pCtx
->
pOutput
+=
pInput
->
usum
;
}
else
{
SET_DOUBLE_VAL
((
double
*
)
pCtx
->
pOutput
,
*
(
double
*
)
pCtx
->
pOutput
+
pInput
->
dsum
);
}
...
...
@@ -692,12 +692,16 @@ static void sum_func_merge(SQLFunctionCtx *pCtx) {
}
}
static
int32_t
statisRequired
(
SQLFunctionCtx
*
pCtx
,
STimeWindow
*
w
,
int32_t
colId
)
{
return
BLK_DATA_STATIS_NEEDED
;
}
static
int32_t
statisRequired
(
SQLFunctionCtx
*
pCtx
,
STimeWindow
*
w
,
int32_t
colId
)
{
return
BLK_DATA_STATIS_NEEDED
;
}
static
int32_t
dataBlockRequired
(
SQLFunctionCtx
*
pCtx
,
STimeWindow
*
w
,
int32_t
colId
)
{
return
BLK_DATA_ALL_NEEDED
;
}
static
int32_t
dataBlockRequired
(
SQLFunctionCtx
*
pCtx
,
STimeWindow
*
w
,
int32_t
colId
)
{
return
BLK_DATA_ALL_NEEDED
;
}
// todo: if column in current data block are null, opt for this case
static
int32_t
firstFuncRequired
(
SQLFunctionCtx
*
pCtx
,
STimeWindow
*
w
,
int32_t
colId
)
{
static
int32_t
firstFuncRequired
(
SQLFunctionCtx
*
pCtx
,
STimeWindow
*
w
,
int32_t
colId
)
{
if
(
pCtx
->
order
==
TSDB_ORDER_DESC
)
{
return
BLK_DATA_NO_NEEDED
;
}
...
...
@@ -710,7 +714,7 @@ static int32_t firstFuncRequired(SQLFunctionCtx *pCtx, STimeWindow *w, int32_t c
}
}
static
int32_t
lastFuncRequired
(
SQLFunctionCtx
*
pCtx
,
STimeWindow
*
w
,
int32_t
colId
)
{
static
int32_t
lastFuncRequired
(
SQLFunctionCtx
*
pCtx
,
STimeWindow
*
w
,
int32_t
colId
)
{
if
(
pCtx
->
order
!=
pCtx
->
param
[
0
].
i64
)
{
return
BLK_DATA_NO_NEEDED
;
}
...
...
@@ -722,7 +726,7 @@ static int32_t lastFuncRequired(SQLFunctionCtx *pCtx, STimeWindow *w, int32_t co
}
}
static
int32_t
firstDistFuncRequired
(
SQLFunctionCtx
*
pCtx
,
STimeWindow
*
w
,
int32_t
colId
)
{
static
int32_t
firstDistFuncRequired
(
SQLFunctionCtx
*
pCtx
,
STimeWindow
*
w
,
int32_t
colId
)
{
if
(
pCtx
->
order
==
TSDB_ORDER_DESC
)
{
return
BLK_DATA_NO_NEEDED
;
}
...
...
@@ -734,7 +738,7 @@ static int32_t firstDistFuncRequired(SQLFunctionCtx *pCtx, STimeWindow *w, int32
// the pCtx should be set to current Ctx and output buffer before call this function. Otherwise, pCtx->pOutput is
// the previous windowRes output buffer, not current unloaded block. In this case, the following filter is invalid
SFirstLastInfo
*
pInfo
=
(
SFirstLastInfo
*
)
(
pCtx
->
pOutput
+
pCtx
->
inputBytes
);
SFirstLastInfo
*
pInfo
=
(
SFirstLastInfo
*
)
(
pCtx
->
pOutput
+
pCtx
->
inputBytes
);
if
(
pInfo
->
hasResult
!=
DATA_SET_FLAG
)
{
return
BLK_DATA_ALL_NEEDED
;
}
else
{
// data in current block is not earlier than current result
...
...
@@ -742,7 +746,7 @@ static int32_t firstDistFuncRequired(SQLFunctionCtx *pCtx, STimeWindow *w, int32
}
}
static
int32_t
lastDistFuncRequired
(
SQLFunctionCtx
*
pCtx
,
STimeWindow
*
w
,
int32_t
colId
)
{
static
int32_t
lastDistFuncRequired
(
SQLFunctionCtx
*
pCtx
,
STimeWindow
*
w
,
int32_t
colId
)
{
if
(
pCtx
->
order
!=
pCtx
->
param
[
0
].
i64
)
{
return
BLK_DATA_NO_NEEDED
;
}
...
...
@@ -754,7 +758,7 @@ static int32_t lastDistFuncRequired(SQLFunctionCtx *pCtx, STimeWindow *w, int32_
// the pCtx should be set to current Ctx and output buffer before call this function. Otherwise, pCtx->pOutput is
// the previous windowRes output buffer, not current unloaded block. In this case, the following filter is invalid
SFirstLastInfo
*
pInfo
=
(
SFirstLastInfo
*
)
(
pCtx
->
pOutput
+
pCtx
->
inputBytes
);
SFirstLastInfo
*
pInfo
=
(
SFirstLastInfo
*
)
(
pCtx
->
pOutput
+
pCtx
->
inputBytes
);
if
(
pInfo
->
hasResult
!=
DATA_SET_FLAG
)
{
return
BLK_DATA_ALL_NEEDED
;
}
else
{
...
...
@@ -775,7 +779,7 @@ static void avg_function(SQLFunctionCtx *pCtx) {
SResultRowCellInfo
*
pResInfo
=
GET_RES_INFO
(
pCtx
);
SAvgInfo
*
pAvgInfo
=
(
SAvgInfo
*
)
GET_ROWCELL_INTERBUF
(
pResInfo
);
double
*
pVal
=
&
pAvgInfo
->
sum
;
double
*
pVal
=
&
pAvgInfo
->
sum
;
if
(
pCtx
->
preAggVals
.
isSet
)
{
// Pre-aggregation
notNullElems
=
pCtx
->
size
-
pCtx
->
preAggVals
.
statis
.
numOfNull
;
...
...
@@ -784,7 +788,7 @@ static void avg_function(SQLFunctionCtx *pCtx) {
if
(
IS_SIGNED_NUMERIC_TYPE
(
pCtx
->
inputType
))
{
*
pVal
+=
pCtx
->
preAggVals
.
statis
.
sum
;
}
else
if
(
IS_UNSIGNED_NUMERIC_TYPE
(
pCtx
->
inputType
))
{
*
pVal
+=
(
uint64_t
)
pCtx
->
preAggVals
.
statis
.
sum
;
*
pVal
+=
(
uint64_t
)
pCtx
->
preAggVals
.
statis
.
sum
;
}
else
if
(
pCtx
->
inputType
==
TSDB_DATA_TYPE_DOUBLE
||
pCtx
->
inputType
==
TSDB_DATA_TYPE_FLOAT
)
{
*
pVal
+=
GET_DOUBLE_VAL
((
const
char
*
)
&
(
pCtx
->
preAggVals
.
statis
.
sum
));
}
...
...
@@ -834,8 +838,8 @@ static void avg_function(SQLFunctionCtx *pCtx) {
static
void
avg_func_merge
(
SQLFunctionCtx
*
pCtx
)
{
SResultRowCellInfo
*
pResInfo
=
GET_RES_INFO
(
pCtx
);
double
*
sum
=
(
double
*
)
pCtx
->
pOutput
;
char
*
input
=
GET_INPUT_DATA_LIST
(
pCtx
);
double
*
sum
=
(
double
*
)
pCtx
->
pOutput
;
char
*
input
=
GET_INPUT_DATA_LIST
(
pCtx
);
for
(
int32_t
i
=
0
;
i
<
pCtx
->
size
;
++
i
,
input
+=
pCtx
->
inputBytes
)
{
SAvgInfo
*
pInput
=
(
SAvgInfo
*
)
input
;
...
...
@@ -864,7 +868,7 @@ static void avg_finalizer(SQLFunctionCtx *pCtx) {
return
;
}
SET_DOUBLE_VAL
((
double
*
)
pCtx
->
pOutput
,
(
*
(
double
*
)
pCtx
->
pOutput
)
/
*
(
int64_t
*
)
GET_ROWCELL_INTERBUF
(
pResInfo
));
SET_DOUBLE_VAL
((
double
*
)
pCtx
->
pOutput
,(
*
(
double
*
)
pCtx
->
pOutput
)
/
*
(
int64_t
*
)
GET_ROWCELL_INTERBUF
(
pResInfo
));
}
else
{
// this is the secondary merge, only in the secondary merge, the input type is TSDB_DATA_TYPE_BINARY
assert
(
IS_NUMERIC_TYPE
(
pCtx
->
inputType
));
SAvgInfo
*
pAvgInfo
=
(
SAvgInfo
*
)
GET_ROWCELL_INTERBUF
(
pResInfo
);
...
...
@@ -894,7 +898,7 @@ static void minMax_function(SQLFunctionCtx *pCtx, char *pOutput, int32_t isMin,
return
;
}
void
*
tval
=
NULL
;
void
*
tval
=
NULL
;
int16_t
index
=
0
;
if
(
isMin
)
{
...
...
@@ -913,9 +917,9 @@ static void minMax_function(SQLFunctionCtx *pCtx, char *pOutput, int32_t isMin,
*
* The following codes of 3 lines will be removed later.
*/
// if (index < 0 || index >= pCtx->size + pCtx->startOffset) {
// index = 0;
// }
// if (index < 0 || index >= pCtx->size + pCtx->startOffset) {
// index = 0;
// }
// the index is the original position, not the relative position
key
=
pCtx
->
ptsList
[
index
];
...
...
@@ -984,7 +988,7 @@ static void minMax_function(SQLFunctionCtx *pCtx, char *pOutput, int32_t isMin,
return
;
}
void
*
p
=
GET_INPUT_DATA_LIST
(
pCtx
);
void
*
p
=
GET_INPUT_DATA_LIST
(
pCtx
);
TSKEY
*
tsList
=
GET_TS_LIST
(
pCtx
);
*
notNullElems
=
0
;
...
...
@@ -996,10 +1000,10 @@ static void minMax_function(SQLFunctionCtx *pCtx, char *pOutput, int32_t isMin,
TYPED_LOOPCHECK_N
(
int16_t
,
pOutput
,
p
,
pCtx
,
pCtx
->
inputType
,
isMin
,
*
notNullElems
);
}
else
if
(
pCtx
->
inputType
==
TSDB_DATA_TYPE_INT
)
{
int32_t
*
pData
=
p
;
int32_t
*
retVal
=
(
int32_t
*
)
pOutput
;
int32_t
*
retVal
=
(
int32_t
*
)
pOutput
;
for
(
int32_t
i
=
0
;
i
<
pCtx
->
size
;
++
i
)
{
if
(
pCtx
->
hasNull
&&
isNull
((
const
char
*
)
&
pData
[
i
],
pCtx
->
inputType
))
{
if
(
pCtx
->
hasNull
&&
isNull
((
const
char
*
)
&
pData
[
i
],
pCtx
->
inputType
))
{
continue
;
}
...
...
@@ -1035,7 +1039,7 @@ static void minMax_function(SQLFunctionCtx *pCtx, char *pOutput, int32_t isMin,
}
}
static
bool
min_func_setup
(
SQLFunctionCtx
*
pCtx
,
SResultRowCellInfo
*
pResultInfo
)
{
static
bool
min_func_setup
(
SQLFunctionCtx
*
pCtx
,
SResultRowCellInfo
*
pResultInfo
)
{
if
(
!
function_setup
(
pCtx
,
pResultInfo
))
{
return
false
;
// not initialized since it has been initialized
}
...
...
@@ -1047,7 +1051,7 @@ static bool min_func_setup(SQLFunctionCtx *pCtx, SResultRowCellInfo *pResultInfo
*
((
int8_t
*
)
pCtx
->
pOutput
)
=
INT8_MAX
;
break
;
case
TSDB_DATA_TYPE_UTINYINT
:
*
(
uint8_t
*
)
pCtx
->
pOutput
=
UINT8_MAX
;
*
(
uint8_t
*
)
pCtx
->
pOutput
=
UINT8_MAX
;
break
;
case
TSDB_DATA_TYPE_SMALLINT
:
*
((
int16_t
*
)
pCtx
->
pOutput
)
=
INT16_MAX
;
...
...
@@ -1080,7 +1084,7 @@ static bool min_func_setup(SQLFunctionCtx *pCtx, SResultRowCellInfo *pResultInfo
return
true
;
}
static
bool
max_func_setup
(
SQLFunctionCtx
*
pCtx
,
SResultRowCellInfo
*
pResultInfo
)
{
static
bool
max_func_setup
(
SQLFunctionCtx
*
pCtx
,
SResultRowCellInfo
*
pResultInfo
)
{
if
(
!
function_setup
(
pCtx
,
pResultInfo
))
{
return
false
;
// not initialized since it has been initialized
}
...
...
@@ -1280,7 +1284,7 @@ static void max_func_merge(SQLFunctionCtx *pCtx) {
static
void
stddev_function
(
SQLFunctionCtx
*
pCtx
)
{
SResultRowCellInfo
*
pResInfo
=
GET_RES_INFO
(
pCtx
);
SStddevInfo
*
pStd
=
GET_ROWCELL_INTERBUF
(
pResInfo
);
SStddevInfo
*
pStd
=
GET_ROWCELL_INTERBUF
(
pResInfo
);
if
(
pCtx
->
currentStage
==
REPEAT_SCAN
&&
pStd
->
stage
==
0
)
{
pStd
->
stage
++
;
...
...
@@ -1304,13 +1308,13 @@ static void stddev_function(SQLFunctionCtx *pCtx) {
double
*
retVal
=
&
pStd
->
res
;
double
avg
=
pStd
->
avg
;
void
*
pData
=
GET_INPUT_DATA_LIST
(
pCtx
);
void
*
pData
=
GET_INPUT_DATA_LIST
(
pCtx
);
int32_t
num
=
0
;
switch
(
pCtx
->
inputType
)
{
case
TSDB_DATA_TYPE_INT
:
{
for
(
int32_t
i
=
0
;
i
<
pCtx
->
size
;
++
i
)
{
if
(
pCtx
->
hasNull
&&
isNull
((
const
char
*
)
(
&
((
int32_t
*
)
pData
)[
i
]),
pCtx
->
inputType
))
{
if
(
pCtx
->
hasNull
&&
isNull
((
const
char
*
)
(
&
((
int32_t
*
)
pData
)[
i
]),
pCtx
->
inputType
))
{
continue
;
}
num
+=
1
;
...
...
@@ -1377,14 +1381,14 @@ static void stddev_finalizer(SQLFunctionCtx *pCtx) {
}
//////////////////////////////////////////////////////////////////////////////////////
int32_t
tsCompare
(
const
void
*
p1
,
const
void
*
p2
)
{
TSKEY
k
=
*
(
TSKEY
*
)
p1
;
SResPair
*
pair
=
(
SResPair
*
)
p2
;
int32_t
tsCompare
(
const
void
*
p1
,
const
void
*
p2
)
{
TSKEY
k
=
*
(
TSKEY
*
)
p1
;
SResPair
*
pair
=
(
SResPair
*
)
p2
;
if
(
k
==
pair
->
key
)
{
return
0
;
}
else
{
return
k
<
pair
->
key
?
-
1
:
1
;
return
k
<
pair
->
key
?
-
1
:
1
;
}
}
...
...
@@ -1395,21 +1399,21 @@ static void stddev_dst_function(SQLFunctionCtx *pCtx) {
double
*
retVal
=
&
pStd
->
res
;
// all data are null, no need to proceed
SArray
*
resList
=
(
SArray
*
)
pCtx
->
param
[
0
].
pz
;
SArray
*
resList
=
(
SArray
*
)
pCtx
->
param
[
0
].
pz
;
if
(
resList
==
NULL
)
{
return
;
}
// find the correct group average results according to the tag value
int32_t
len
=
(
int32_t
)
taosArrayGetSize
(
resList
);
int32_t
len
=
(
int32_t
)
taosArrayGetSize
(
resList
);
assert
(
len
>
0
);
double
avg
=
0
;
if
(
len
==
1
)
{
SResPair
*
p
=
taosArrayGet
(
resList
,
0
);
SResPair
*
p
=
taosArrayGet
(
resList
,
0
);
avg
=
p
->
avg
;
}
else
{
// todo opt performance by using iterator since the timestamp lsit is matched with the output result
SResPair
*
p
=
bsearch
(
&
pCtx
->
startTs
,
resList
->
pData
,
len
,
sizeof
(
SResPair
),
tsCompare
);
SResPair
*
p
=
bsearch
(
&
pCtx
->
startTs
,
resList
->
pData
,
len
,
sizeof
(
SResPair
),
tsCompare
);
if
(
p
==
NULL
)
{
return
;
}
...
...
@@ -1417,13 +1421,13 @@ static void stddev_dst_function(SQLFunctionCtx *pCtx) {
avg
=
p
->
avg
;
}
void
*
pData
=
GET_INPUT_DATA_LIST
(
pCtx
);
void
*
pData
=
GET_INPUT_DATA_LIST
(
pCtx
);
int32_t
num
=
0
;
switch
(
pCtx
->
inputType
)
{
case
TSDB_DATA_TYPE_INT
:
{
for
(
int32_t
i
=
0
;
i
<
pCtx
->
size
;
++
i
)
{
if
(
pCtx
->
hasNull
&&
isNull
((
const
char
*
)
(
&
((
int32_t
*
)
pData
)[
i
]),
pCtx
->
inputType
))
{
if
(
pCtx
->
hasNull
&&
isNull
((
const
char
*
)
(
&
((
int32_t
*
)
pData
)[
i
]),
pCtx
->
inputType
))
{
continue
;
}
num
+=
1
;
...
...
@@ -1480,7 +1484,7 @@ static void stddev_dst_function(SQLFunctionCtx *pCtx) {
static
void
stddev_dst_merge
(
SQLFunctionCtx
*
pCtx
)
{
SResultRowCellInfo
*
pResInfo
=
GET_RES_INFO
(
pCtx
);
SStddevdstInfo
*
pRes
=
GET_ROWCELL_INTERBUF
(
pResInfo
);
SStddevdstInfo
*
pRes
=
GET_ROWCELL_INTERBUF
(
pResInfo
);
char
*
input
=
GET_INPUT_DATA_LIST
(
pCtx
);
...
...
@@ -1510,7 +1514,7 @@ static void stddev_dst_finalizer(SQLFunctionCtx *pCtx) {
}
//////////////////////////////////////////////////////////////////////////////////////
static
bool
first_last_function_setup
(
SQLFunctionCtx
*
pCtx
,
SResultRowCellInfo
*
pResInfo
)
{
static
bool
first_last_function_setup
(
SQLFunctionCtx
*
pCtx
,
SResultRowCellInfo
*
pResInfo
)
{
if
(
!
function_setup
(
pCtx
,
pResInfo
))
{
return
false
;
}
...
...
@@ -1578,7 +1582,7 @@ static void first_dist_function(SQLFunctionCtx *pCtx) {
* 1. data block that are not loaded
* 2. scan data files in desc order
*/
if
(
pCtx
->
order
==
TSDB_ORDER_DESC
/* || pCtx->preAggVals.dataBlockLoaded == false*/
)
{
if
(
pCtx
->
order
==
TSDB_ORDER_DESC
/* || pCtx->preAggVals.dataBlockLoaded == false*/
)
{
return
;
}
...
...
@@ -1607,7 +1611,7 @@ static void first_dist_func_merge(SQLFunctionCtx *pCtx) {
assert
(
pCtx
->
stableQuery
);
char
*
pData
=
GET_INPUT_DATA_LIST
(
pCtx
);
SFirstLastInfo
*
pInput
=
(
SFirstLastInfo
*
)
(
pData
+
pCtx
->
outputBytes
);
SFirstLastInfo
*
pInput
=
(
SFirstLastInfo
*
)
(
pData
+
pCtx
->
outputBytes
);
if
(
pInput
->
hasResult
!=
DATA_SET_FLAG
)
{
return
;
}
...
...
@@ -1638,10 +1642,11 @@ static void last_function(SQLFunctionCtx *pCtx) {
return
;
}
SResultRowCellInfo
*
pResInfo
=
GET_RES_INFO
(
pCtx
);
SResultRowCellInfo
*
pResInfo
=
GET_RES_INFO
(
pCtx
);
int32_t
notNullElems
=
0
;
if
(
pCtx
->
order
==
TSDB_ORDER_DESC
)
{
for
(
int32_t
i
=
pCtx
->
size
-
1
;
i
>=
0
;
--
i
)
{
char
*
data
=
GET_INPUT_DATA
(
pCtx
,
i
);
if
(
pCtx
->
hasNull
&&
isNull
(
data
,
pCtx
->
inputType
)
&&
(
!
pCtx
->
requireNull
))
{
...
...
@@ -1667,12 +1672,12 @@ static void last_function(SQLFunctionCtx *pCtx) {
TSKEY
ts
=
pCtx
->
ptsList
?
GET_TS_DATA
(
pCtx
,
i
)
:
0
;
char
*
buf
=
GET_ROWCELL_INTERBUF
(
pResInfo
);
if
(
pResInfo
->
hasResult
!=
DATA_SET_FLAG
||
(
*
(
TSKEY
*
)
buf
)
<
ts
)
{
char
*
buf
=
GET_ROWCELL_INTERBUF
(
pResInfo
);
if
(
pResInfo
->
hasResult
!=
DATA_SET_FLAG
||
(
*
(
TSKEY
*
)
buf
)
<
ts
)
{
pResInfo
->
hasResult
=
DATA_SET_FLAG
;
memcpy
(
pCtx
->
pOutput
,
data
,
pCtx
->
inputBytes
);
*
(
TSKEY
*
)
buf
=
ts
;
*
(
TSKEY
*
)
buf
=
ts
;
DO_UPDATE_TAG_COLUMNS
(
pCtx
,
ts
);
}
...
...
@@ -1740,7 +1745,7 @@ static void last_dist_function(SQLFunctionCtx *pCtx) {
static
void
last_dist_func_merge
(
SQLFunctionCtx
*
pCtx
)
{
char
*
pData
=
GET_INPUT_DATA_LIST
(
pCtx
);
SFirstLastInfo
*
pInput
=
(
SFirstLastInfo
*
)
(
pData
+
pCtx
->
outputBytes
);
SFirstLastInfo
*
pInput
=
(
SFirstLastInfo
*
)
(
pData
+
pCtx
->
outputBytes
);
if
(
pInput
->
hasResult
!=
DATA_SET_FLAG
)
{
return
;
}
...
...
@@ -1814,7 +1819,7 @@ static void valuePairAssign(tValuePair *dst, int16_t type, const char *val, int6
memcpy
(
dst
->
pTags
,
pTags
,
(
size_t
)
pTagInfo
->
tagsLen
);
}
else
{
// the tags are dumped from the ctx tag fields
for
(
int32_t
i
=
0
;
i
<
pTagInfo
->
numOfTagCols
;
++
i
)
{
SQLFunctionCtx
*
ctx
=
pTagInfo
->
pTagCtxList
[
i
];
SQLFunctionCtx
*
ctx
=
pTagInfo
->
pTagCtxList
[
i
];
if
(
ctx
->
functionId
==
TSDB_FUNC_TS_DUMMY
)
{
ctx
->
tag
.
nType
=
TSDB_DATA_TYPE_BIGINT
;
ctx
->
tag
.
i64
=
tsKey
;
...
...
@@ -1833,10 +1838,11 @@ static void valuePairAssign(tValuePair *dst, int16_t type, const char *val, int6
memcpy((dst)->pTags, (src)->pTags, (size_t)(__l)); \
} while (0)
static
int32_t
topBotComparFn
(
const
void
*
p1
,
const
void
*
p2
,
const
void
*
param
)
{
uint16_t
type
=
*
(
uint16_t
*
)
param
;
tValuePair
*
val1
=
*
(
tValuePair
**
)
p1
;
tValuePair
*
val2
=
*
(
tValuePair
**
)
p2
;
static
int32_t
topBotComparFn
(
const
void
*
p1
,
const
void
*
p2
,
const
void
*
param
)
{
uint16_t
type
=
*
(
uint16_t
*
)
param
;
tValuePair
*
val1
=
*
(
tValuePair
**
)
p1
;
tValuePair
*
val2
=
*
(
tValuePair
**
)
p2
;
if
(
IS_SIGNED_NUMERIC_TYPE
(
type
))
{
if
(
val1
->
v
.
i64
==
val2
->
v
.
i64
)
{
...
...
@@ -1859,12 +1865,13 @@ static int32_t topBotComparFn(const void *p1, const void *p2, const void *param)
return
(
val1
->
v
.
dKey
>
val2
->
v
.
dKey
)
?
1
:
-
1
;
}
static
void
topBotSwapFn
(
void
*
dst
,
void
*
src
,
const
void
*
param
)
{
static
void
topBotSwapFn
(
void
*
dst
,
void
*
src
,
const
void
*
param
)
{
char
tag
[
32768
];
tValuePair
temp
;
uint16_t
tagLen
=
*
(
uint16_t
*
)
param
;
tValuePair
*
vdst
=
*
(
tValuePair
**
)
dst
;
tValuePair
*
vsrc
=
*
(
tValuePair
**
)
src
;
uint16_t
tagLen
=
*
(
uint16_t
*
)
param
;
tValuePair
*
vdst
=
*
(
tValuePair
**
)
dst
;
tValuePair
*
vsrc
=
*
(
tValuePair
**
)
src
;
memset
(
tag
,
0
,
sizeof
(
tag
));
temp
.
pTags
=
tag
;
...
...
@@ -1885,8 +1892,7 @@ static void do_top_function_add(STopBotInfo *pInfo, int32_t maxLen, void *pData,
if
(
pInfo
->
num
<
maxLen
)
{
valuePairAssign
(
pList
[
pInfo
->
num
],
type
,
(
const
char
*
)
&
val
.
i64
,
ts
,
pTags
,
pTagInfo
,
stage
);
taosheapsort
((
void
*
)
pList
,
sizeof
(
tValuePair
**
),
pInfo
->
num
+
1
,
(
const
void
*
)
&
type
,
topBotComparFn
,
(
const
void
*
)
&
pTagInfo
->
tagsLen
,
topBotSwapFn
,
0
);
taosheapsort
((
void
*
)
pList
,
sizeof
(
tValuePair
**
),
pInfo
->
num
+
1
,
(
const
void
*
)
&
type
,
topBotComparFn
,
(
const
void
*
)
&
pTagInfo
->
tagsLen
,
topBotSwapFn
,
0
);
pInfo
->
num
++
;
}
else
{
...
...
@@ -1894,8 +1900,7 @@ static void do_top_function_add(STopBotInfo *pInfo, int32_t maxLen, void *pData,
(
IS_UNSIGNED_NUMERIC_TYPE
(
type
)
&&
val
.
u64
>
pList
[
0
]
->
v
.
u64
)
||
(
IS_FLOAT_TYPE
(
type
)
&&
val
.
dKey
>
pList
[
0
]
->
v
.
dKey
))
{
valuePairAssign
(
pList
[
0
],
type
,
(
const
char
*
)
&
val
.
i64
,
ts
,
pTags
,
pTagInfo
,
stage
);
taosheapadjust
((
void
*
)
pList
,
sizeof
(
tValuePair
**
),
0
,
maxLen
-
1
,
(
const
void
*
)
&
type
,
topBotComparFn
,
(
const
void
*
)
&
pTagInfo
->
tagsLen
,
topBotSwapFn
,
0
);
taosheapadjust
((
void
*
)
pList
,
sizeof
(
tValuePair
**
),
0
,
maxLen
-
1
,
(
const
void
*
)
&
type
,
topBotComparFn
,
(
const
void
*
)
&
pTagInfo
->
tagsLen
,
topBotSwapFn
,
0
);
}
}
}
...
...
@@ -1911,8 +1916,7 @@ static void do_bottom_function_add(STopBotInfo *pInfo, int32_t maxLen, void *pDa
if
(
pInfo
->
num
<
maxLen
)
{
valuePairAssign
(
pList
[
pInfo
->
num
],
type
,
(
const
char
*
)
&
val
.
i64
,
ts
,
pTags
,
pTagInfo
,
stage
);
taosheapsort
((
void
*
)
pList
,
sizeof
(
tValuePair
**
),
pInfo
->
num
+
1
,
(
const
void
*
)
&
type
,
topBotComparFn
,
(
const
void
*
)
&
pTagInfo
->
tagsLen
,
topBotSwapFn
,
1
);
taosheapsort
((
void
*
)
pList
,
sizeof
(
tValuePair
**
),
pInfo
->
num
+
1
,
(
const
void
*
)
&
type
,
topBotComparFn
,
(
const
void
*
)
&
pTagInfo
->
tagsLen
,
topBotSwapFn
,
1
);
pInfo
->
num
++
;
}
else
{
...
...
@@ -1920,8 +1924,7 @@ static void do_bottom_function_add(STopBotInfo *pInfo, int32_t maxLen, void *pDa
(
IS_UNSIGNED_NUMERIC_TYPE
(
type
)
&&
val
.
u64
<
pList
[
0
]
->
v
.
u64
)
||
(
IS_FLOAT_TYPE
(
type
)
&&
val
.
dKey
<
pList
[
0
]
->
v
.
dKey
))
{
valuePairAssign
(
pList
[
0
],
type
,
(
const
char
*
)
&
val
.
i64
,
ts
,
pTags
,
pTagInfo
,
stage
);
taosheapadjust
((
void
*
)
pList
,
sizeof
(
tValuePair
**
),
0
,
maxLen
-
1
,
(
const
void
*
)
&
type
,
topBotComparFn
,
(
const
void
*
)
&
pTagInfo
->
tagsLen
,
topBotSwapFn
,
1
);
taosheapadjust
((
void
*
)
pList
,
sizeof
(
tValuePair
**
),
0
,
maxLen
-
1
,
(
const
void
*
)
&
type
,
topBotComparFn
,
(
const
void
*
)
&
pTagInfo
->
tagsLen
,
topBotSwapFn
,
1
);
}
}
}
...
...
@@ -1949,7 +1952,7 @@ static int32_t resDataAscComparFn(const void *pLeft, const void *pRight) {
}
else
{
return
pLeftElem
->
v
.
dKey
>
pRightElem
->
v
.
dKey
?
1
:
-
1
;
}
}
else
if
(
IS_SIGNED_NUMERIC_TYPE
(
pLeftElem
->
v
.
nType
))
{
}
else
if
(
IS_SIGNED_NUMERIC_TYPE
(
pLeftElem
->
v
.
nType
)){
if
(
pLeftElem
->
v
.
i64
==
pRightElem
->
v
.
i64
)
{
return
0
;
}
else
{
...
...
@@ -1968,7 +1971,7 @@ static int32_t resDataDescComparFn(const void *pLeft, const void *pRight) { retu
static
void
copyTopBotRes
(
SQLFunctionCtx
*
pCtx
,
int32_t
type
)
{
SResultRowCellInfo
*
pResInfo
=
GET_RES_INFO
(
pCtx
);
STopBotInfo
*
pRes
=
GET_ROWCELL_INTERBUF
(
pResInfo
);
STopBotInfo
*
pRes
=
GET_ROWCELL_INTERBUF
(
pResInfo
);
tValuePair
**
tvp
=
pRes
->
res
;
...
...
@@ -2066,13 +2069,13 @@ static STopBotInfo *getTopBotOutputInfo(SQLFunctionCtx *pCtx) {
// only the first_stage_merge is directly written data into final output buffer
if
(
pCtx
->
stableQuery
&&
pCtx
->
currentStage
!=
MERGE_STAGE
)
{
return
(
STopBotInfo
*
)
pCtx
->
pOutput
;
}
else
{
// during normal table query and super table at the secondary_stage, result is written to intermediate
// buffer
return
(
STopBotInfo
*
)
pCtx
->
pOutput
;
}
else
{
// during normal table query and super table at the secondary_stage, result is written to intermediate buffer
return
GET_ROWCELL_INTERBUF
(
pResInfo
);
}
}
/*
* keep the intermediate results during scan data blocks in the format of:
* +-----------------------------------+-------------one value pair-----------+------------next value pair-----------+
...
...
@@ -2081,13 +2084,13 @@ static STopBotInfo *getTopBotOutputInfo(SQLFunctionCtx *pCtx) {
*/
static
void
buildTopBotStruct
(
STopBotInfo
*
pTopBotInfo
,
SQLFunctionCtx
*
pCtx
)
{
char
*
tmp
=
(
char
*
)
pTopBotInfo
+
sizeof
(
STopBotInfo
);
pTopBotInfo
->
res
=
(
tValuePair
**
)
tmp
;
pTopBotInfo
->
res
=
(
tValuePair
**
)
tmp
;
tmp
+=
POINTER_BYTES
*
pCtx
->
param
[
0
].
i64
;
size_t
size
=
sizeof
(
tValuePair
)
+
pCtx
->
tagInfo
.
tagsLen
;
for
(
int32_t
i
=
0
;
i
<
pCtx
->
param
[
0
].
i64
;
++
i
)
{
pTopBotInfo
->
res
[
i
]
=
(
tValuePair
*
)
tmp
;
pTopBotInfo
->
res
[
i
]
=
(
tValuePair
*
)
tmp
;
pTopBotInfo
->
res
[
i
]
->
pTags
=
tmp
+
sizeof
(
tValuePair
);
tmp
+=
size
;
}
...
...
@@ -2106,12 +2109,11 @@ bool topbot_datablock_filter(SQLFunctionCtx *pCtx, const char *minval, const cha
return
true
;
}
if
((
void
*
)
pTopBotInfo
->
res
[
0
]
!=
(
void
*
)((
char
*
)
pTopBotInfo
+
sizeof
(
STopBotInfo
)
+
POINTER_BYTES
*
pCtx
->
param
[
0
].
i64
))
{
if
((
void
*
)
pTopBotInfo
->
res
[
0
]
!=
(
void
*
)((
char
*
)
pTopBotInfo
+
sizeof
(
STopBotInfo
)
+
POINTER_BYTES
*
pCtx
->
param
[
0
].
i64
))
{
buildTopBotStruct
(
pTopBotInfo
,
pCtx
);
}
tValuePair
**
pRes
=
(
tValuePair
**
)
pTopBotInfo
->
res
;
tValuePair
**
pRes
=
(
tValuePair
**
)
pTopBotInfo
->
res
;
if
(
pCtx
->
functionId
==
TSDB_FUNC_TOP
)
{
switch
(
pCtx
->
inputType
)
{
...
...
@@ -2150,7 +2152,7 @@ bool topbot_datablock_filter(SQLFunctionCtx *pCtx, const char *minval, const cha
}
}
static
bool
top_bottom_function_setup
(
SQLFunctionCtx
*
pCtx
,
SResultRowCellInfo
*
pResInfo
)
{
static
bool
top_bottom_function_setup
(
SQLFunctionCtx
*
pCtx
,
SResultRowCellInfo
*
pResInfo
)
{
if
(
!
function_setup
(
pCtx
,
pResInfo
))
{
return
false
;
}
...
...
@@ -2179,7 +2181,7 @@ static void top_function(SQLFunctionCtx *pCtx) {
notNullElems
++
;
// NOTE: Set the default timestamp if it is missing [todo refactor]
TSKEY
ts
=
(
pCtx
->
ptsList
!=
NULL
)
?
GET_TS_DATA
(
pCtx
,
i
)
:
0
;
TSKEY
ts
=
(
pCtx
->
ptsList
!=
NULL
)
?
GET_TS_DATA
(
pCtx
,
i
)
:
0
;
do_top_function_add
(
pRes
,
(
int32_t
)
pCtx
->
param
[
0
].
i64
,
data
,
ts
,
pCtx
->
inputType
,
&
pCtx
->
tagInfo
,
NULL
,
0
);
}
...
...
@@ -2206,9 +2208,9 @@ static void top_func_merge(SQLFunctionCtx *pCtx) {
// the intermediate result is binary, we only use the output data type
for
(
int32_t
i
=
0
;
i
<
pInput
->
num
;
++
i
)
{
int16_t
type
=
(
pCtx
->
outputType
==
TSDB_DATA_TYPE_FLOAT
)
?
TSDB_DATA_TYPE_DOUBLE
:
pCtx
->
outputType
;
do_top_function_add
(
pOutput
,
(
int32_t
)
pCtx
->
param
[
0
].
i64
,
&
pInput
->
res
[
i
]
->
v
.
i64
,
pInput
->
res
[
i
]
->
timestamp
,
type
,
&
pCtx
->
tagInfo
,
pInput
->
res
[
i
]
->
pTags
,
pCtx
->
currentStage
);
int16_t
type
=
(
pCtx
->
outputType
==
TSDB_DATA_TYPE_FLOAT
)
?
TSDB_DATA_TYPE_DOUBLE
:
pCtx
->
outputType
;
do_top_function_add
(
pOutput
,
(
int32_t
)
pCtx
->
param
[
0
].
i64
,
&
pInput
->
res
[
i
]
->
v
.
i64
,
pInput
->
res
[
i
]
->
timestamp
,
type
,
&
pCtx
->
tagInfo
,
pInput
->
res
[
i
]
->
pTags
,
pCtx
->
currentStage
);
}
SET_VAL
(
pCtx
,
pInput
->
num
,
pOutput
->
num
);
...
...
@@ -2236,7 +2238,7 @@ static void bottom_function(SQLFunctionCtx *pCtx) {
notNullElems
++
;
// NOTE: Set the default timestamp if it is missing [todo refactor]
TSKEY
ts
=
(
pCtx
->
ptsList
!=
NULL
)
?
GET_TS_DATA
(
pCtx
,
i
)
:
0
;
TSKEY
ts
=
(
pCtx
->
ptsList
!=
NULL
)
?
GET_TS_DATA
(
pCtx
,
i
)
:
0
;
do_bottom_function_add
(
pRes
,
(
int32_t
)
pCtx
->
param
[
0
].
i64
,
data
,
ts
,
pCtx
->
inputType
,
&
pCtx
->
tagInfo
,
NULL
,
0
);
}
...
...
@@ -2264,8 +2266,8 @@ static void bottom_func_merge(SQLFunctionCtx *pCtx) {
// the intermediate result is binary, we only use the output data type
for
(
int32_t
i
=
0
;
i
<
pInput
->
num
;
++
i
)
{
int16_t
type
=
(
pCtx
->
outputType
==
TSDB_DATA_TYPE_FLOAT
)
?
TSDB_DATA_TYPE_DOUBLE
:
pCtx
->
outputType
;
do_bottom_function_add
(
pOutput
,
(
int32_t
)
pCtx
->
param
[
0
].
i64
,
&
pInput
->
res
[
i
]
->
v
.
i64
,
pInput
->
res
[
i
]
->
timestamp
,
type
,
&
pCtx
->
tagInfo
,
pInput
->
res
[
i
]
->
pTags
,
pCtx
->
currentStage
);
do_bottom_function_add
(
pOutput
,
(
int32_t
)
pCtx
->
param
[
0
].
i64
,
&
pInput
->
res
[
i
]
->
v
.
i64
,
pInput
->
res
[
i
]
->
timestamp
,
type
,
&
pCtx
->
tagInfo
,
pInput
->
res
[
i
]
->
pTags
,
pCtx
->
currentStage
);
}
SET_VAL
(
pCtx
,
pInput
->
num
,
pOutput
->
num
);
...
...
@@ -2305,7 +2307,7 @@ static void top_bottom_func_finalizer(SQLFunctionCtx *pCtx) {
}
///////////////////////////////////////////////////////////////////////////////////////////////
static
bool
percentile_function_setup
(
SQLFunctionCtx
*
pCtx
,
SResultRowCellInfo
*
pResultInfo
)
{
static
bool
percentile_function_setup
(
SQLFunctionCtx
*
pCtx
,
SResultRowCellInfo
*
pResultInfo
)
{
if
(
!
function_setup
(
pCtx
,
pResultInfo
))
{
return
false
;
}
...
...
@@ -2315,15 +2317,15 @@ static bool percentile_function_setup(SQLFunctionCtx *pCtx, SResultRowCellInfo *
SET_DOUBLE_VAL
(
&
pInfo
->
minval
,
DBL_MAX
);
SET_DOUBLE_VAL
(
&
pInfo
->
maxval
,
-
DBL_MAX
);
pInfo
->
numOfElems
=
0
;
return
true
;
}
static
void
percentile_function
(
SQLFunctionCtx
*
pCtx
)
{
int32_t
notNullElems
=
0
;
SResultRowCellInfo
*
pResInfo
=
GET_RES_INFO
(
pCtx
);
SPercentileInfo
*
pInfo
=
GET_ROWCELL_INTERBUF
(
pResInfo
);
SPercentileInfo
*
pInfo
=
GET_ROWCELL_INTERBUF
(
pResInfo
);
if
(
pCtx
->
currentStage
==
REPEAT_SCAN
&&
pInfo
->
stage
==
0
)
{
pInfo
->
stage
+=
1
;
...
...
@@ -2385,6 +2387,7 @@ static void percentile_function(SQLFunctionCtx *pCtx) {
pInfo
->
numOfElems
+=
1
;
}
}
return
;
}
...
...
@@ -2405,35 +2408,34 @@ static void percentile_function(SQLFunctionCtx *pCtx) {
static
void
percentile_finalizer
(
SQLFunctionCtx
*
pCtx
)
{
double
v
=
pCtx
->
param
[
0
].
nType
==
TSDB_DATA_TYPE_INT
?
pCtx
->
param
[
0
].
i64
:
pCtx
->
param
[
0
].
dKey
;
double
result
=
0
;
SResultRowCellInfo
*
pResInfo
=
GET_RES_INFO
(
pCtx
);
SPercentileInfo
*
ppInfo
=
(
SPercentileInfo
*
)
GET_ROWCELL_INTERBUF
(
pResInfo
);
SPercentileInfo
*
ppInfo
=
(
SPercentileInfo
*
)
GET_ROWCELL_INTERBUF
(
pResInfo
);
tMemBucket
*
pMemBucket
=
ppInfo
->
pMemBucket
;
tMemBucket
*
pMemBucket
=
ppInfo
->
pMemBucket
;
if
(
pMemBucket
==
NULL
||
pMemBucket
->
total
==
0
)
{
// check for null
assert
(
ppInfo
->
numOfElems
==
0
);
setNull
(
pCtx
->
pOutput
,
pCtx
->
outputType
,
pCtx
->
outputBytes
);
}
else
{
result
=
getPercentile
(
pMemBucket
,
v
);
SET_DOUBLE_VAL
((
double
*
)
pCtx
->
pOutput
,
result
);
SET_DOUBLE_VAL
((
double
*
)
pCtx
->
pOutput
,
getPercentile
(
pMemBucket
,
v
));
}
tMemBucketDestroy
(
pMemBucket
);
doFinalizer
(
pCtx
);
}
static
void
buildHistogramInfo
(
SAPercentileInfo
*
pInfo
)
{
pInfo
->
pHisto
=
(
SHistogramInfo
*
)((
char
*
)
pInfo
+
sizeof
(
SAPercentileInfo
));
pInfo
->
pHisto
->
elems
=
(
SHistBin
*
)((
char
*
)
pInfo
->
pHisto
+
sizeof
(
SHistogramInfo
));
//////////////////////////////////////////////////////////////////////////////////
static
void
buildHistogramInfo
(
SAPercentileInfo
*
pInfo
)
{
pInfo
->
pHisto
=
(
SHistogramInfo
*
)
((
char
*
)
pInfo
+
sizeof
(
SAPercentileInfo
));
pInfo
->
pHisto
->
elems
=
(
SHistBin
*
)
((
char
*
)
pInfo
->
pHisto
+
sizeof
(
SHistogramInfo
));
}
static
SAPercentileInfo
*
getAPerctInfo
(
SQLFunctionCtx
*
pCtx
)
{
SResultRowCellInfo
*
pResInfo
=
GET_RES_INFO
(
pCtx
);
SAPercentileInfo
*
pInfo
=
NULL
;
SAPercentileInfo
*
pInfo
=
NULL
;
if
(
pCtx
->
stableQuery
&&
pCtx
->
currentStage
!=
MERGE_STAGE
)
{
pInfo
=
(
SAPercentileInfo
*
)
pCtx
->
pOutput
;
pInfo
=
(
SAPercentileInfo
*
)
pCtx
->
pOutput
;
}
else
{
pInfo
=
GET_ROWCELL_INTERBUF
(
pResInfo
);
}
...
...
@@ -2558,7 +2560,7 @@ int32_t getAlgo(SQLFunctionCtx * pCtx) {
return
(
int32_t
)
pCtx
->
param
[
1
].
i64
;
}
static
bool
apercentile_function_setup
(
SQLFunctionCtx
*
pCtx
,
SResultRowCellInfo
*
pResultInfo
)
{
static
bool
apercentile_function_setup
(
SQLFunctionCtx
*
pCtx
,
SResultRowCellInfo
*
pResultInfo
)
{
if
(
getAlgo
(
pCtx
)
==
ALGO_TDIGEST
)
{
return
tdigest_setup
(
pCtx
,
pResultInfo
);
}
...
...
@@ -2582,8 +2584,8 @@ static void apercentile_function(SQLFunctionCtx *pCtx) {
int32_t
notNullElems
=
0
;
SResultRowCellInfo
*
pResInfo
=
GET_RES_INFO
(
pCtx
);
SAPercentileInfo
*
pInfo
=
getAPerctInfo
(
pCtx
);
SResultRowCellInfo
*
pResInfo
=
GET_RES_INFO
(
pCtx
);
SAPercentileInfo
*
pInfo
=
getAPerctInfo
(
pCtx
);
assert
(
pInfo
->
pHisto
->
elems
!=
NULL
);
...
...
@@ -2619,25 +2621,25 @@ static void apercentile_func_merge(SQLFunctionCtx *pCtx) {
SAPercentileInfo
*
pInput
=
(
SAPercentileInfo
*
)
GET_INPUT_DATA_LIST
(
pCtx
);
pInput
->
pHisto
=
(
SHistogramInfo
*
)
((
char
*
)
pInput
+
sizeof
(
SAPercentileInfo
));
pInput
->
pHisto
->
elems
=
(
SHistBin
*
)
((
char
*
)
pInput
->
pHisto
+
sizeof
(
SHistogramInfo
));
pInput
->
pHisto
=
(
SHistogramInfo
*
)
((
char
*
)
pInput
+
sizeof
(
SAPercentileInfo
));
pInput
->
pHisto
->
elems
=
(
SHistBin
*
)
((
char
*
)
pInput
->
pHisto
+
sizeof
(
SHistogramInfo
));
if
(
pInput
->
pHisto
->
numOfElems
<=
0
)
{
return
;
}
SAPercentileInfo
*
pOutput
=
getAPerctInfo
(
pCtx
);
SHistogramInfo
*
pHisto
=
pOutput
->
pHisto
;
SHistogramInfo
*
pHisto
=
pOutput
->
pHisto
;
if
(
pHisto
->
numOfElems
<=
0
)
{
memcpy
(
pHisto
,
pInput
->
pHisto
,
sizeof
(
SHistogramInfo
)
+
sizeof
(
SHistBin
)
*
(
MAX_HISTOGRAM_BIN
+
1
));
pHisto
->
elems
=
(
SHistBin
*
)
((
char
*
)
pHisto
+
sizeof
(
SHistogramInfo
));
pHisto
->
elems
=
(
SHistBin
*
)
((
char
*
)
pHisto
+
sizeof
(
SHistogramInfo
));
}
else
{
//
TODO(dengyihao): avoid memcpy
pHisto
->
elems
=
(
SHistBin
*
)
((
char
*
)
pHisto
+
sizeof
(
SHistogramInfo
));
//
TODO(dengyihao): avoid memcpy
pHisto
->
elems
=
(
SHistBin
*
)
((
char
*
)
pHisto
+
sizeof
(
SHistogramInfo
));
SHistogramInfo
*
pRes
=
tHistogramMerge
(
pHisto
,
pInput
->
pHisto
,
MAX_HISTOGRAM_BIN
);
memcpy
(
pHisto
,
pRes
,
sizeof
(
SHistogramInfo
)
+
sizeof
(
SHistBin
)
*
MAX_HISTOGRAM_BIN
);
pHisto
->
elems
=
(
SHistBin
*
)
((
char
*
)
pHisto
+
sizeof
(
SHistogramInfo
));
pHisto
->
elems
=
(
SHistBin
*
)
((
char
*
)
pHisto
+
sizeof
(
SHistogramInfo
));
tHistogramDestroy
(
&
pRes
);
}
...
...
@@ -2654,8 +2656,8 @@ static void apercentile_finalizer(SQLFunctionCtx *pCtx) {
double
v
=
(
pCtx
->
param
[
0
].
nType
==
TSDB_DATA_TYPE_INT
)
?
pCtx
->
param
[
0
].
i64
:
pCtx
->
param
[
0
].
dKey
;
SResultRowCellInfo
*
pResInfo
=
GET_RES_INFO
(
pCtx
);
SAPercentileInfo
*
pOutput
=
GET_ROWCELL_INTERBUF
(
pResInfo
);
SResultRowCellInfo
*
pResInfo
=
GET_RES_INFO
(
pCtx
);
SAPercentileInfo
*
pOutput
=
GET_ROWCELL_INTERBUF
(
pResInfo
);
if
(
pCtx
->
currentStage
==
MERGE_STAGE
)
{
if
(
pResInfo
->
hasResult
==
DATA_SET_FLAG
)
{
// check for null
...
...
@@ -2663,6 +2665,7 @@ static void apercentile_finalizer(SQLFunctionCtx *pCtx) {
double
ratio
[]
=
{
v
};
double
*
res
=
tHistogramUniform
(
pOutput
->
pHisto
,
ratio
,
1
);
memcpy
(
pCtx
->
pOutput
,
res
,
sizeof
(
double
));
free
(
res
);
}
else
{
...
...
@@ -2686,7 +2689,7 @@ static void apercentile_finalizer(SQLFunctionCtx *pCtx) {
}
/////////////////////////////////////////////////////////////////////////////////
static
bool
leastsquares_function_setup
(
SQLFunctionCtx
*
pCtx
,
SResultRowCellInfo
*
pResInfo
)
{
static
bool
leastsquares_function_setup
(
SQLFunctionCtx
*
pCtx
,
SResultRowCellInfo
*
pResInfo
)
{
if
(
!
function_setup
(
pCtx
,
pResInfo
))
{
return
false
;
}
...
...
@@ -2717,8 +2720,8 @@ static bool leastsquares_function_setup(SQLFunctionCtx *pCtx, SResultRowCellInfo
}
static
void
leastsquares_function
(
SQLFunctionCtx
*
pCtx
)
{
SResultRowCellInfo
*
pResInfo
=
GET_RES_INFO
(
pCtx
);
SLeastsquaresInfo
*
pInfo
=
GET_ROWCELL_INTERBUF
(
pResInfo
);
SResultRowCellInfo
*
pResInfo
=
GET_RES_INFO
(
pCtx
);
SLeastsquaresInfo
*
pInfo
=
GET_ROWCELL_INTERBUF
(
pResInfo
);
double
(
*
param
)[
3
]
=
pInfo
->
mat
;
double
x
=
pInfo
->
startVal
;
...
...
@@ -2731,7 +2734,7 @@ static void leastsquares_function(SQLFunctionCtx *pCtx) {
int32_t
*
p
=
pData
;
// LEASTSQR_CAL_LOOP(pCtx, param, pParamData, p);
for
(
int32_t
i
=
0
;
i
<
pCtx
->
size
;
++
i
)
{
if
(
pCtx
->
hasNull
&&
isNull
((
const
char
*
)
p
,
pCtx
->
inputType
))
{
if
(
pCtx
->
hasNull
&&
isNull
((
const
char
*
)
p
,
pCtx
->
inputType
))
{
continue
;
}
...
...
@@ -2804,8 +2807,8 @@ static void leastsquares_function(SQLFunctionCtx *pCtx) {
static
void
leastsquares_finalizer
(
SQLFunctionCtx
*
pCtx
)
{
// no data in query
SResultRowCellInfo
*
pResInfo
=
GET_RES_INFO
(
pCtx
);
SLeastsquaresInfo
*
pInfo
=
GET_ROWCELL_INTERBUF
(
pResInfo
);
SResultRowCellInfo
*
pResInfo
=
GET_RES_INFO
(
pCtx
);
SLeastsquaresInfo
*
pInfo
=
GET_ROWCELL_INTERBUF
(
pResInfo
);
if
(
pInfo
->
num
==
0
)
{
setNull
(
pCtx
->
pOutput
,
pCtx
->
outputType
,
pCtx
->
outputBytes
);
...
...
@@ -2827,8 +2830,8 @@ static void leastsquares_finalizer(SQLFunctionCtx *pCtx) {
param
[
1
][
2
]
/=
param
[
1
][
1
];
int32_t
maxOutputSize
=
TSDB_AVG_FUNCTION_INTER_BUFFER_SIZE
-
VARSTR_HEADER_SIZE
;
size_t
n
=
snprintf
(
varDataVal
(
pCtx
->
pOutput
),
maxOutputSize
,
"{slop:%.6lf, intercept:%.6lf}"
,
param
[
0
][
2
],
param
[
1
][
2
]);
size_t
n
=
snprintf
(
varDataVal
(
pCtx
->
pOutput
),
maxOutputSize
,
"{slop:%.6lf, intercept:%.6lf}"
,
param
[
0
][
2
],
param
[
1
][
2
]);
varDataSetLen
(
pCtx
->
pOutput
,
n
);
doFinalizer
(
pCtx
);
...
...
@@ -2854,11 +2857,12 @@ static void col_project_function(SQLFunctionCtx *pCtx) {
char
*
pData
=
GET_INPUT_DATA_LIST
(
pCtx
);
if
(
pCtx
->
order
==
TSDB_ORDER_ASC
)
{
int32_t
numOfRows
=
(
pCtx
->
param
[
0
].
i64
==
1
)
?
1
:
pCtx
->
size
;
memcpy
(
pCtx
->
pOutput
,
pData
,
(
size_t
)
numOfRows
*
pCtx
->
inputBytes
);
int32_t
numOfRows
=
(
pCtx
->
param
[
0
].
i64
==
1
)
?
1
:
pCtx
->
size
;
memcpy
(
pCtx
->
pOutput
,
pData
,
(
size_t
)
numOfRows
*
pCtx
->
inputBytes
);
}
else
{
for
(
int32_t
i
=
0
;
i
<
pCtx
->
size
;
++
i
)
{
memcpy
(
pCtx
->
pOutput
+
(
pCtx
->
size
-
1
-
i
)
*
pCtx
->
inputBytes
,
pData
+
i
*
pCtx
->
inputBytes
,
pCtx
->
inputBytes
);
for
(
int32_t
i
=
0
;
i
<
pCtx
->
size
;
++
i
)
{
memcpy
(
pCtx
->
pOutput
+
(
pCtx
->
size
-
1
-
i
)
*
pCtx
->
inputBytes
,
pData
+
i
*
pCtx
->
inputBytes
,
pCtx
->
inputBytes
);
}
}
}
...
...
@@ -2874,7 +2878,7 @@ static void tag_project_function(SQLFunctionCtx *pCtx) {
assert
(
pCtx
->
inputBytes
==
pCtx
->
outputBytes
);
tVariantDump
(
&
pCtx
->
tag
,
pCtx
->
pOutput
,
pCtx
->
outputType
,
true
);
char
*
data
=
pCtx
->
pOutput
;
char
*
data
=
pCtx
->
pOutput
;
pCtx
->
pOutput
+=
pCtx
->
outputBytes
;
// directly copy from the first one
...
...
@@ -2913,7 +2917,7 @@ enum {
INITIAL_VALUE_NOT_ASSIGNED
=
0
,
};
static
bool
diff_function_setup
(
SQLFunctionCtx
*
pCtx
,
SResultRowCellInfo
*
pResInfo
)
{
static
bool
diff_function_setup
(
SQLFunctionCtx
*
pCtx
,
SResultRowCellInfo
*
pResInfo
)
{
if
(
!
function_setup
(
pCtx
,
pResInfo
))
{
return
false
;
}
...
...
@@ -2923,13 +2927,13 @@ static bool diff_function_setup(SQLFunctionCtx *pCtx, SResultRowCellInfo *pResIn
return
false
;
}
static
bool
deriv_function_setup
(
SQLFunctionCtx
*
pCtx
,
SResultRowCellInfo
*
pResultInfo
)
{
static
bool
deriv_function_setup
(
SQLFunctionCtx
*
pCtx
,
SResultRowCellInfo
*
pResultInfo
)
{
if
(
!
function_setup
(
pCtx
,
pResultInfo
))
{
return
false
;
}
// diff function require the value is set to -1
SDerivInfo
*
pDerivInfo
=
GET_ROWCELL_INTERBUF
(
pResultInfo
);
SDerivInfo
*
pDerivInfo
=
GET_ROWCELL_INTERBUF
(
pResultInfo
);
pDerivInfo
->
ignoreNegative
=
pCtx
->
param
[
1
].
i64
;
pDerivInfo
->
prevTs
=
-
1
;
...
...
@@ -2940,7 +2944,7 @@ static bool deriv_function_setup(SQLFunctionCtx *pCtx, SResultRowCellInfo *pResu
static
void
deriv_function
(
SQLFunctionCtx
*
pCtx
)
{
SResultRowCellInfo
*
pResInfo
=
GET_RES_INFO
(
pCtx
);
SDerivInfo
*
pDerivInfo
=
GET_ROWCELL_INTERBUF
(
pResInfo
);
SDerivInfo
*
pDerivInfo
=
GET_ROWCELL_INTERBUF
(
pResInfo
);
void
*
data
=
GET_INPUT_DATA_LIST
(
pCtx
);
...
...
@@ -2964,8 +2968,7 @@ static void deriv_function(SQLFunctionCtx *pCtx) {
if
(
!
pDerivInfo
->
valueSet
)
{
// initial value is not set yet
pDerivInfo
->
valueSet
=
true
;
}
else
{
SET_DOUBLE_VAL
(
pOutput
,
((
pData
[
i
]
-
pDerivInfo
->
prevValue
)
*
pDerivInfo
->
tsWindow
)
/
(
tsList
[
i
]
-
pDerivInfo
->
prevTs
));
SET_DOUBLE_VAL
(
pOutput
,
((
pData
[
i
]
-
pDerivInfo
->
prevValue
)
*
pDerivInfo
->
tsWindow
)
/
(
tsList
[
i
]
-
pDerivInfo
->
prevTs
));
if
(
pDerivInfo
->
ignoreNegative
&&
*
pOutput
<
0
)
{
}
else
{
*
pTimestamp
=
tsList
[
i
];
...
...
@@ -3002,7 +3005,7 @@ static void deriv_function(SQLFunctionCtx *pCtx) {
}
}
pDerivInfo
->
prevValue
=
(
double
)
pData
[
i
];
pDerivInfo
->
prevValue
=
(
double
)
pData
[
i
];
pDerivInfo
->
prevTs
=
tsList
[
i
];
}
break
;
...
...
@@ -3142,8 +3145,8 @@ static void diff_function(SQLFunctionCtx *pCtx) {
int32_t
step
=
GET_FORWARD_DIRECTION_FACTOR
(
pCtx
->
order
);
int32_t
i
=
(
pCtx
->
order
==
TSDB_ORDER_ASC
)
?
0
:
pCtx
->
size
-
1
;
TSKEY
*
pTimestamp
=
pCtx
->
ptsOutputBuf
;
TSKEY
*
tsList
=
GET_TS_LIST
(
pCtx
);
TSKEY
*
pTimestamp
=
pCtx
->
ptsOutputBuf
;
TSKEY
*
tsList
=
GET_TS_LIST
(
pCtx
);
switch
(
pCtx
->
inputType
)
{
case
TSDB_DATA_TYPE_INT
:
{
...
...
@@ -3151,13 +3154,13 @@ static void diff_function(SQLFunctionCtx *pCtx) {
int32_t
*
pOutput
=
(
int32_t
*
)
pCtx
->
pOutput
;
for
(;
i
<
pCtx
->
size
&&
i
>=
0
;
i
+=
step
)
{
if
(
pCtx
->
hasNull
&&
isNull
((
const
char
*
)
&
pData
[
i
],
pCtx
->
inputType
))
{
if
(
pCtx
->
hasNull
&&
isNull
((
const
char
*
)
&
pData
[
i
],
pCtx
->
inputType
))
{
continue
;
}
if
(
pCtx
->
param
[
1
].
nType
!=
INITIAL_VALUE_NOT_ASSIGNED
)
{
// initial value is not set yet
*
pOutput
=
(
int32_t
)(
pData
[
i
]
-
pCtx
->
param
[
1
].
i64
);
// direct previous may be null
*
pTimestamp
=
(
tsList
!=
NULL
)
?
tsList
[
i
]
:
0
;
*
pTimestamp
=
(
tsList
!=
NULL
)
?
tsList
[
i
]
:
0
;
pOutput
+=
1
;
pTimestamp
+=
1
;
}
...
...
@@ -3173,13 +3176,13 @@ static void diff_function(SQLFunctionCtx *pCtx) {
int64_t
*
pOutput
=
(
int64_t
*
)
pCtx
->
pOutput
;
for
(;
i
<
pCtx
->
size
&&
i
>=
0
;
i
+=
step
)
{
if
(
pCtx
->
hasNull
&&
isNull
((
const
char
*
)
&
pData
[
i
],
pCtx
->
inputType
))
{
if
(
pCtx
->
hasNull
&&
isNull
((
const
char
*
)
&
pData
[
i
],
pCtx
->
inputType
))
{
continue
;
}
if
(
pCtx
->
param
[
1
].
nType
!=
INITIAL_VALUE_NOT_ASSIGNED
)
{
// initial value is not set yet
*
pOutput
=
pData
[
i
]
-
pCtx
->
param
[
1
].
i64
;
// direct previous may be null
*
pTimestamp
=
(
tsList
!=
NULL
)
?
tsList
[
i
]
:
0
;
*
pTimestamp
=
(
tsList
!=
NULL
)
?
tsList
[
i
]
:
0
;
pOutput
+=
1
;
pTimestamp
+=
1
;
}
...
...
@@ -3195,13 +3198,13 @@ static void diff_function(SQLFunctionCtx *pCtx) {
double
*
pOutput
=
(
double
*
)
pCtx
->
pOutput
;
for
(;
i
<
pCtx
->
size
&&
i
>=
0
;
i
+=
step
)
{
if
(
pCtx
->
hasNull
&&
isNull
((
const
char
*
)
&
pData
[
i
],
pCtx
->
inputType
))
{
if
(
pCtx
->
hasNull
&&
isNull
((
const
char
*
)
&
pData
[
i
],
pCtx
->
inputType
))
{
continue
;
}
if
(
pCtx
->
param
[
1
].
nType
!=
INITIAL_VALUE_NOT_ASSIGNED
)
{
// initial value is not set yet
SET_DOUBLE_VAL
(
pOutput
,
pData
[
i
]
-
pCtx
->
param
[
1
].
dKey
);
// direct previous may be null
*
pTimestamp
=
(
tsList
!=
NULL
)
?
tsList
[
i
]
:
0
;
*
pTimestamp
=
(
tsList
!=
NULL
)
?
tsList
[
i
]
:
0
;
pOutput
+=
1
;
pTimestamp
+=
1
;
}
...
...
@@ -3217,13 +3220,13 @@ static void diff_function(SQLFunctionCtx *pCtx) {
float
*
pOutput
=
(
float
*
)
pCtx
->
pOutput
;
for
(;
i
<
pCtx
->
size
&&
i
>=
0
;
i
+=
step
)
{
if
(
pCtx
->
hasNull
&&
isNull
((
const
char
*
)
&
pData
[
i
],
pCtx
->
inputType
))
{
if
(
pCtx
->
hasNull
&&
isNull
((
const
char
*
)
&
pData
[
i
],
pCtx
->
inputType
))
{
continue
;
}
if
(
pCtx
->
param
[
1
].
nType
!=
INITIAL_VALUE_NOT_ASSIGNED
)
{
// initial value is not set yet
*
pOutput
=
(
float
)(
pData
[
i
]
-
pCtx
->
param
[
1
].
dKey
);
// direct previous may be null
*
pTimestamp
=
(
tsList
!=
NULL
)
?
tsList
[
i
]
:
0
;
*
pTimestamp
=
(
tsList
!=
NULL
)
?
tsList
[
i
]
:
0
;
pOutput
+=
1
;
pTimestamp
+=
1
;
}
...
...
@@ -3239,13 +3242,13 @@ static void diff_function(SQLFunctionCtx *pCtx) {
int16_t
*
pOutput
=
(
int16_t
*
)
pCtx
->
pOutput
;
for
(;
i
<
pCtx
->
size
&&
i
>=
0
;
i
+=
step
)
{
if
(
pCtx
->
hasNull
&&
isNull
((
const
char
*
)
&
pData
[
i
],
pCtx
->
inputType
))
{
if
(
pCtx
->
hasNull
&&
isNull
((
const
char
*
)
&
pData
[
i
],
pCtx
->
inputType
))
{
continue
;
}
if
(
pCtx
->
param
[
1
].
nType
!=
INITIAL_VALUE_NOT_ASSIGNED
)
{
// initial value is not set yet
*
pOutput
=
(
int16_t
)(
pData
[
i
]
-
pCtx
->
param
[
1
].
i64
);
// direct previous may be null
*
pTimestamp
=
(
tsList
!=
NULL
)
?
tsList
[
i
]
:
0
;
*
pTimestamp
=
(
tsList
!=
NULL
)
?
tsList
[
i
]
:
0
;
pOutput
+=
1
;
pTimestamp
+=
1
;
}
...
...
@@ -3268,7 +3271,7 @@ static void diff_function(SQLFunctionCtx *pCtx) {
if
(
pCtx
->
param
[
1
].
nType
!=
INITIAL_VALUE_NOT_ASSIGNED
)
{
// initial value is not set yet
*
pOutput
=
(
int8_t
)(
pData
[
i
]
-
pCtx
->
param
[
1
].
i64
);
// direct previous may be null
*
pTimestamp
=
(
tsList
!=
NULL
)
?
tsList
[
i
]
:
0
;
*
pTimestamp
=
(
tsList
!=
NULL
)
?
tsList
[
i
]
:
0
;
pOutput
+=
1
;
pTimestamp
+=
1
;
}
...
...
@@ -3297,7 +3300,7 @@ static void diff_function(SQLFunctionCtx *pCtx) {
}
}
char
*
getArithColumnData
(
void
*
param
,
const
char
*
name
,
int32_t
colId
)
{
char
*
getArithColumnData
(
void
*
param
,
const
char
*
name
,
int32_t
colId
)
{
SArithmeticSupport
*
pSupport
=
(
SArithmeticSupport
*
)
param
;
int32_t
index
=
-
1
;
...
...
@@ -3337,7 +3340,7 @@ static void arithmetic_function(SQLFunctionCtx *pCtx) {
}
/////////////////////////////////////////////////////////////////////////////////
static
bool
spread_function_setup
(
SQLFunctionCtx
*
pCtx
,
SResultRowCellInfo
*
pResInfo
)
{
static
bool
spread_function_setup
(
SQLFunctionCtx
*
pCtx
,
SResultRowCellInfo
*
pResInfo
)
{
if
(
!
function_setup
(
pCtx
,
pResInfo
))
{
return
false
;
}
...
...
@@ -3358,7 +3361,7 @@ static bool spread_function_setup(SQLFunctionCtx *pCtx, SResultRowCellInfo *pRes
static
void
spread_function
(
SQLFunctionCtx
*
pCtx
)
{
SResultRowCellInfo
*
pResInfo
=
GET_RES_INFO
(
pCtx
);
SSpreadInfo
*
pInfo
=
GET_ROWCELL_INTERBUF
(
pResInfo
);
SSpreadInfo
*
pInfo
=
GET_ROWCELL_INTERBUF
(
pResInfo
);
int32_t
numOfElems
=
0
;
...
...
@@ -3423,7 +3426,7 @@ static void spread_function(SQLFunctionCtx *pCtx) {
assert
(
pCtx
->
size
==
numOfElems
);
}
_spread_over:
_spread_over:
SET_VAL
(
pCtx
,
numOfElems
,
1
);
if
(
numOfElems
>
0
)
{
...
...
@@ -3490,12 +3493,13 @@ void spread_function_finalizer(SQLFunctionCtx *pCtx) {
doFinalizer
(
pCtx
);
}
/**
* param[1]: start time
* param[2]: end time
* @param pCtx
*/
static
bool
twa_function_setup
(
SQLFunctionCtx
*
pCtx
,
SResultRowCellInfo
*
pResInfo
)
{
static
bool
twa_function_setup
(
SQLFunctionCtx
*
pCtx
,
SResultRowCellInfo
*
pResInfo
)
{
if
(
!
function_setup
(
pCtx
,
pResInfo
))
{
return
false
;
}
...
...
@@ -3507,25 +3511,25 @@ static bool twa_function_setup(SQLFunctionCtx *pCtx, SResultRowCellInfo *pResInf
}
static
double
twa_get_area
(
SPoint1
s
,
SPoint1
e
)
{
if
((
s
.
val
>=
0
&&
e
.
val
>=
0
)
||
(
s
.
val
<=
0
&&
e
.
val
<=
0
))
{
if
((
s
.
val
>=
0
&&
e
.
val
>=
0
)
||
(
s
.
val
<=
0
&&
e
.
val
<=
0
))
{
return
(
s
.
val
+
e
.
val
)
*
(
e
.
key
-
s
.
key
)
/
2
;
}
double
x
=
(
s
.
key
*
e
.
val
-
e
.
key
*
s
.
val
)
/
(
e
.
val
-
s
.
val
);
double
x
=
(
s
.
key
*
e
.
val
-
e
.
key
*
s
.
val
)
/
(
e
.
val
-
s
.
val
);
double
val
=
(
s
.
val
*
(
x
-
s
.
key
)
+
e
.
val
*
(
e
.
key
-
x
))
/
2
;
return
val
;
}
static
int32_t
twa_function_impl
(
SQLFunctionCtx
*
pCtx
,
int32_t
index
,
int32_t
size
)
{
static
int32_t
twa_function_impl
(
SQLFunctionCtx
*
pCtx
,
int32_t
index
,
int32_t
size
)
{
int32_t
notNullElems
=
0
;
SResultRowCellInfo
*
pResInfo
=
GET_RES_INFO
(
pCtx
);
STwaInfo
*
pInfo
=
GET_ROWCELL_INTERBUF
(
pResInfo
);
TSKEY
*
tsList
=
GET_TS_LIST
(
pCtx
);
TSKEY
*
tsList
=
GET_TS_LIST
(
pCtx
);
int32_t
i
=
index
;
int32_t
step
=
GET_FORWARD_DIRECTION_FACTOR
(
pCtx
->
order
);
SPoint1
*
last
=
&
pInfo
->
p
;
SPoint1
*
last
=
&
pInfo
->
p
;
if
(
pCtx
->
start
.
key
!=
INT64_MIN
)
{
assert
((
pCtx
->
start
.
key
<
tsList
[
i
]
&&
pCtx
->
order
==
TSDB_ORDER_ASC
)
||
...
...
@@ -3553,11 +3557,11 @@ static int32_t twa_function_impl(SQLFunctionCtx *pCtx, int32_t index, int32_t si
}
// calculate the value of
switch
(
pCtx
->
inputType
)
{
switch
(
pCtx
->
inputType
)
{
case
TSDB_DATA_TYPE_TINYINT
:
{
int8_t
*
val
=
(
int8_t
*
)
GET_INPUT_DATA
(
pCtx
,
0
);
int8_t
*
val
=
(
int8_t
*
)
GET_INPUT_DATA
(
pCtx
,
0
);
for
(;
i
<
size
&&
i
>=
0
;
i
+=
step
)
{
if
(
pCtx
->
hasNull
&&
isNull
((
const
char
*
)
&
val
[
i
],
pCtx
->
inputType
))
{
if
(
pCtx
->
hasNull
&&
isNull
((
const
char
*
)
&
val
[
i
],
pCtx
->
inputType
))
{
continue
;
}
...
...
@@ -3574,9 +3578,9 @@ static int32_t twa_function_impl(SQLFunctionCtx *pCtx, int32_t index, int32_t si
break
;
}
case
TSDB_DATA_TYPE_SMALLINT
:
{
int16_t
*
val
=
(
int16_t
*
)
GET_INPUT_DATA
(
pCtx
,
0
);
int16_t
*
val
=
(
int16_t
*
)
GET_INPUT_DATA
(
pCtx
,
0
);
for
(;
i
<
size
&&
i
>=
0
;
i
+=
step
)
{
if
(
pCtx
->
hasNull
&&
isNull
((
const
char
*
)
&
val
[
i
],
pCtx
->
inputType
))
{
if
(
pCtx
->
hasNull
&&
isNull
((
const
char
*
)
&
val
[
i
],
pCtx
->
inputType
))
{
continue
;
}
...
...
@@ -3593,9 +3597,9 @@ static int32_t twa_function_impl(SQLFunctionCtx *pCtx, int32_t index, int32_t si
break
;
}
case
TSDB_DATA_TYPE_INT
:
{
int32_t
*
val
=
(
int32_t
*
)
GET_INPUT_DATA
(
pCtx
,
0
);
int32_t
*
val
=
(
int32_t
*
)
GET_INPUT_DATA
(
pCtx
,
0
);
for
(;
i
<
size
&&
i
>=
0
;
i
+=
step
)
{
if
(
pCtx
->
hasNull
&&
isNull
((
const
char
*
)
&
val
[
i
],
pCtx
->
inputType
))
{
if
(
pCtx
->
hasNull
&&
isNull
((
const
char
*
)
&
val
[
i
],
pCtx
->
inputType
))
{
continue
;
}
...
...
@@ -3612,14 +3616,14 @@ static int32_t twa_function_impl(SQLFunctionCtx *pCtx, int32_t index, int32_t si
break
;
}
case
TSDB_DATA_TYPE_BIGINT
:
{
int64_t
*
val
=
(
int64_t
*
)
GET_INPUT_DATA
(
pCtx
,
0
);
int64_t
*
val
=
(
int64_t
*
)
GET_INPUT_DATA
(
pCtx
,
0
);
for
(;
i
<
size
&&
i
>=
0
;
i
+=
step
)
{
if
(
pCtx
->
hasNull
&&
isNull
((
const
char
*
)
&
val
[
i
],
pCtx
->
inputType
))
{
if
(
pCtx
->
hasNull
&&
isNull
((
const
char
*
)
&
val
[
i
],
pCtx
->
inputType
))
{
continue
;
}
#ifndef _TD_NINGSI_60
SPoint1
st
=
{.
key
=
tsList
[
i
],
.
val
=
(
double
)
val
[
i
]};
SPoint1
st
=
{.
key
=
tsList
[
i
],
.
val
=
(
double
)
val
[
i
]};
#else
SPoint1
st
;
st
.
key
=
tsList
[
i
];
...
...
@@ -3631,9 +3635,9 @@ static int32_t twa_function_impl(SQLFunctionCtx *pCtx, int32_t index, int32_t si
break
;
}
case
TSDB_DATA_TYPE_FLOAT
:
{
float
*
val
=
(
float
*
)
GET_INPUT_DATA
(
pCtx
,
0
);
float
*
val
=
(
float
*
)
GET_INPUT_DATA
(
pCtx
,
0
);
for
(;
i
<
size
&&
i
>=
0
;
i
+=
step
)
{
if
(
pCtx
->
hasNull
&&
isNull
((
const
char
*
)
&
val
[
i
],
pCtx
->
inputType
))
{
if
(
pCtx
->
hasNull
&&
isNull
((
const
char
*
)
&
val
[
i
],
pCtx
->
inputType
))
{
continue
;
}
...
...
@@ -3650,9 +3654,9 @@ static int32_t twa_function_impl(SQLFunctionCtx *pCtx, int32_t index, int32_t si
break
;
}
case
TSDB_DATA_TYPE_DOUBLE
:
{
double
*
val
=
(
double
*
)
GET_INPUT_DATA
(
pCtx
,
0
);
double
*
val
=
(
double
*
)
GET_INPUT_DATA
(
pCtx
,
0
);
for
(;
i
<
size
&&
i
>=
0
;
i
+=
step
)
{
if
(
pCtx
->
hasNull
&&
isNull
((
const
char
*
)
&
val
[
i
],
pCtx
->
inputType
))
{
if
(
pCtx
->
hasNull
&&
isNull
((
const
char
*
)
&
val
[
i
],
pCtx
->
inputType
))
{
continue
;
}
...
...
@@ -3669,9 +3673,9 @@ static int32_t twa_function_impl(SQLFunctionCtx *pCtx, int32_t index, int32_t si
break
;
}
case
TSDB_DATA_TYPE_UTINYINT
:
{
uint8_t
*
val
=
(
uint8_t
*
)
GET_INPUT_DATA
(
pCtx
,
0
);
uint8_t
*
val
=
(
uint8_t
*
)
GET_INPUT_DATA
(
pCtx
,
0
);
for
(;
i
<
size
&&
i
>=
0
;
i
+=
step
)
{
if
(
pCtx
->
hasNull
&&
isNull
((
const
char
*
)
&
val
[
i
],
pCtx
->
inputType
))
{
if
(
pCtx
->
hasNull
&&
isNull
((
const
char
*
)
&
val
[
i
],
pCtx
->
inputType
))
{
continue
;
}
...
...
@@ -3688,9 +3692,9 @@ static int32_t twa_function_impl(SQLFunctionCtx *pCtx, int32_t index, int32_t si
break
;
}
case
TSDB_DATA_TYPE_USMALLINT
:
{
uint16_t
*
val
=
(
uint16_t
*
)
GET_INPUT_DATA
(
pCtx
,
0
);
uint16_t
*
val
=
(
uint16_t
*
)
GET_INPUT_DATA
(
pCtx
,
0
);
for
(;
i
<
size
&&
i
>=
0
;
i
+=
step
)
{
if
(
pCtx
->
hasNull
&&
isNull
((
const
char
*
)
&
val
[
i
],
pCtx
->
inputType
))
{
if
(
pCtx
->
hasNull
&&
isNull
((
const
char
*
)
&
val
[
i
],
pCtx
->
inputType
))
{
continue
;
}
...
...
@@ -3707,9 +3711,9 @@ static int32_t twa_function_impl(SQLFunctionCtx *pCtx, int32_t index, int32_t si
break
;
}
case
TSDB_DATA_TYPE_UINT
:
{
uint32_t
*
val
=
(
uint32_t
*
)
GET_INPUT_DATA
(
pCtx
,
0
);
uint32_t
*
val
=
(
uint32_t
*
)
GET_INPUT_DATA
(
pCtx
,
0
);
for
(;
i
<
size
&&
i
>=
0
;
i
+=
step
)
{
if
(
pCtx
->
hasNull
&&
isNull
((
const
char
*
)
&
val
[
i
],
pCtx
->
inputType
))
{
if
(
pCtx
->
hasNull
&&
isNull
((
const
char
*
)
&
val
[
i
],
pCtx
->
inputType
))
{
continue
;
}
...
...
@@ -3726,26 +3730,25 @@ static int32_t twa_function_impl(SQLFunctionCtx *pCtx, int32_t index, int32_t si
break
;
}
case
TSDB_DATA_TYPE_UBIGINT
:
{
uint64_t
*
val
=
(
uint64_t
*
)
GET_INPUT_DATA
(
pCtx
,
0
);
uint64_t
*
val
=
(
uint64_t
*
)
GET_INPUT_DATA
(
pCtx
,
0
);
for
(;
i
<
size
&&
i
>=
0
;
i
+=
step
)
{
if
(
pCtx
->
hasNull
&&
isNull
((
const
char
*
)
&
val
[
i
],
pCtx
->
inputType
))
{
if
(
pCtx
->
hasNull
&&
isNull
((
const
char
*
)
&
val
[
i
],
pCtx
->
inputType
))
{
continue
;
}
#ifndef _TD_NINGSI_60
SPoint1
st
=
{.
key
=
tsList
[
i
],
.
val
=
(
double
)
val
[
i
]};
SPoint1
st
=
{.
key
=
tsList
[
i
],
.
val
=
(
double
)
val
[
i
]};
#else
SPoint1
st
;
st
.
key
=
tsList
[
i
];
st
.
val
=
(
double
)
val
[
i
];
st
.
val
=
(
double
)
val
[
i
];
#endif
pInfo
->
dOutput
+=
twa_get_area
(
pInfo
->
p
,
st
);
pInfo
->
p
=
st
;
}
break
;
}
default:
assert
(
0
);
default:
assert
(
0
);
}
// the last interpolated time window value
...
...
@@ -3762,11 +3765,11 @@ static void twa_function(SQLFunctionCtx *pCtx) {
void
*
data
=
GET_INPUT_DATA_LIST
(
pCtx
);
SResultRowCellInfo
*
pResInfo
=
GET_RES_INFO
(
pCtx
);
STwaInfo
*
pInfo
=
GET_ROWCELL_INTERBUF
(
pResInfo
);
STwaInfo
*
pInfo
=
GET_ROWCELL_INTERBUF
(
pResInfo
);
// skip null value
int32_t
step
=
GET_FORWARD_DIRECTION_FACTOR
(
pCtx
->
order
);
int32_t
i
=
(
pCtx
->
order
==
TSDB_ORDER_ASC
)
?
0
:
(
pCtx
->
size
-
1
);
int32_t
i
=
(
pCtx
->
order
==
TSDB_ORDER_ASC
)
?
0
:
(
pCtx
->
size
-
1
);
while
(
pCtx
->
hasNull
&&
i
<
pCtx
->
size
&&
i
>=
0
&&
isNull
((
char
*
)
data
+
pCtx
->
inputBytes
*
i
,
pCtx
->
inputType
))
{
i
+=
step
;
}
...
...
@@ -3813,7 +3816,7 @@ void twa_function_finalizer(SQLFunctionCtx *pCtx) {
if
(
pInfo
->
win
.
ekey
==
pInfo
->
win
.
skey
)
{
SET_DOUBLE_VAL
((
double
*
)
pCtx
->
pOutput
,
pInfo
->
p
.
val
);
}
else
{
SET_DOUBLE_VAL
((
double
*
)
pCtx
->
pOutput
,
pInfo
->
dOutput
/
(
pInfo
->
win
.
ekey
-
pInfo
->
win
.
skey
));
SET_DOUBLE_VAL
((
double
*
)
pCtx
->
pOutput
,
pInfo
->
dOutput
/
(
pInfo
->
win
.
ekey
-
pInfo
->
win
.
skey
));
}
GET_RES_INFO
(
pCtx
)
->
numOfRes
=
1
;
...
...
@@ -3826,7 +3829,7 @@ void twa_function_finalizer(SQLFunctionCtx *pCtx) {
*/
static
void
interp_function_impl
(
SQLFunctionCtx
*
pCtx
)
{
int32_t
type
=
(
int32_t
)
pCtx
->
param
[
2
].
i64
;
int32_t
type
=
(
int32_t
)
pCtx
->
param
[
2
].
i64
;
if
(
type
==
TSDB_FILL_NONE
)
{
return
;
}
...
...
@@ -3840,9 +3843,7 @@ static void interp_function_impl(SQLFunctionCtx *pCtx) {
}
else
if
(
type
==
TSDB_FILL_SET_VALUE
)
{
tVariantDump
(
&
pCtx
->
param
[
1
],
pCtx
->
pOutput
,
pCtx
->
inputType
,
true
);
}
else
{
if
(
pCtx
->
start
.
key
!=
INT64_MIN
&&
((
ascQuery
&&
pCtx
->
start
.
key
<=
pCtx
->
startTs
&&
pCtx
->
end
.
key
>=
pCtx
->
startTs
)
||
((
!
ascQuery
)
&&
pCtx
->
start
.
key
>=
pCtx
->
startTs
&&
pCtx
->
end
.
key
<=
pCtx
->
startTs
)))
{
if
(
pCtx
->
start
.
key
!=
INT64_MIN
&&
((
ascQuery
&&
pCtx
->
start
.
key
<=
pCtx
->
startTs
&&
pCtx
->
end
.
key
>=
pCtx
->
startTs
)
||
((
!
ascQuery
)
&&
pCtx
->
start
.
key
>=
pCtx
->
startTs
&&
pCtx
->
end
.
key
<=
pCtx
->
startTs
)))
{
if
(
type
==
TSDB_FILL_PREV
)
{
if
(
IS_NUMERIC_TYPE
(
pCtx
->
inputType
)
||
pCtx
->
inputType
==
TSDB_DATA_TYPE_BOOL
)
{
SET_TYPED_DATA
(
pCtx
->
pOutput
,
pCtx
->
inputType
,
pCtx
->
start
.
val
);
...
...
@@ -3888,14 +3889,14 @@ static void interp_function_impl(SQLFunctionCtx *pCtx) {
if
(
pCtx
->
size
>
1
)
{
TSKEY
ekey
=
GET_TS_DATA
(
pCtx
,
1
);
if
((
ascQuery
&&
ekey
>
skey
&&
ekey
<=
pCtx
->
startTs
)
||
((
!
ascQuery
)
&&
ekey
<
skey
&&
ekey
>=
pCtx
->
startTs
))
{
((
!
ascQuery
)
&&
ekey
<
skey
&&
ekey
>=
pCtx
->
startTs
))
{
skey
=
ekey
;
}
}
assignVal
(
pCtx
->
pOutput
,
pCtx
->
pInput
,
pCtx
->
outputBytes
,
pCtx
->
inputType
);
}
else
if
(
type
==
TSDB_FILL_NEXT
)
{
TSKEY
ekey
=
skey
;
char
*
val
=
NULL
;
char
*
val
=
NULL
;
if
((
ascQuery
&&
ekey
<
pCtx
->
startTs
)
||
((
!
ascQuery
)
&&
ekey
>
pCtx
->
startTs
))
{
if
(
pCtx
->
size
>
1
)
{
...
...
@@ -3904,12 +3905,12 @@ static void interp_function_impl(SQLFunctionCtx *pCtx) {
return
;
}
val
=
((
char
*
)
pCtx
->
pInput
)
+
pCtx
->
inputBytes
;
val
=
((
char
*
)
pCtx
->
pInput
)
+
pCtx
->
inputBytes
;
}
else
{
return
;
}
}
else
{
val
=
(
char
*
)
pCtx
->
pInput
;
val
=
(
char
*
)
pCtx
->
pInput
;
}
assignVal
(
pCtx
->
pOutput
,
val
,
pCtx
->
outputBytes
,
pCtx
->
inputType
);
...
...
@@ -3921,8 +3922,8 @@ static void interp_function_impl(SQLFunctionCtx *pCtx) {
TSKEY
ekey
=
GET_TS_DATA
(
pCtx
,
1
);
// no data generated yet
if
((
ascQuery
&&
!
(
skey
<=
pCtx
->
startTs
&&
ekey
>=
pCtx
->
startTs
))
||
((
!
ascQuery
)
&&
!
(
skey
>=
pCtx
->
startTs
&&
ekey
<=
pCtx
->
startTs
)))
{
if
((
ascQuery
&&
!
(
skey
<=
pCtx
->
startTs
&&
ekey
>=
pCtx
->
startTs
))
||
((
!
ascQuery
)
&&
!
(
skey
>=
pCtx
->
startTs
&&
ekey
<=
pCtx
->
startTs
)))
{
return
;
}
...
...
@@ -3955,7 +3956,7 @@ static void interp_function(SQLFunctionCtx *pCtx) {
if
(
pCtx
->
size
>
0
)
{
bool
ascQuery
=
(
pCtx
->
order
==
TSDB_ORDER_ASC
);
TSKEY
key
;
char
*
pData
;
char
*
pData
;
int32_t
typedData
=
0
;
if
(
ascQuery
)
{
...
...
@@ -3976,8 +3977,7 @@ static void interp_function(SQLFunctionCtx *pCtx) {
}
}
// if (key == pCtx->startTs && (ascQuery || !(IS_NUMERIC_TYPE(pCtx->inputType) || pCtx->inputType ==
// TSDB_DATA_TYPE_BOOL))) {
//if (key == pCtx->startTs && (ascQuery || !(IS_NUMERIC_TYPE(pCtx->inputType) || pCtx->inputType == TSDB_DATA_TYPE_BOOL))) {
if
(
key
==
pCtx
->
startTs
)
{
if
(
typedData
)
{
SET_TYPED_DATA
(
pCtx
->
pOutput
,
pCtx
->
inputType
,
*
(
double
*
)
pData
);
...
...
@@ -3989,12 +3989,12 @@ static void interp_function(SQLFunctionCtx *pCtx) {
}
else
{
interp_function_impl
(
pCtx
);
}
}
else
{
//
no qualified data rows and interpolation is required
}
else
{
//no qualified data rows and interpolation is required
interp_function_impl
(
pCtx
);
}
}
static
bool
ts_comp_function_setup
(
SQLFunctionCtx
*
pCtx
,
SResultRowCellInfo
*
pResInfo
)
{
static
bool
ts_comp_function_setup
(
SQLFunctionCtx
*
pCtx
,
SResultRowCellInfo
*
pResInfo
)
{
if
(
!
function_setup
(
pCtx
,
pResInfo
))
{
return
false
;
// not initialized since it has been initialized
}
...
...
@@ -4032,7 +4032,7 @@ static void ts_comp_finalize(SQLFunctionCtx *pCtx) {
STSBuf
*
pTSbuf
=
pInfo
->
pTSBuf
;
tsBufFlush
(
pTSbuf
);
qDebug
(
"total timestamp :%"
PRId64
,
pTSbuf
->
numOfTotal
);
qDebug
(
"total timestamp :%"
PRId64
,
pTSbuf
->
numOfTotal
);
// TODO refactor transfer ownership of current file
*
(
FILE
**
)
pCtx
->
pOutput
=
pTSbuf
->
f
;
...
...
@@ -4053,7 +4053,7 @@ static void ts_comp_finalize(SQLFunctionCtx *pCtx) {
//////////////////////////////////////////////////////////////////////////////////////////////
// rate functions
static
double
do_calc_rate
(
const
SRateInfo
*
pRateInfo
,
double
tickPerSec
)
{
static
double
do_calc_rate
(
const
SRateInfo
*
pRateInfo
,
double
tickPerSec
)
{
if
((
INT64_MIN
==
pRateInfo
->
lastKey
)
||
(
INT64_MIN
==
pRateInfo
->
firstKey
)
||
(
pRateInfo
->
firstKey
>=
pRateInfo
->
lastKey
))
{
return
0
.
0
;
...
...
@@ -4079,10 +4079,10 @@ static double do_calc_rate(const SRateInfo *pRateInfo, double tickPerSec) {
return
0
;
}
return
(
duration
>
0
)
?
((
double
)
diff
)
/
(
duration
/
tickPerSec
)
:
0
.
0
;
return
(
duration
>
0
)
?
((
double
)
diff
)
/
(
duration
/
tickPerSec
)
:
0
.
0
;
}
static
bool
rate_function_setup
(
SQLFunctionCtx
*
pCtx
,
SResultRowCellInfo
*
pResInfo
)
{
static
bool
rate_function_setup
(
SQLFunctionCtx
*
pCtx
,
SResultRowCellInfo
*
pResInfo
)
{
if
(
!
function_setup
(
pCtx
,
pResInfo
))
{
return
false
;
}
...
...
@@ -4091,8 +4091,8 @@ static bool rate_function_setup(SQLFunctionCtx *pCtx, SResultRowCellInfo *pResIn
pInfo
->
correctionValue
=
0
;
pInfo
->
firstKey
=
INT64_MIN
;
pInfo
->
lastKey
=
INT64_MIN
;
pInfo
->
firstValue
=
(
double
)
INT64_MIN
;
pInfo
->
lastValue
=
(
double
)
INT64_MIN
;
pInfo
->
firstValue
=
(
double
)
INT64_MIN
;
pInfo
->
lastValue
=
(
double
)
INT64_MIN
;
pInfo
->
hasResult
=
0
;
pInfo
->
isIRate
=
(
pCtx
->
functionId
==
TSDB_FUNC_IRATE
);
...
...
@@ -4104,7 +4104,7 @@ static void rate_function(SQLFunctionCtx *pCtx) {
int32_t
notNullElems
=
0
;
SRateInfo
*
pRateInfo
=
(
SRateInfo
*
)
GET_ROWCELL_INTERBUF
(
pResInfo
);
TSKEY
*
primaryKey
=
GET_TS_LIST
(
pCtx
);
TSKEY
*
primaryKey
=
GET_TS_LIST
(
pCtx
);
qDebug
(
"%p rate_function() size:%d, hasNull:%d"
,
pCtx
,
pCtx
->
size
,
pCtx
->
hasNull
);
...
...
@@ -4157,19 +4157,19 @@ static void rate_func_copy(SQLFunctionCtx *pCtx) {
SResultRowCellInfo
*
pResInfo
=
GET_RES_INFO
(
pCtx
);
memcpy
(
GET_ROWCELL_INTERBUF
(
pResInfo
),
pCtx
->
pInput
,
(
size_t
)
pCtx
->
inputBytes
);
pResInfo
->
hasResult
=
((
SRateInfo
*
)
pCtx
->
pInput
)
->
hasResult
;
pResInfo
->
hasResult
=
((
SRateInfo
*
)
pCtx
->
pInput
)
->
hasResult
;
}
static
void
rate_finalizer
(
SQLFunctionCtx
*
pCtx
)
{
SResultRowCellInfo
*
pResInfo
=
GET_RES_INFO
(
pCtx
);
SRateInfo
*
pRateInfo
=
(
SRateInfo
*
)
GET_ROWCELL_INTERBUF
(
pResInfo
);
SRateInfo
*
pRateInfo
=
(
SRateInfo
*
)
GET_ROWCELL_INTERBUF
(
pResInfo
);
if
(
pRateInfo
->
hasResult
!=
DATA_SET_FLAG
)
{
setNull
(
pCtx
->
pOutput
,
TSDB_DATA_TYPE_DOUBLE
,
sizeof
(
double
));
return
;
}
SET_DOUBLE_VAL
((
double
*
)
pCtx
->
pOutput
,
do_calc_rate
(
pRateInfo
,
(
double
)
TSDB_TICK_PER_SECOND
(
pCtx
->
param
[
0
].
i64
)));
SET_DOUBLE_VAL
((
double
*
)
pCtx
->
pOutput
,
do_calc_rate
(
pRateInfo
,
(
double
)
TSDB_TICK_PER_SECOND
(
pCtx
->
param
[
0
].
i64
)));
// cannot set the numOfIteratedElems again since it is set during previous iteration
pResInfo
->
numOfRes
=
1
;
...
...
@@ -4183,7 +4183,7 @@ static void irate_function(SQLFunctionCtx *pCtx) {
int32_t
notNullElems
=
0
;
SRateInfo
*
pRateInfo
=
(
SRateInfo
*
)
GET_ROWCELL_INTERBUF
(
pResInfo
);
TSKEY
*
primaryKey
=
GET_TS_LIST
(
pCtx
);
TSKEY
*
primaryKey
=
GET_TS_LIST
(
pCtx
);
for
(
int32_t
i
=
pCtx
->
size
-
1
;
i
>=
0
;
--
i
)
{
char
*
pData
=
GET_INPUT_DATA
(
pCtx
,
i
);
...
...
@@ -4222,12 +4222,12 @@ static void irate_function(SQLFunctionCtx *pCtx) {
}
}
void
blockInfo_func
(
SQLFunctionCtx
*
pCtx
)
{
void
blockInfo_func
(
SQLFunctionCtx
*
pCtx
)
{
SResultRowCellInfo
*
pResInfo
=
GET_RES_INFO
(
pCtx
);
STableBlockDist
*
pDist
=
(
STableBlockDist
*
)
GET_ROWCELL_INTERBUF
(
pResInfo
);
STableBlockDist
*
pDist
=
(
STableBlockDist
*
)
GET_ROWCELL_INTERBUF
(
pResInfo
);
int32_t
len
=
*
(
int32_t
*
)
pCtx
->
pInput
;
blockDistInfoFromBinary
((
char
*
)
pCtx
->
pInput
+
sizeof
(
int32_t
),
len
,
pDist
);
int32_t
len
=
*
(
int32_t
*
)
pCtx
->
pInput
;
blockDistInfoFromBinary
((
char
*
)
pCtx
->
pInput
+
sizeof
(
int32_t
),
len
,
pDist
);
pDist
->
rowSize
=
(
uint16_t
)
pCtx
->
param
[
0
].
i64
;
memcpy
(
pCtx
->
pOutput
,
pCtx
->
pInput
,
sizeof
(
int32_t
)
+
len
);
...
...
@@ -4236,8 +4236,8 @@ void blockInfo_func(SQLFunctionCtx *pCtx) {
pResInfo
->
hasResult
=
DATA_SET_FLAG
;
}
static
void
mergeTableBlockDist
(
SResultRowCellInfo
*
pResInfo
,
const
STableBlockDist
*
pSrc
)
{
STableBlockDist
*
pDist
=
(
STableBlockDist
*
)
GET_ROWCELL_INTERBUF
(
pResInfo
);
static
void
mergeTableBlockDist
(
SResultRowCellInfo
*
pResInfo
,
const
STableBlockDist
*
pSrc
)
{
STableBlockDist
*
pDist
=
(
STableBlockDist
*
)
GET_ROWCELL_INTERBUF
(
pResInfo
);
assert
(
pDist
!=
NULL
&&
pSrc
!=
NULL
);
pDist
->
numOfTables
+=
pSrc
->
numOfTables
;
...
...
@@ -4254,7 +4254,7 @@ static void mergeTableBlockDist(SResultRowCellInfo *pResInfo, const STableBlockD
pDist
->
maxRows
=
pSrc
->
maxRows
;
pDist
->
minRows
=
pSrc
->
minRows
;
int32_t
maxSteps
=
TSDB_MAX_MAX_ROW_FBLOCK
/
TSDB_BLOCK_DIST_STEP_ROWS
;
int32_t
maxSteps
=
TSDB_MAX_MAX_ROW_FBLOCK
/
TSDB_BLOCK_DIST_STEP_ROWS
;
if
(
TSDB_MAX_MAX_ROW_FBLOCK
%
TSDB_BLOCK_DIST_STEP_ROWS
!=
0
)
{
++
maxSteps
;
}
...
...
@@ -4264,16 +4264,16 @@ static void mergeTableBlockDist(SResultRowCellInfo *pResInfo, const STableBlockD
size_t
steps
=
taosArrayGetSize
(
pSrc
->
dataBlockInfos
);
for
(
int32_t
i
=
0
;
i
<
steps
;
++
i
)
{
int32_t
srcNumBlocks
=
((
SFileBlockInfo
*
)
taosArrayGet
(
pSrc
->
dataBlockInfos
,
i
))
->
numBlocksOfStep
;
SFileBlockInfo
*
blockInfo
=
(
SFileBlockInfo
*
)
taosArrayGet
(
pDist
->
dataBlockInfos
,
i
);
int32_t
srcNumBlocks
=
((
SFileBlockInfo
*
)
taosArrayGet
(
pSrc
->
dataBlockInfos
,
i
))
->
numBlocksOfStep
;
SFileBlockInfo
*
blockInfo
=
(
SFileBlockInfo
*
)
taosArrayGet
(
pDist
->
dataBlockInfos
,
i
);
blockInfo
->
numBlocksOfStep
+=
srcNumBlocks
;
}
}
void
block_func_merge
(
SQLFunctionCtx
*
pCtx
)
{
void
block_func_merge
(
SQLFunctionCtx
*
pCtx
)
{
STableBlockDist
info
=
{
0
};
int32_t
len
=
*
(
int32_t
*
)
pCtx
->
pInput
;
blockDistInfoFromBinary
(((
char
*
)
pCtx
->
pInput
)
+
sizeof
(
int32_t
),
len
,
&
info
);
int32_t
len
=
*
(
int32_t
*
)
pCtx
->
pInput
;
blockDistInfoFromBinary
(((
char
*
)
pCtx
->
pInput
)
+
sizeof
(
int32_t
),
len
,
&
info
);
SResultRowCellInfo
*
pResInfo
=
GET_RES_INFO
(
pCtx
);
mergeTableBlockDist
(
pResInfo
,
&
info
);
taosArrayDestroy
(
info
.
dataBlockInfos
);
...
...
@@ -4282,8 +4282,8 @@ void block_func_merge(SQLFunctionCtx *pCtx) {
pResInfo
->
hasResult
=
DATA_SET_FLAG
;
}
void
getPercentiles
(
STableBlockDist
*
pTableBlockDist
,
int64_t
totalBlocks
,
int32_t
numOfPercents
,
double
*
percents
,
int32_t
*
percentiles
)
{
void
getPercentiles
(
STableBlockDist
*
pTableBlockDist
,
int64_t
totalBlocks
,
int32_t
numOfPercents
,
double
*
percents
,
int32_t
*
percentiles
)
{
if
(
totalBlocks
==
0
)
{
for
(
int32_t
i
=
0
;
i
<
numOfPercents
;
++
i
)
{
percentiles
[
i
]
=
0
;
...
...
@@ -4313,16 +4313,16 @@ void getPercentiles(STableBlockDist *pTableBlockDist, int64_t totalBlocks, int32
}
for
(
int32_t
i
=
0
;
i
<
numOfPercents
;
++
i
)
{
percentiles
[
i
]
=
(
percentiles
[
i
]
+
1
)
*
TSDB_BLOCK_DIST_STEP_ROWS
-
TSDB_BLOCK_DIST_STEP_ROWS
/
2
;
percentiles
[
i
]
=
(
percentiles
[
i
]
+
1
)
*
TSDB_BLOCK_DIST_STEP_ROWS
-
TSDB_BLOCK_DIST_STEP_ROWS
/
2
;
}
}
void
generateBlockDistResult
(
STableBlockDist
*
pTableBlockDist
,
char
*
result
)
{
void
generateBlockDistResult
(
STableBlockDist
*
pTableBlockDist
,
char
*
result
)
{
if
(
pTableBlockDist
==
NULL
)
{
return
;
}
SArray
*
blockInfos
=
pTableBlockDist
->
dataBlockInfos
;
SArray
*
blockInfos
=
pTableBlockDist
->
dataBlockInfos
;
uint64_t
totalRows
=
pTableBlockDist
->
totalRows
;
size_t
numSteps
=
taosArrayGetSize
(
blockInfos
);
int64_t
totalBlocks
=
0
;
...
...
@@ -4334,7 +4334,7 @@ void generateBlockDistResult(STableBlockDist *pTableBlockDist, char *result) {
totalBlocks
+=
blocks
;
}
avg
=
totalBlocks
>
0
?
(
int64_t
)(
totalRows
/
totalBlocks
)
:
0
;
avg
=
totalBlocks
>
0
?
(
int64_t
)(
totalRows
/
totalBlocks
)
:
0
;
min
=
totalBlocks
>
0
?
pTableBlockDist
->
minRows
:
0
;
max
=
totalBlocks
>
0
?
pTableBlockDist
->
maxRows
:
0
;
...
...
@@ -4353,33 +4353,32 @@ void generateBlockDistResult(STableBlockDist *pTableBlockDist, char *result) {
double
percents
[]
=
{
0
.
05
,
0
.
10
,
0
.
20
,
0
.
30
,
0
.
40
,
0
.
50
,
0
.
60
,
0
.
70
,
0
.
80
,
0
.
90
,
0
.
95
,
0
.
99
};
int32_t
percentiles
[]
=
{
-
1
,
-
1
,
-
1
,
-
1
,
-
1
,
-
1
,
-
1
,
-
1
,
-
1
,
-
1
,
-
1
,
-
1
};
assert
(
sizeof
(
percents
)
/
sizeof
(
double
)
==
sizeof
(
percentiles
)
/
sizeof
(
int32_t
));
getPercentiles
(
pTableBlockDist
,
totalBlocks
,
sizeof
(
percents
)
/
sizeof
(
double
),
percents
,
percentiles
);
assert
(
sizeof
(
percents
)
/
sizeof
(
double
)
==
sizeof
(
percentiles
)
/
sizeof
(
int32_t
));
getPercentiles
(
pTableBlockDist
,
totalBlocks
,
sizeof
(
percents
)
/
sizeof
(
double
),
percents
,
percentiles
);
uint64_t
totalLen
=
pTableBlockDist
->
totalSize
;
int32_t
rowSize
=
pTableBlockDist
->
rowSize
;
int32_t
smallBlocks
=
pTableBlockDist
->
numOfSmallBlocks
;
double
compRatio
=
(
totalRows
>
0
)
?
((
double
)(
totalLen
)
/
(
rowSize
*
totalRows
))
:
1
;
double
compRatio
=
(
totalRows
>
0
)
?
((
double
)(
totalLen
)
/
(
rowSize
*
totalRows
))
:
1
;
int
sz
=
sprintf
(
result
+
VARSTR_HEADER_SIZE
,
"summary:
\n\t
"
"5th=[%d], 10th=[%d], 20th=[%d], 30th=[%d], 40th=[%d], 50th=[%d]
\n\t
"
"60th=[%d], 70th=[%d], 80th=[%d], 90th=[%d], 95th=[%d], 99th=[%d]
\n\t
"
"Min=[%"
PRId64
"(Rows)] Max=[%"
PRId64
"(Rows)] Avg=[%"
PRId64
"(Rows)] Stddev=[%.2f]
\n\t
"
"Rows=[%"
PRIu64
"], Blocks=[%"
PRId64
"], SmallBlocks=[%d], Size=[%.3f(Kb)] Comp=[%.2f]
\n\t
"
"Min=[%"
PRId64
"(Rows)] Max=[%"
PRId64
"(Rows)] Avg=[%"
PRId64
"(Rows)] Stddev=[%.2f]
\n\t
"
"Rows=[%"
PRIu64
"], Blocks=[%"
PRId64
"], SmallBlocks=[%d], Size=[%.3f(Kb)] Comp=[%.2f]
\n\t
"
"RowsInMem=[%d]
\n\t
"
,
percentiles
[
0
],
percentiles
[
1
],
percentiles
[
2
],
percentiles
[
3
],
percentiles
[
4
],
percentiles
[
5
],
percentiles
[
6
],
percentiles
[
7
],
percentiles
[
8
],
percentiles
[
9
],
percentiles
[
10
],
percentiles
[
11
],
min
,
max
,
avg
,
stdDev
,
totalRows
,
totalBlocks
,
smallBlocks
,
totalLen
/
1024
.
0
,
compRatio
,
min
,
max
,
avg
,
stdDev
,
totalRows
,
totalBlocks
,
smallBlocks
,
totalLen
/
1024
.
0
,
compRatio
,
pTableBlockDist
->
numOfRowsInMemTable
);
varDataSetLen
(
result
,
sz
);
UNUSED
(
sz
);
}
void
blockinfo_func_finalizer
(
SQLFunctionCtx
*
pCtx
)
{
void
blockinfo_func_finalizer
(
SQLFunctionCtx
*
pCtx
)
{
SResultRowCellInfo
*
pResInfo
=
GET_RES_INFO
(
pCtx
);
STableBlockDist
*
pDist
=
(
STableBlockDist
*
)
GET_ROWCELL_INTERBUF
(
pResInfo
);
STableBlockDist
*
pDist
=
(
STableBlockDist
*
)
GET_ROWCELL_INTERBUF
(
pResInfo
);
pDist
->
rowSize
=
(
uint16_t
)
pCtx
->
param
[
0
].
i64
;
generateBlockDistResult
(
pDist
,
pCtx
->
pOutput
);
...
...
@@ -4409,47 +4408,16 @@ void blockinfo_func_finalizer(SQLFunctionCtx *pCtx) {
*/
int32_t
functionCompatList
[]
=
{
// count, sum, avg, min, max, stddev, percentile, apercentile, first, last
1
,
1
,
1
,
1
,
1
,
1
,
1
,
1
,
1
,
1
,
1
,
1
,
1
,
1
,
1
,
1
,
1
,
1
,
1
,
1
,
// last_row,top, bottom, spread, twa, leastsqr, ts, ts_dummy, tag_dummy, ts_comp
4
,
-
1
,
-
1
,
1
,
1
,
1
,
1
,
1
,
1
,
-
1
,
4
,
-
1
,
-
1
,
1
,
1
,
1
,
1
,
1
,
1
,
-
1
,
// tag, colprj, tagprj, arithmetic, diff, first_dist, last_dist, stddev_dst, interp rate irate
1
,
1
,
1
,
1
,
-
1
,
1
,
1
,
1
,
5
,
1
,
1
,
1
,
1
,
1
,
1
,
-
1
,
1
,
1
,
1
,
5
,
1
,
1
,
// tid_tag, derivative, blk_info
6
,
8
,
7
,
6
,
8
,
7
,
};
SAggFunctionInfo
aAggs
[]
=
{
{
SAggFunctionInfo
aAggs
[]
=
{{
// 0, count function does not invoke the finalize function
"count"
,
TSDB_FUNC_COUNT
,
...
...
@@ -4794,7 +4762,7 @@ SAggFunctionInfo aAggs[] = {
"interp"
,
TSDB_FUNC_INTERP
,
TSDB_FUNC_INTERP
,
TSDB_FUNCSTATE_SO
|
TSDB_FUNCSTATE_OF
|
TSDB_FUNCSTATE_STABLE
|
TSDB_FUNCSTATE_NEED_TS
,
TSDB_FUNCSTATE_SO
|
TSDB_FUNCSTATE_OF
|
TSDB_FUNCSTATE_STABLE
|
TSDB_FUNCSTATE_NEED_TS
,
function_setup
,
interp_function
,
doFinalizer
,
...
...
@@ -4837,8 +4805,7 @@ SAggFunctionInfo aAggs[] = {
noop1
,
dataBlockRequired
,
},
{
// 32
{
//32
"derivative"
,
// return table id and the corresponding tags for join match and subscribe
TSDB_FUNC_DERIVATIVE
,
TSDB_FUNC_INVALID_ID
,
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录