Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
4a40bf25
T
TDengine
项目概览
taosdata
/
TDengine
大约 1 年 前同步成功
通知
1184
Star
22015
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
体验新版 GitCode,发现更多精彩内容 >>
提交
4a40bf25
编写于
4月 28, 2022
作者:
L
Liu Jicong
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
fix: interval status do not reset for stream
上级
2aa9ff6d
变更
2
隐藏空白更改
内联
并排
Showing
2 changed file
with
241 addition
and
232 deletion
+241
-232
example/src/tstream.c
example/src/tstream.c
+1
-1
source/libs/function/src/builtinsimpl.c
source/libs/function/src/builtinsimpl.c
+240
-231
未找到文件。
example/src/tstream.c
浏览文件 @
4a40bf25
...
...
@@ -25,7 +25,7 @@ int32_t init_env() {
return
-
1
;
}
TAOS_RES
*
pRes
=
taos_query
(
pConn
,
"create database if not exists abc1 vgroups
2
"
);
TAOS_RES
*
pRes
=
taos_query
(
pConn
,
"create database if not exists abc1 vgroups
1
"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"error in create db, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
...
...
source/libs/function/src/builtinsimpl.c
浏览文件 @
4a40bf25
...
...
@@ -36,27 +36,33 @@ typedef struct SAvgRes {
typedef
struct
STopBotResItem
{
SVariant
v
;
uint64_t
uid
;
// it is a table uid, used to extract tag data during building of the final result for the tag data
uint64_t
uid
;
// it is a table uid, used to extract tag data during building of the final result for the tag data
struct
{
int32_t
pageId
;
int32_t
offset
;
}
tuplePos
;
// tuple data of this chosen row
int32_t
pageId
;
int32_t
offset
;
}
tuplePos
;
// tuple data of this chosen row
}
STopBotResItem
;
typedef
struct
STopBotRes
{
STopBotResItem
*
pItems
;
STopBotResItem
*
pItems
;
}
STopBotRes
;
typedef
struct
SStddevRes
{
double
result
;
int64_t
count
;
union
{
double
quadraticDSum
;
int64_t
quadraticISum
;};
union
{
double
dsum
;
int64_t
isum
;};
union
{
double
quadraticDSum
;
int64_t
quadraticISum
;
};
union
{
double
dsum
;
int64_t
isum
;
};
}
SStddevRes
;
typedef
struct
SPercentileInfo
{
double
result
;
tMemBucket
*
pMemBucket
;
tMemBucket
*
pMemBucket
;
int32_t
stage
;
double
minval
;
double
maxval
;
...
...
@@ -64,19 +70,22 @@ typedef struct SPercentileInfo {
}
SPercentileInfo
;
typedef
struct
SDiffInfo
{
bool
hasPrev
;
bool
includeNull
;
bool
ignoreNegative
;
bool
firstOutput
;
union
{
int64_t
i64
;
double
d64
;}
prev
;
bool
hasPrev
;
bool
includeNull
;
bool
ignoreNegative
;
bool
firstOutput
;
union
{
int64_t
i64
;
double
d64
;
}
prev
;
}
SDiffInfo
;
#define SET_VAL(_info, numOfElem, res)
\
do {
\
if ((numOfElem) <= 0) {
\
break;
\
}
\
(_info)->numOfRes = (res);
\
#define SET_VAL(_info, numOfElem, res) \
do { \
if ((numOfElem) <= 0) { \
break; \
} \
(_info)->numOfRes = (res); \
} while (0)
#define GET_TS_LIST(x) ((TSKEY*)((x)->ptsList))
...
...
@@ -85,7 +94,7 @@ typedef struct SDiffInfo {
#define DO_UPDATE_TAG_COLUMNS_WITHOUT_TS(ctx) \
do { \
for (int32_t _i = 0; _i < (ctx)->tagInfo.numOfTagCols; ++_i) { \
SqlFunctionCtx
*
__ctx = (ctx)->tagInfo.pTagCtxList[_i]; \
SqlFunctionCtx
*
__ctx = (ctx)->tagInfo.pTagCtxList[_i]; \
__ctx->fpSet.process(__ctx); \
} \
} while (0);
...
...
@@ -101,7 +110,7 @@ typedef struct SDiffInfo {
#define LOOPCHECK_N(val, _col, ctx, _t, _nrow, _start, sign, num) \
do { \
_t
*d = (_t *)((_col)->pData);
\
_t
* d = (_t*)((_col)->pData);
\
for (int32_t i = (_start); i < (_nrow) + (_start); ++i) { \
if (((_col)->hasNull) && colDataIsNull_f((_col)->nullbitmap, i)) { \
continue; \
...
...
@@ -111,8 +120,7 @@ typedef struct SDiffInfo {
} \
} while (0)
bool
functionSetup
(
SqlFunctionCtx
*
pCtx
,
SResultRowEntryInfo
*
pResultInfo
)
{
bool
functionSetup
(
SqlFunctionCtx
*
pCtx
,
SResultRowEntryInfo
*
pResultInfo
)
{
if
(
pResultInfo
->
initialized
)
{
return
false
;
}
...
...
@@ -126,12 +134,12 @@ bool functionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo) {
}
int32_t
functionFinalize
(
SqlFunctionCtx
*
pCtx
,
SSDataBlock
*
pBlock
)
{
int32_t
slotId
=
pCtx
->
pExpr
->
base
.
resSchema
.
slotId
;
int32_t
slotId
=
pCtx
->
pExpr
->
base
.
resSchema
.
slotId
;
SColumnInfoData
*
pCol
=
taosArrayGet
(
pBlock
->
pDataBlock
,
slotId
);
SResultRowEntryInfo
*
pResInfo
=
GET_RES_INFO
(
pCtx
);
pResInfo
->
isNullRes
=
(
pResInfo
->
numOfRes
==
0
)
?
1
:
0
;
cleanupResultRowEntry
(
pResInfo
);
pResInfo
->
isNullRes
=
(
pResInfo
->
numOfRes
==
0
)
?
1
:
0
;
/*cleanupResultRowEntry(pResInfo);*/
char
*
in
=
GET_ROWCELL_INTERBUF
(
pResInfo
);
colDataAppend
(
pCol
,
pBlock
->
info
.
rows
,
in
,
pResInfo
->
isNullRes
);
...
...
@@ -140,11 +148,11 @@ int32_t functionFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
}
int32_t
functionFinalizeWithResultBuf
(
SqlFunctionCtx
*
pCtx
,
SSDataBlock
*
pBlock
,
char
*
finalResult
)
{
int32_t
slotId
=
pCtx
->
pExpr
->
base
.
resSchema
.
slotId
;
int32_t
slotId
=
pCtx
->
pExpr
->
base
.
resSchema
.
slotId
;
SColumnInfoData
*
pCol
=
taosArrayGet
(
pBlock
->
pDataBlock
,
slotId
);
SResultRowEntryInfo
*
pResInfo
=
GET_RES_INFO
(
pCtx
);
pResInfo
->
isNullRes
=
(
pResInfo
->
numOfRes
==
0
)
?
1
:
0
;
pResInfo
->
isNullRes
=
(
pResInfo
->
numOfRes
==
0
)
?
1
:
0
;
cleanupResultRowEntry
(
pResInfo
);
char
*
in
=
finalResult
;
...
...
@@ -170,7 +178,7 @@ bool getCountFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) {
* count function does need the finalize, if data is missing, the default value, which is 0, is used
* count function does not use the pCtx->interResBuf to keep the intermediate buffer
*/
int32_t
countFunction
(
SqlFunctionCtx
*
pCtx
)
{
int32_t
countFunction
(
SqlFunctionCtx
*
pCtx
)
{
int32_t
numOfElem
=
0
;
/*
...
...
@@ -179,7 +187,7 @@ int32_t countFunction(SqlFunctionCtx *pCtx) {
* 3. for primary key column, pInputCol->hasNull always be false, pInput->colDataAggIsSet == false;
*/
SInputColumnInfoData
*
pInput
=
&
pCtx
->
input
;
SColumnInfoData
*
pInputCol
=
pInput
->
pData
[
0
];
SColumnInfoData
*
pInputCol
=
pInput
->
pData
[
0
];
if
(
pInput
->
colDataAggIsSet
&&
pInput
->
totalRows
==
pInput
->
numOfRows
)
{
numOfElem
=
pInput
->
numOfRows
-
pInput
->
pColumnDataAgg
[
0
]
->
numOfNull
;
ASSERT
(
numOfElem
>=
0
);
...
...
@@ -192,14 +200,15 @@ int32_t countFunction(SqlFunctionCtx *pCtx) {
numOfElem
+=
1
;
}
}
else
{
//when counting on the primary time stamp column and no statistics data is presented, use the size value directly.
// when counting on the primary time stamp column and no statistics data is presented, use the size value
// directly.
numOfElem
=
pInput
->
numOfRows
;
}
}
SResultRowEntryInfo
*
pResInfo
=
GET_RES_INFO
(
pCtx
);
char
*
buf
=
GET_ROWCELL_INTERBUF
(
pResInfo
);
*
((
int64_t
*
)
buf
)
+=
numOfElem
;
char
*
buf
=
GET_ROWCELL_INTERBUF
(
pResInfo
);
*
((
int64_t
*
)
buf
)
+=
numOfElem
;
SET_VAL
(
pResInfo
,
numOfElem
,
1
);
return
TSDB_CODE_SUCCESS
;
...
...
@@ -207,7 +216,7 @@ int32_t countFunction(SqlFunctionCtx *pCtx) {
#define LIST_ADD_N(_res, _col, _start, _rows, _t, numOfElem) \
do { \
_t
*d = (_t *)(_col->pData);
\
_t
* d = (_t*)(_col->pData);
\
for (int32_t i = (_start); i < (_rows) + (_start); ++i) { \
if (((_col)->hasNull) && colDataIsNull_f((_col)->nullbitmap, i)) { \
continue; \
...
...
@@ -217,13 +226,13 @@ int32_t countFunction(SqlFunctionCtx *pCtx) {
} \
} while (0)
int32_t
sumFunction
(
SqlFunctionCtx
*
pCtx
)
{
int32_t
sumFunction
(
SqlFunctionCtx
*
pCtx
)
{
int32_t
numOfElem
=
0
;
// Only the pre-computing information loaded and actual data does not loaded
SInputColumnInfoData
*
pInput
=
&
pCtx
->
input
;
SColumnDataAgg
*
pAgg
=
pInput
->
pColumnDataAgg
[
0
];
int32_t
type
=
pInput
->
pData
[
0
]
->
info
.
type
;
SColumnDataAgg
*
pAgg
=
pInput
->
pColumnDataAgg
[
0
];
int32_t
type
=
pInput
->
pData
[
0
]
->
info
.
type
;
SSumRes
*
pSumRes
=
GET_ROWCELL_INTERBUF
(
GET_RES_INFO
(
pCtx
));
...
...
@@ -241,7 +250,7 @@ int32_t sumFunction(SqlFunctionCtx *pCtx) {
}
else
{
// computing based on the true data block
SColumnInfoData
*
pCol
=
pInput
->
pData
[
0
];
int32_t
start
=
pInput
->
startRowIndex
;
int32_t
start
=
pInput
->
startRowIndex
;
int32_t
numOfRows
=
pInput
->
numOfRows
;
if
(
IS_SIGNED_NUMERIC_TYPE
(
type
)
||
type
==
TSDB_DATA_TYPE_BOOL
)
{
...
...
@@ -286,7 +295,7 @@ bool getAvgFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) {
return
true
;
}
bool
avgFunctionSetup
(
SqlFunctionCtx
*
pCtx
,
SResultRowEntryInfo
*
pResultInfo
)
{
bool
avgFunctionSetup
(
SqlFunctionCtx
*
pCtx
,
SResultRowEntryInfo
*
pResultInfo
)
{
if
(
!
functionSetup
(
pCtx
,
pResultInfo
))
{
return
false
;
}
...
...
@@ -313,21 +322,21 @@ int32_t avgFunction(SqlFunctionCtx* pCtx) {
switch
(
type
)
{
case
TSDB_DATA_TYPE_TINYINT
:
{
int8_t
*
plist
=
(
int8_t
*
)
pCol
->
pData
;
for
(
int32_t
i
=
start
;
i
<
numOfRows
+
pInput
->
startRowIndex
;
++
i
)
{
if
(
pCol
->
hasNull
&&
colDataIsNull_f
(
pCol
->
nullbitmap
,
i
))
{
continue
;
}
numOfElem
+=
1
;
pAvgRes
->
count
+=
1
;
pAvgRes
->
sum
.
isum
+=
plist
[
i
];
int8_t
*
plist
=
(
int8_t
*
)
pCol
->
pData
;
for
(
int32_t
i
=
start
;
i
<
numOfRows
+
pInput
->
startRowIndex
;
++
i
)
{
if
(
pCol
->
hasNull
&&
colDataIsNull_f
(
pCol
->
nullbitmap
,
i
))
{
continue
;
}
break
;
numOfElem
+=
1
;
pAvgRes
->
count
+=
1
;
pAvgRes
->
sum
.
isum
+=
plist
[
i
];
}
case
TSDB_DATA_TYPE_SMALLINT
:
{
break
;
}
case
TSDB_DATA_TYPE_SMALLINT
:
{
int16_t
*
plist
=
(
int16_t
*
)
pCol
->
pData
;
for
(
int32_t
i
=
start
;
i
<
numOfRows
+
pInput
->
startRowIndex
;
++
i
)
{
if
(
pCol
->
hasNull
&&
colDataIsNull_f
(
pCol
->
nullbitmap
,
i
))
{
...
...
@@ -409,22 +418,22 @@ int32_t avgFunction(SqlFunctionCtx* pCtx) {
int32_t
avgFinalize
(
SqlFunctionCtx
*
pCtx
,
SSDataBlock
*
pBlock
)
{
SInputColumnInfoData
*
pInput
=
&
pCtx
->
input
;
int32_t
type
=
pInput
->
pData
[
0
]
->
info
.
type
;
SAvgRes
*
pAvgRes
=
GET_ROWCELL_INTERBUF
(
GET_RES_INFO
(
pCtx
));
int32_t
type
=
pInput
->
pData
[
0
]
->
info
.
type
;
SAvgRes
*
pAvgRes
=
GET_ROWCELL_INTERBUF
(
GET_RES_INFO
(
pCtx
));
if
(
IS_INTEGER_TYPE
(
type
))
{
pAvgRes
->
result
=
pAvgRes
->
sum
.
isum
/
((
double
)
pAvgRes
->
count
);
pAvgRes
->
result
=
pAvgRes
->
sum
.
isum
/
((
double
)
pAvgRes
->
count
);
}
else
{
pAvgRes
->
result
=
pAvgRes
->
sum
.
dsum
/
((
double
)
pAvgRes
->
count
);
pAvgRes
->
result
=
pAvgRes
->
sum
.
dsum
/
((
double
)
pAvgRes
->
count
);
}
return
functionFinalize
(
pCtx
,
pBlock
);
}
EFuncDataRequired
statisDataRequired
(
SFunctionNode
*
pFunc
,
STimeWindow
*
pTimeWindow
){
EFuncDataRequired
statisDataRequired
(
SFunctionNode
*
pFunc
,
STimeWindow
*
pTimeWindow
)
{
return
FUNC_DATA_REQUIRED_STATIS_LOAD
;
}
bool
maxFunctionSetup
(
SqlFunctionCtx
*
pCtx
,
SResultRowEntryInfo
*
pResultInfo
)
{
bool
maxFunctionSetup
(
SqlFunctionCtx
*
pCtx
,
SResultRowEntryInfo
*
pResultInfo
)
{
if
(
!
functionSetup
(
pCtx
,
pResultInfo
))
{
return
false
;
}
...
...
@@ -432,34 +441,34 @@ bool maxFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo) {
char
*
buf
=
GET_ROWCELL_INTERBUF
(
pResultInfo
);
switch
(
pCtx
->
resDataInfo
.
type
)
{
case
TSDB_DATA_TYPE_INT
:
*
((
int32_t
*
)
buf
)
=
INT32_MIN
;
*
((
int32_t
*
)
buf
)
=
INT32_MIN
;
break
;
case
TSDB_DATA_TYPE_UINT
:
*
((
uint32_t
*
)
buf
)
=
0
;
*
((
uint32_t
*
)
buf
)
=
0
;
break
;
case
TSDB_DATA_TYPE_FLOAT
:
*
((
float
*
)
buf
)
=
-
FLT_MAX
;
*
((
float
*
)
buf
)
=
-
FLT_MAX
;
break
;
case
TSDB_DATA_TYPE_DOUBLE
:
SET_DOUBLE_VAL
(((
double
*
)
buf
),
-
DBL_MAX
);
SET_DOUBLE_VAL
(((
double
*
)
buf
),
-
DBL_MAX
);
break
;
case
TSDB_DATA_TYPE_BIGINT
:
*
((
int64_t
*
)
buf
)
=
INT64_MIN
;
*
((
int64_t
*
)
buf
)
=
INT64_MIN
;
break
;
case
TSDB_DATA_TYPE_UBIGINT
:
*
((
uint64_t
*
)
buf
)
=
0
;
*
((
uint64_t
*
)
buf
)
=
0
;
break
;
case
TSDB_DATA_TYPE_SMALLINT
:
*
((
int16_t
*
)
buf
)
=
INT16_MIN
;
*
((
int16_t
*
)
buf
)
=
INT16_MIN
;
break
;
case
TSDB_DATA_TYPE_USMALLINT
:
*
((
uint16_t
*
)
buf
)
=
0
;
*
((
uint16_t
*
)
buf
)
=
0
;
break
;
case
TSDB_DATA_TYPE_TINYINT
:
*
((
int8_t
*
)
buf
)
=
INT8_MIN
;
*
((
int8_t
*
)
buf
)
=
INT8_MIN
;
break
;
case
TSDB_DATA_TYPE_UTINYINT
:
*
((
uint8_t
*
)
buf
)
=
0
;
*
((
uint8_t
*
)
buf
)
=
0
;
break
;
case
TSDB_DATA_TYPE_BOOL
:
*
((
int8_t
*
)
buf
)
=
0
;
...
...
@@ -470,7 +479,7 @@ bool maxFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo) {
return
true
;
}
bool
minFunctionSetup
(
SqlFunctionCtx
*
pCtx
,
SResultRowEntryInfo
*
pResultInfo
)
{
bool
minFunctionSetup
(
SqlFunctionCtx
*
pCtx
,
SResultRowEntryInfo
*
pResultInfo
)
{
if
(
!
functionSetup
(
pCtx
,
pResultInfo
))
{
return
false
;
// not initialized since it has been initialized
}
...
...
@@ -478,34 +487,34 @@ bool minFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo) {
char
*
buf
=
GET_ROWCELL_INTERBUF
(
pResultInfo
);
switch
(
pCtx
->
resDataInfo
.
type
)
{
case
TSDB_DATA_TYPE_TINYINT
:
*
((
int8_t
*
)
buf
)
=
INT8_MAX
;
*
((
int8_t
*
)
buf
)
=
INT8_MAX
;
break
;
case
TSDB_DATA_TYPE_UTINYINT
:
*
(
uint8_t
*
)
buf
=
UINT8_MAX
;
*
(
uint8_t
*
)
buf
=
UINT8_MAX
;
break
;
case
TSDB_DATA_TYPE_SMALLINT
:
*
((
int16_t
*
)
buf
)
=
INT16_MAX
;
*
((
int16_t
*
)
buf
)
=
INT16_MAX
;
break
;
case
TSDB_DATA_TYPE_USMALLINT
:
*
((
uint16_t
*
)
buf
)
=
UINT16_MAX
;
*
((
uint16_t
*
)
buf
)
=
UINT16_MAX
;
break
;
case
TSDB_DATA_TYPE_INT
:
*
((
int32_t
*
)
buf
)
=
INT32_MAX
;
*
((
int32_t
*
)
buf
)
=
INT32_MAX
;
break
;
case
TSDB_DATA_TYPE_UINT
:
*
((
uint32_t
*
)
buf
)
=
UINT32_MAX
;
*
((
uint32_t
*
)
buf
)
=
UINT32_MAX
;
break
;
case
TSDB_DATA_TYPE_BIGINT
:
*
((
int64_t
*
)
buf
)
=
INT64_MAX
;
*
((
int64_t
*
)
buf
)
=
INT64_MAX
;
break
;
case
TSDB_DATA_TYPE_UBIGINT
:
*
((
uint64_t
*
)
buf
)
=
UINT64_MAX
;
*
((
uint64_t
*
)
buf
)
=
UINT64_MAX
;
break
;
case
TSDB_DATA_TYPE_FLOAT
:
*
((
float
*
)
buf
)
=
FLT_MAX
;
*
((
float
*
)
buf
)
=
FLT_MAX
;
break
;
case
TSDB_DATA_TYPE_DOUBLE
:
SET_DOUBLE_VAL
(((
double
*
)
buf
),
DBL_MAX
);
SET_DOUBLE_VAL
(((
double
*
)
buf
),
DBL_MAX
);
break
;
case
TSDB_DATA_TYPE_BOOL
:
*
((
int8_t
*
)
buf
)
=
1
;
...
...
@@ -528,21 +537,21 @@ bool getMinmaxFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) {
#define DO_UPDATE_TAG_COLUMNS_WITHOUT_TS(ctx) \
do { \
for (int32_t _i = 0; _i < (ctx)->tagInfo.numOfTagCols; ++_i) { \
SqlFunctionCtx
*
__ctx = (ctx)->tagInfo.pTagCtxList[_i]; \
SqlFunctionCtx
*
__ctx = (ctx)->tagInfo.pTagCtxList[_i]; \
__ctx->fpSet.process(__ctx); \
} \
} while (0);
#define DO_UPDATE_SUBSID_RES(ctx, ts)
\
do {
\
#define DO_UPDATE_SUBSID_RES(ctx, ts) \
do { \
for (int32_t _i = 0; _i < (ctx)->subsidiaries.num; ++_i) { \
SqlFunctionCtx* __ctx = (ctx)->subsidiaries.pCtx[_i];
\
if (__ctx->functionId == FUNCTION_TS_DUMMY) {
\
__ctx->tag.i = (ts);
\
__ctx->tag.nType = TSDB_DATA_TYPE_BIGINT;
\
}
\
__ctx->fpSet.process(__ctx);
\
}
\
SqlFunctionCtx* __ctx = (ctx)->subsidiaries.pCtx[_i]; \
if (__ctx->functionId == FUNCTION_TS_DUMMY) { \
__ctx->tag.i = (ts); \
__ctx->tag.nType = TSDB_DATA_TYPE_BIGINT; \
} \
__ctx->fpSet.process(__ctx); \
} \
} while (0)
#define UPDATE_DATA(ctx, left, right, num, sign, _ts) \
...
...
@@ -556,7 +565,7 @@ bool getMinmaxFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) {
#define LOOPCHECK_N(val, _col, ctx, _t, _nrow, _start, sign, num) \
do { \
_t
*d = (_t *)((_col)->pData);
\
_t
* d = (_t*)((_col)->pData);
\
for (int32_t i = (_start); i < (_nrow) + (_start); ++i) { \
if (((_col)->hasNull) && colDataIsNull_f((_col)->nullbitmap, i)) { \
continue; \
...
...
@@ -566,17 +575,17 @@ bool getMinmaxFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) {
} \
} while (0)
int32_t
doMinMaxHelper
(
SqlFunctionCtx
*
pCtx
,
int32_t
isMinFunc
)
{
int32_t
doMinMaxHelper
(
SqlFunctionCtx
*
pCtx
,
int32_t
isMinFunc
)
{
int32_t
numOfElems
=
0
;
SInputColumnInfoData
*
pInput
=
&
pCtx
->
input
;
SColumnDataAgg
*
pAgg
=
pInput
->
pColumnDataAgg
[
0
];
SColumnDataAgg
*
pAgg
=
pInput
->
pColumnDataAgg
[
0
];
SColumnInfoData
*
pCol
=
pInput
->
pData
[
0
];
int32_t
type
=
pCol
->
info
.
type
;
int32_t
type
=
pCol
->
info
.
type
;
SResultRowEntryInfo
*
pResInfo
=
GET_RES_INFO
(
pCtx
);
char
*
buf
=
GET_ROWCELL_INTERBUF
(
pResInfo
);
char
*
buf
=
GET_ROWCELL_INTERBUF
(
pResInfo
);
// data in current data block are qualified to the query
if
(
pInput
->
colDataAggIsSet
)
{
...
...
@@ -591,15 +600,15 @@ int32_t doMinMaxHelper(SqlFunctionCtx *pCtx, int32_t isMinFunc) {
int16_t
index
=
0
;
if
(
isMinFunc
)
{
tval
=
&
pInput
->
pColumnDataAgg
[
0
]
->
min
;
tval
=
&
pInput
->
pColumnDataAgg
[
0
]
->
min
;
index
=
pInput
->
pColumnDataAgg
[
0
]
->
minIndex
;
}
else
{
tval
=
&
pInput
->
pColumnDataAgg
[
0
]
->
max
;
tval
=
&
pInput
->
pColumnDataAgg
[
0
]
->
max
;
index
=
pInput
->
pColumnDataAgg
[
0
]
->
maxIndex
;
}
// the index is the original position, not the relative position
TSKEY
key
=
(
pCtx
->
ptsList
!=
NULL
)
?
pCtx
->
ptsList
[
index
]
:
TSKEY_INITIAL_VAL
;
TSKEY
key
=
(
pCtx
->
ptsList
!=
NULL
)
?
pCtx
->
ptsList
[
index
]
:
TSKEY_INITIAL_VAL
;
if
(
IS_SIGNED_NUMERIC_TYPE
(
type
))
{
int64_t
prev
=
0
;
...
...
@@ -607,7 +616,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx *pCtx, int32_t isMinFunc) {
int64_t
val
=
GET_INT64_VAL
(
tval
);
if
((
prev
<
val
)
^
isMinFunc
)
{
*
(
int64_t
*
)
buf
=
val
;
*
(
int64_t
*
)
buf
=
val
;
for
(
int32_t
i
=
0
;
i
<
(
pCtx
)
->
subsidiaries
.
num
;
++
i
)
{
SqlFunctionCtx
*
__ctx
=
pCtx
->
subsidiaries
.
pCtx
[
i
];
if
(
__ctx
->
functionId
==
FUNCTION_TS_DUMMY
)
{
// TODO refactor
...
...
@@ -624,7 +633,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx *pCtx, int32_t isMinFunc) {
uint64_t
val
=
GET_UINT64_VAL
(
tval
);
if
((
prev
<
val
)
^
isMinFunc
)
{
*
(
uint64_t
*
)
buf
=
val
;
*
(
uint64_t
*
)
buf
=
val
;
for
(
int32_t
i
=
0
;
i
<
(
pCtx
)
->
subsidiaries
.
num
;
++
i
)
{
SqlFunctionCtx
*
__ctx
=
pCtx
->
subsidiaries
.
pCtx
[
i
];
if
(
__ctx
->
functionId
==
FUNCTION_TS_DUMMY
)
{
// TODO refactor
...
...
@@ -636,11 +645,11 @@ int32_t doMinMaxHelper(SqlFunctionCtx *pCtx, int32_t isMinFunc) {
}
}
}
else
if
(
type
==
TSDB_DATA_TYPE_DOUBLE
)
{
double
val
=
GET_DOUBLE_VAL
(
tval
);
UPDATE_DATA
(
pCtx
,
*
(
double
*
)
buf
,
val
,
numOfElems
,
isMinFunc
,
key
);
double
val
=
GET_DOUBLE_VAL
(
tval
);
UPDATE_DATA
(
pCtx
,
*
(
double
*
)
buf
,
val
,
numOfElems
,
isMinFunc
,
key
);
}
else
if
(
type
==
TSDB_DATA_TYPE_FLOAT
)
{
double
val
=
GET_DOUBLE_VAL
(
tval
);
UPDATE_DATA
(
pCtx
,
*
(
float
*
)
buf
,
val
,
numOfElems
,
isMinFunc
,
key
);
UPDATE_DATA
(
pCtx
,
*
(
float
*
)
buf
,
val
,
numOfElems
,
isMinFunc
,
key
);
}
return
numOfElems
;
...
...
@@ -653,10 +662,10 @@ int32_t doMinMaxHelper(SqlFunctionCtx *pCtx, int32_t isMinFunc) {
if
(
type
==
TSDB_DATA_TYPE_TINYINT
||
type
==
TSDB_DATA_TYPE_BOOL
)
{
LOOPCHECK_N
(
*
(
int8_t
*
)
buf
,
pCol
,
pCtx
,
int8_t
,
numOfRows
,
start
,
isMinFunc
,
numOfElems
);
}
else
if
(
type
==
TSDB_DATA_TYPE_SMALLINT
)
{
LOOPCHECK_N
(
*
(
int16_t
*
)
buf
,
pCol
,
pCtx
,
int16_t
,
numOfRows
,
start
,
isMinFunc
,
numOfElems
);
LOOPCHECK_N
(
*
(
int16_t
*
)
buf
,
pCol
,
pCtx
,
int16_t
,
numOfRows
,
start
,
isMinFunc
,
numOfElems
);
}
else
if
(
type
==
TSDB_DATA_TYPE_INT
)
{
int32_t
*
pData
=
(
int32_t
*
)
pCol
->
pData
;
int32_t
*
val
=
(
int32_t
*
)
buf
;
int32_t
*
pData
=
(
int32_t
*
)
pCol
->
pData
;
int32_t
*
val
=
(
int32_t
*
)
buf
;
for
(
int32_t
i
=
start
;
i
<
start
+
numOfRows
;
++
i
)
{
if
((
pCol
->
hasNull
)
&&
colDataIsNull_f
(
pCol
->
nullbitmap
,
i
))
{
...
...
@@ -665,7 +674,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx *pCtx, int32_t isMinFunc) {
if
((
*
val
<
pData
[
i
])
^
isMinFunc
)
{
*
val
=
pData
[
i
];
TSKEY
ts
=
(
pCtx
->
ptsList
!=
NULL
)
?
GET_TS_DATA
(
pCtx
,
i
)
:
0
;
TSKEY
ts
=
(
pCtx
->
ptsList
!=
NULL
)
?
GET_TS_DATA
(
pCtx
,
i
)
:
0
;
DO_UPDATE_SUBSID_RES
(
pCtx
,
ts
);
}
...
...
@@ -676,34 +685,34 @@ int32_t doMinMaxHelper(SqlFunctionCtx *pCtx, int32_t isMinFunc) {
qDebug
(
"max value updated:%d"
,
*
retVal
);
#endif
}
else
if
(
type
==
TSDB_DATA_TYPE_BIGINT
)
{
LOOPCHECK_N
(
*
(
int64_t
*
)
buf
,
pCol
,
pCtx
,
int64_t
,
numOfRows
,
start
,
isMinFunc
,
numOfElems
);
LOOPCHECK_N
(
*
(
int64_t
*
)
buf
,
pCol
,
pCtx
,
int64_t
,
numOfRows
,
start
,
isMinFunc
,
numOfElems
);
}
}
else
if
(
IS_UNSIGNED_NUMERIC_TYPE
(
type
))
{
if
(
type
==
TSDB_DATA_TYPE_UTINYINT
)
{
LOOPCHECK_N
(
*
(
uint8_t
*
)
buf
,
pCol
,
pCtx
,
uint8_t
,
numOfRows
,
start
,
isMinFunc
,
numOfElems
);
LOOPCHECK_N
(
*
(
uint8_t
*
)
buf
,
pCol
,
pCtx
,
uint8_t
,
numOfRows
,
start
,
isMinFunc
,
numOfElems
);
}
else
if
(
type
==
TSDB_DATA_TYPE_USMALLINT
)
{
LOOPCHECK_N
(
*
(
uint16_t
*
)
buf
,
pCol
,
pCtx
,
uint16_t
,
numOfRows
,
start
,
isMinFunc
,
numOfElems
);
LOOPCHECK_N
(
*
(
uint16_t
*
)
buf
,
pCol
,
pCtx
,
uint16_t
,
numOfRows
,
start
,
isMinFunc
,
numOfElems
);
}
else
if
(
type
==
TSDB_DATA_TYPE_UINT
)
{
LOOPCHECK_N
(
*
(
uint32_t
*
)
buf
,
pCol
,
pCtx
,
uint32_t
,
numOfRows
,
start
,
isMinFunc
,
numOfElems
);
LOOPCHECK_N
(
*
(
uint32_t
*
)
buf
,
pCol
,
pCtx
,
uint32_t
,
numOfRows
,
start
,
isMinFunc
,
numOfElems
);
}
else
if
(
type
==
TSDB_DATA_TYPE_UBIGINT
)
{
LOOPCHECK_N
(
*
(
uint64_t
*
)
buf
,
pCol
,
pCtx
,
uint64_t
,
numOfRows
,
start
,
isMinFunc
,
numOfElems
);
LOOPCHECK_N
(
*
(
uint64_t
*
)
buf
,
pCol
,
pCtx
,
uint64_t
,
numOfRows
,
start
,
isMinFunc
,
numOfElems
);
}
}
else
if
(
type
==
TSDB_DATA_TYPE_DOUBLE
)
{
LOOPCHECK_N
(
*
(
double
*
)
buf
,
pCol
,
pCtx
,
double
,
numOfRows
,
start
,
isMinFunc
,
numOfElems
);
LOOPCHECK_N
(
*
(
double
*
)
buf
,
pCol
,
pCtx
,
double
,
numOfRows
,
start
,
isMinFunc
,
numOfElems
);
}
else
if
(
type
==
TSDB_DATA_TYPE_FLOAT
)
{
LOOPCHECK_N
(
*
(
float
*
)
buf
,
pCol
,
pCtx
,
float
,
numOfRows
,
start
,
isMinFunc
,
numOfElems
);
LOOPCHECK_N
(
*
(
float
*
)
buf
,
pCol
,
pCtx
,
float
,
numOfRows
,
start
,
isMinFunc
,
numOfElems
);
}
return
numOfElems
;
}
int32_t
minFunction
(
SqlFunctionCtx
*
pCtx
)
{
int32_t
minFunction
(
SqlFunctionCtx
*
pCtx
)
{
int32_t
numOfElems
=
doMinMaxHelper
(
pCtx
,
1
);
SET_VAL
(
GET_RES_INFO
(
pCtx
),
numOfElems
,
1
);
return
TSDB_CODE_SUCCESS
;
}
int32_t
maxFunction
(
SqlFunctionCtx
*
pCtx
)
{
int32_t
maxFunction
(
SqlFunctionCtx
*
pCtx
)
{
int32_t
numOfElems
=
doMinMaxHelper
(
pCtx
,
0
);
SET_VAL
(
GET_RES_INFO
(
pCtx
),
numOfElems
,
1
);
return
TSDB_CODE_SUCCESS
;
...
...
@@ -714,7 +723,7 @@ bool getStddevFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) {
return
true
;
}
bool
stddevFunctionSetup
(
SqlFunctionCtx
*
pCtx
,
SResultRowEntryInfo
*
pResultInfo
)
{
bool
stddevFunctionSetup
(
SqlFunctionCtx
*
pCtx
,
SResultRowEntryInfo
*
pResultInfo
)
{
if
(
!
functionSetup
(
pCtx
,
pResultInfo
))
{
return
false
;
}
...
...
@@ -741,22 +750,22 @@ int32_t stddevFunction(SqlFunctionCtx* pCtx) {
switch
(
type
)
{
case
TSDB_DATA_TYPE_TINYINT
:
{
int8_t
*
plist
=
(
int8_t
*
)
pCol
->
pData
;
for
(
int32_t
i
=
start
;
i
<
numOfRows
+
start
;
++
i
)
{
if
(
pCol
->
hasNull
&&
colDataIsNull_f
(
pCol
->
nullbitmap
,
i
))
{
continue
;
}
numOfElem
+=
1
;
pStddevRes
->
count
+=
1
;
pStddevRes
->
isum
+=
plist
[
i
];
pStddevRes
->
quadraticISum
+=
plist
[
i
]
*
plist
[
i
];
int8_t
*
plist
=
(
int8_t
*
)
pCol
->
pData
;
for
(
int32_t
i
=
start
;
i
<
numOfRows
+
start
;
++
i
)
{
if
(
pCol
->
hasNull
&&
colDataIsNull_f
(
pCol
->
nullbitmap
,
i
))
{
continue
;
}
break
;
numOfElem
+=
1
;
pStddevRes
->
count
+=
1
;
pStddevRes
->
isum
+=
plist
[
i
];
pStddevRes
->
quadraticISum
+=
plist
[
i
]
*
plist
[
i
];
}
case
TSDB_DATA_TYPE_SMALLINT
:
{
break
;
}
case
TSDB_DATA_TYPE_SMALLINT
:
{
int16_t
*
plist
=
(
int16_t
*
)
pCol
->
pData
;
for
(
int32_t
i
=
start
;
i
<
numOfRows
+
pInput
->
startRowIndex
;
++
i
)
{
if
(
pCol
->
hasNull
&&
colDataIsNull_f
(
pCol
->
nullbitmap
,
i
))
{
...
...
@@ -843,15 +852,15 @@ int32_t stddevFunction(SqlFunctionCtx* pCtx) {
int32_t
stddevFinalize
(
SqlFunctionCtx
*
pCtx
,
SSDataBlock
*
pBlock
)
{
SInputColumnInfoData
*
pInput
=
&
pCtx
->
input
;
int32_t
type
=
pInput
->
pData
[
0
]
->
info
.
type
;
SStddevRes
*
pStddevRes
=
GET_ROWCELL_INTERBUF
(
GET_RES_INFO
(
pCtx
));
double
avg
;
int32_t
type
=
pInput
->
pData
[
0
]
->
info
.
type
;
SStddevRes
*
pStddevRes
=
GET_ROWCELL_INTERBUF
(
GET_RES_INFO
(
pCtx
));
double
avg
;
if
(
IS_INTEGER_TYPE
(
type
))
{
avg
=
pStddevRes
->
isum
/
((
double
)
pStddevRes
->
count
);
pStddevRes
->
result
=
sqrt
(
pStddevRes
->
quadraticISum
/
((
double
)
pStddevRes
->
count
)
-
avg
*
avg
);
avg
=
pStddevRes
->
isum
/
((
double
)
pStddevRes
->
count
);
pStddevRes
->
result
=
sqrt
(
pStddevRes
->
quadraticISum
/
((
double
)
pStddevRes
->
count
)
-
avg
*
avg
);
}
else
{
avg
=
pStddevRes
->
dsum
/
((
double
)
pStddevRes
->
count
);
pStddevRes
->
result
=
sqrt
(
pStddevRes
->
quadraticDSum
/
((
double
)
pStddevRes
->
count
)
-
avg
*
avg
);
avg
=
pStddevRes
->
dsum
/
((
double
)
pStddevRes
->
count
);
pStddevRes
->
result
=
sqrt
(
pStddevRes
->
quadraticDSum
/
((
double
)
pStddevRes
->
count
)
-
avg
*
avg
);
}
return
functionFinalize
(
pCtx
,
pBlock
);
...
...
@@ -862,13 +871,13 @@ bool getPercentileFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) {
return
true
;
}
bool
percentileFunctionSetup
(
SqlFunctionCtx
*
pCtx
,
SResultRowEntryInfo
*
pResultInfo
)
{
bool
percentileFunctionSetup
(
SqlFunctionCtx
*
pCtx
,
SResultRowEntryInfo
*
pResultInfo
)
{
if
(
!
functionSetup
(
pCtx
,
pResultInfo
))
{
return
false
;
}
// in the first round, get the min-max value of all involved data
SPercentileInfo
*
pInfo
=
GET_ROWCELL_INTERBUF
(
pResultInfo
);
SPercentileInfo
*
pInfo
=
GET_ROWCELL_INTERBUF
(
pResultInfo
);
SET_DOUBLE_VAL
(
&
pInfo
->
minval
,
DBL_MAX
);
SET_DOUBLE_VAL
(
&
pInfo
->
maxval
,
-
DBL_MAX
);
pInfo
->
numOfElems
=
0
;
...
...
@@ -876,17 +885,17 @@ bool percentileFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultI
return
true
;
}
int32_t
percentileFunction
(
SqlFunctionCtx
*
pCtx
)
{
int32_t
notNullElems
=
0
;
SResultRowEntryInfo
*
pResInfo
=
GET_RES_INFO
(
pCtx
);
int32_t
percentileFunction
(
SqlFunctionCtx
*
pCtx
)
{
int32_t
notNullElems
=
0
;
SResultRowEntryInfo
*
pResInfo
=
GET_RES_INFO
(
pCtx
);
SInputColumnInfoData
*
pInput
=
&
pCtx
->
input
;
SColumnDataAgg
*
pAgg
=
pInput
->
pColumnDataAgg
[
0
];
SColumnDataAgg
*
pAgg
=
pInput
->
pColumnDataAgg
[
0
];
SColumnInfoData
*
pCol
=
pInput
->
pData
[
0
];
int32_t
type
=
pCol
->
info
.
type
;
SColumnInfoData
*
pCol
=
pInput
->
pData
[
0
];
int32_t
type
=
pCol
->
info
.
type
;
SPercentileInfo
*
pInfo
=
GET_ROWCELL_INTERBUF
(
pResInfo
);
SPercentileInfo
*
pInfo
=
GET_ROWCELL_INTERBUF
(
pResInfo
);
if
(
pCtx
->
currentStage
==
REPEAT_SCAN
&&
pInfo
->
stage
==
0
)
{
pInfo
->
stage
+=
1
;
...
...
@@ -931,7 +940,7 @@ int32_t percentileFunction(SqlFunctionCtx *pCtx) {
continue
;
}
char
*
data
=
colDataGetData
(
pCol
,
i
);
char
*
data
=
colDataGetData
(
pCol
,
i
);
double
v
=
0
;
GET_TYPED_DATA
(
v
,
double
,
pCtx
->
inputType
,
data
);
...
...
@@ -957,7 +966,7 @@ int32_t percentileFunction(SqlFunctionCtx *pCtx) {
continue
;
}
char
*
data
=
colDataGetData
(
pCol
,
i
);
char
*
data
=
colDataGetData
(
pCol
,
i
);
notNullElems
+=
1
;
tMemBucketPut
(
pInfo
->
pMemBucket
,
data
,
1
);
...
...
@@ -969,12 +978,12 @@ int32_t percentileFunction(SqlFunctionCtx *pCtx) {
int32_t
percentileFinalize
(
SqlFunctionCtx
*
pCtx
,
SSDataBlock
*
pBlock
)
{
SVariant
*
pVal
=
&
pCtx
->
param
[
1
].
param
;
double
v
=
pVal
->
nType
==
TSDB_DATA_TYPE_INT
?
pVal
->
i
:
pVal
->
d
;
double
v
=
pVal
->
nType
==
TSDB_DATA_TYPE_INT
?
pVal
->
i
:
pVal
->
d
;
SResultRowEntryInfo
*
pResInfo
=
GET_RES_INFO
(
pCtx
);
SPercentileInfo
*
ppInfo
=
(
SPercentileInfo
*
)
GET_ROWCELL_INTERBUF
(
pResInfo
);
SResultRowEntryInfo
*
pResInfo
=
GET_RES_INFO
(
pCtx
);
SPercentileInfo
*
ppInfo
=
(
SPercentileInfo
*
)
GET_ROWCELL_INTERBUF
(
pResInfo
);
tMemBucket
*
pMemBucket
=
ppInfo
->
pMemBucket
;
tMemBucket
*
pMemBucket
=
ppInfo
->
pMemBucket
;
if
(
pMemBucket
!=
NULL
&&
pMemBucket
->
total
>
0
)
{
// check for null
SET_DOUBLE_VAL
(
&
ppInfo
->
result
,
getPercentile
(
pMemBucket
,
v
));
}
...
...
@@ -994,19 +1003,19 @@ static FORCE_INLINE TSKEY getRowPTs(SColumnInfoData* pTsColInfo, int32_t rowInde
return
0
;
}
return
*
(
TSKEY
*
)
colDataGetData
(
pTsColInfo
,
rowIndex
);
return
*
(
TSKEY
*
)
colDataGetData
(
pTsColInfo
,
rowIndex
);
}
// This ordinary first function does not care if current scan is ascending order or descending order scan
// the OPTIMIZED version of first function will only handle the ascending order scan
int32_t
firstFunction
(
SqlFunctionCtx
*
pCtx
)
{
int32_t
firstFunction
(
SqlFunctionCtx
*
pCtx
)
{
int32_t
numOfElems
=
0
;
SResultRowEntryInfo
*
pResInfo
=
GET_RES_INFO
(
pCtx
);
char
*
buf
=
GET_ROWCELL_INTERBUF
(
pResInfo
);
SResultRowEntryInfo
*
pResInfo
=
GET_RES_INFO
(
pCtx
);
char
*
buf
=
GET_ROWCELL_INTERBUF
(
pResInfo
);
SInputColumnInfoData
*
pInput
=
&
pCtx
->
input
;
SColumnInfoData
*
pInputCol
=
pInput
->
pData
[
0
];
SColumnInfoData
*
pInputCol
=
pInput
->
pData
[
0
];
int32_t
bytes
=
pInputCol
->
info
.
bytes
;
...
...
@@ -1016,12 +1025,12 @@ int32_t firstFunction(SqlFunctionCtx *pCtx) {
return
0
;
}
SColumnDataAgg
*
pColAgg
=
(
pInput
->
colDataAggIsSet
)
?
pInput
->
pColumnDataAgg
[
0
]
:
NULL
;
SColumnDataAgg
*
pColAgg
=
(
pInput
->
colDataAggIsSet
)
?
pInput
->
pColumnDataAgg
[
0
]
:
NULL
;
TSKEY
startKey
=
getRowPTs
(
pInput
->
pPTS
,
0
);
TSKEY
endKey
=
getRowPTs
(
pInput
->
pPTS
,
pInput
->
totalRows
-
1
);
int32_t
blockDataOrder
=
(
startKey
<=
endKey
)
?
TSDB_ORDER_ASC
:
TSDB_ORDER_DESC
;
int32_t
blockDataOrder
=
(
startKey
<=
endKey
)
?
TSDB_ORDER_ASC
:
TSDB_ORDER_DESC
;
if
(
blockDataOrder
==
TSDB_ORDER_ASC
)
{
// filter according to current result firstly
...
...
@@ -1045,7 +1054,7 @@ int32_t firstFunction(SqlFunctionCtx *pCtx) {
if
(
pResInfo
->
numOfRes
==
0
||
*
(
TSKEY
*
)(
buf
+
bytes
)
>
cts
)
{
memcpy
(
buf
,
data
,
bytes
);
*
(
TSKEY
*
)(
buf
+
bytes
)
=
cts
;
// DO_UPDATE_TAG_COLUMNS(pCtx, ts);
// DO_UPDATE_TAG_COLUMNS(pCtx, ts);
pResInfo
->
numOfRes
=
1
;
break
;
...
...
@@ -1074,7 +1083,7 @@ int32_t firstFunction(SqlFunctionCtx *pCtx) {
if
(
pResInfo
->
numOfRes
==
0
||
*
(
TSKEY
*
)(
buf
+
bytes
)
>
cts
)
{
memcpy
(
buf
,
data
,
bytes
);
*
(
TSKEY
*
)(
buf
+
bytes
)
=
cts
;
// DO_UPDATE_TAG_COLUMNS(pCtx, ts);
// DO_UPDATE_TAG_COLUMNS(pCtx, ts);
pResInfo
->
numOfRes
=
1
;
break
;
}
...
...
@@ -1085,14 +1094,14 @@ int32_t firstFunction(SqlFunctionCtx *pCtx) {
return
TSDB_CODE_SUCCESS
;
}
int32_t
lastFunction
(
SqlFunctionCtx
*
pCtx
)
{
int32_t
lastFunction
(
SqlFunctionCtx
*
pCtx
)
{
int32_t
numOfElems
=
0
;
SResultRowEntryInfo
*
pResInfo
=
GET_RES_INFO
(
pCtx
);
char
*
buf
=
GET_ROWCELL_INTERBUF
(
pResInfo
);
SResultRowEntryInfo
*
pResInfo
=
GET_RES_INFO
(
pCtx
);
char
*
buf
=
GET_ROWCELL_INTERBUF
(
pResInfo
);
SInputColumnInfoData
*
pInput
=
&
pCtx
->
input
;
SColumnInfoData
*
pInputCol
=
pInput
->
pData
[
0
];
SColumnInfoData
*
pInputCol
=
pInput
->
pData
[
0
];
int32_t
bytes
=
pInputCol
->
info
.
bytes
;
...
...
@@ -1102,12 +1111,12 @@ int32_t lastFunction(SqlFunctionCtx *pCtx) {
return
0
;
}
SColumnDataAgg
*
pColAgg
=
(
pInput
->
colDataAggIsSet
)
?
pInput
->
pColumnDataAgg
[
0
]
:
NULL
;
SColumnDataAgg
*
pColAgg
=
(
pInput
->
colDataAggIsSet
)
?
pInput
->
pColumnDataAgg
[
0
]
:
NULL
;
TSKEY
startKey
=
getRowPTs
(
pInput
->
pPTS
,
0
);
TSKEY
endKey
=
getRowPTs
(
pInput
->
pPTS
,
pInput
->
totalRows
-
1
);
int32_t
blockDataOrder
=
(
startKey
<=
endKey
)
?
TSDB_ORDER_ASC
:
TSDB_ORDER_DESC
;
int32_t
blockDataOrder
=
(
startKey
<=
endKey
)
?
TSDB_ORDER_ASC
:
TSDB_ORDER_DESC
;
if
(
blockDataOrder
==
TSDB_ORDER_ASC
)
{
for
(
int32_t
i
=
pInput
->
numOfRows
+
pInput
->
startRowIndex
-
1
;
i
>=
pInput
->
startRowIndex
;
--
i
)
{
...
...
@@ -1141,7 +1150,7 @@ int32_t lastFunction(SqlFunctionCtx *pCtx) {
memcpy
(
buf
,
data
,
bytes
);
*
(
TSKEY
*
)(
buf
+
bytes
)
=
cts
;
pResInfo
->
numOfRes
=
1
;
// DO_UPDATE_TAG_COLUMNS(pCtx, ts);
// DO_UPDATE_TAG_COLUMNS(pCtx, ts);
}
break
;
}
...
...
@@ -1156,43 +1165,42 @@ bool getDiffFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) {
return
true
;
}
bool
diffFunctionSetup
(
SqlFunctionCtx
*
pCtx
,
SResultRowEntryInfo
*
pResInfo
)
{
bool
diffFunctionSetup
(
SqlFunctionCtx
*
pCtx
,
SResultRowEntryInfo
*
pResInfo
)
{
if
(
!
functionSetup
(
pCtx
,
pResInfo
))
{
return
false
;
}
SDiffInfo
*
pDiffInfo
=
GET_ROWCELL_INTERBUF
(
pResInfo
);
pDiffInfo
->
hasPrev
=
false
;
pDiffInfo
->
hasPrev
=
false
;
pDiffInfo
->
prev
.
i64
=
0
;
pDiffInfo
->
ignoreNegative
=
false
;
// TODO set correct param
pDiffInfo
->
ignoreNegative
=
false
;
// TODO set correct param
pDiffInfo
->
includeNull
=
false
;
pDiffInfo
->
firstOutput
=
false
;
return
true
;
}
int32_t
diffFunction
(
SqlFunctionCtx
*
pCtx
)
{
SResultRowEntryInfo
*
pResInfo
=
GET_RES_INFO
(
pCtx
);
SDiffInfo
*
pDiffInfo
=
GET_ROWCELL_INTERBUF
(
pResInfo
);
int32_t
diffFunction
(
SqlFunctionCtx
*
pCtx
)
{
SResultRowEntryInfo
*
pResInfo
=
GET_RES_INFO
(
pCtx
);
SDiffInfo
*
pDiffInfo
=
GET_ROWCELL_INTERBUF
(
pResInfo
);
SInputColumnInfoData
*
pInput
=
&
pCtx
->
input
;
SColumnInfoData
*
pInputCol
=
pInput
->
pData
[
0
];
SColumnInfoData
*
pInputCol
=
pInput
->
pData
[
0
];
bool
isFirstBlock
=
(
pDiffInfo
->
hasPrev
==
false
);
bool
isFirstBlock
=
(
pDiffInfo
->
hasPrev
==
false
);
int32_t
numOfElems
=
0
;
int32_t
step
=
GET_FORWARD_DIRECTION_FACTOR
(
pCtx
->
order
);
// int32_t i = (pCtx->order == TSDB_ORDER_ASC) ? 0 : pCtx->size - 1;
// int32_t i = (pCtx->order == TSDB_ORDER_ASC) ? 0 : pCtx->size - 1;
SColumnInfoData
*
pTsOutput
=
pCtx
->
pTsOutput
;
TSKEY
*
tsList
=
(
int64_t
*
)
pInput
->
pPTS
->
pData
;
TSKEY
*
tsList
=
(
int64_t
*
)
pInput
->
pPTS
->
pData
;
int32_t
startOffset
=
pCtx
->
offset
;
switch
(
pInputCol
->
info
.
type
)
{
case
TSDB_DATA_TYPE_INT
:
{
SColumnInfoData
*
pOutput
=
(
SColumnInfoData
*
)
pCtx
->
pOutput
;
SColumnInfoData
*
pOutput
=
(
SColumnInfoData
*
)
pCtx
->
pOutput
;
for
(
int32_t
i
=
pInput
->
startRowIndex
;
i
<
pInput
->
numOfRows
+
pInput
->
startRowIndex
;
i
+=
step
)
{
int32_t
pos
=
startOffset
+
(
isFirstBlock
?
(
numOfElems
-
1
)
:
numOfElems
);
int32_t
pos
=
startOffset
+
(
isFirstBlock
?
(
numOfElems
-
1
)
:
numOfElems
);
if
(
colDataIsNull_f
(
pInputCol
->
nullbitmap
,
i
))
{
if
(
pDiffInfo
->
includeNull
)
{
colDataSetNull_f
(
pOutput
->
nullbitmap
,
pos
);
...
...
@@ -1205,7 +1213,7 @@ int32_t diffFunction(SqlFunctionCtx *pCtx) {
continue
;
}
int32_t
v
=
*
(
int32_t
*
)
colDataGetData
(
pInputCol
,
i
);
int32_t
v
=
*
(
int32_t
*
)
colDataGetData
(
pInputCol
,
i
);
if
(
pDiffInfo
->
hasPrev
)
{
int32_t
delta
=
(
int32_t
)(
v
-
pDiffInfo
->
prev
.
i64
);
// direct previous may be null
if
(
delta
<
0
&&
pDiffInfo
->
ignoreNegative
)
{
...
...
@@ -1220,14 +1228,14 @@ int32_t diffFunction(SqlFunctionCtx *pCtx) {
}
pDiffInfo
->
prev
.
i64
=
v
;
pDiffInfo
->
hasPrev
=
true
;
pDiffInfo
->
hasPrev
=
true
;
numOfElems
++
;
}
break
;
}
case
TSDB_DATA_TYPE_BIGINT
:
{
SColumnInfoData
*
pOutput
=
(
SColumnInfoData
*
)
pCtx
->
pOutput
;
SColumnInfoData
*
pOutput
=
(
SColumnInfoData
*
)
pCtx
->
pOutput
;
for
(
int32_t
i
=
pInput
->
startRowIndex
;
i
<
pInput
->
numOfRows
+
pInput
->
startRowIndex
;
i
+=
step
)
{
if
(
colDataIsNull_f
(
pInputCol
->
nullbitmap
,
i
))
{
continue
;
...
...
@@ -1235,17 +1243,17 @@ int32_t diffFunction(SqlFunctionCtx *pCtx) {
int32_t
v
=
0
;
if
(
pDiffInfo
->
hasPrev
)
{
v
=
*
(
int64_t
*
)
colDataGetData
(
pInputCol
,
i
);
v
=
*
(
int64_t
*
)
colDataGetData
(
pInputCol
,
i
);
int64_t
delta
=
(
int64_t
)(
v
-
pDiffInfo
->
prev
.
i64
);
// direct previous may be null
if
(
pDiffInfo
->
ignoreNegative
)
{
continue
;
}
// *(pOutput++) = delta;
// *pTimestamp = (tsList != NULL)? tsList[i]:0;
//
// pOutput += 1;
// pTimestamp += 1;
// *(pOutput++) = delta;
// *pTimestamp = (tsList != NULL)? tsList[i]:0;
//
// pOutput += 1;
// pTimestamp += 1;
}
pDiffInfo
->
prev
.
i64
=
v
;
...
...
@@ -1359,7 +1367,7 @@ int32_t diffFunction(SqlFunctionCtx *pCtx) {
#endif
default:
break
;
// qError("error input type");
// qError("error input type");
}
// initial value is not set yet
...
...
@@ -1371,12 +1379,12 @@ int32_t diffFunction(SqlFunctionCtx *pCtx) {
assert
(
pCtx
->
hasNull
);
return
0
;
}
else
{
// for (int t = 0; t < pCtx->tagInfo.numOfTagCols; ++t) {
// SqlFunctionCtx* tagCtx = pCtx->tagInfo.pTagCtxList[t];
// if (tagCtx->functionId == TSDB_FUNC_TAG_DUMMY) {
// aAggs[TSDB_FUNC_TAGPRJ].xFunction(tagCtx);
// }
// }
// for (int t = 0; t < pCtx->tagInfo.numOfTagCols; ++t) {
// SqlFunctionCtx* tagCtx = pCtx->tagInfo.pTagCtxList[t];
// if (tagCtx->functionId == TSDB_FUNC_TAG_DUMMY) {
// aAggs[TSDB_FUNC_TAGPRJ].xFunction(tagCtx);
// }
// }
int32_t
forwardStep
=
(
isFirstBlock
)
?
numOfElems
-
1
:
numOfElems
;
return
forwardStep
;
...
...
@@ -1384,35 +1392,35 @@ int32_t diffFunction(SqlFunctionCtx *pCtx) {
}
bool
getTopBotFuncEnv
(
SFunctionNode
*
pFunc
,
SFuncExecEnv
*
pEnv
)
{
SValueNode
*
pkNode
=
(
SValueNode
*
)
nodesListGetNode
(
pFunc
->
pParameterList
,
1
);
SValueNode
*
pkNode
=
(
SValueNode
*
)
nodesListGetNode
(
pFunc
->
pParameterList
,
1
);
pEnv
->
calcMemSize
=
sizeof
(
STopBotRes
)
+
pkNode
->
datum
.
i
*
sizeof
(
STopBotResItem
);
return
true
;
}
static
STopBotRes
*
getTopBotOutputInfo
(
SqlFunctionCtx
*
pCtx
)
{
SResultRowEntryInfo
*
pResInfo
=
GET_RES_INFO
(
pCtx
);
STopBotRes
*
pRes
=
GET_ROWCELL_INTERBUF
(
pResInfo
);
pRes
->
pItems
=
(
STopBotResItem
*
)((
char
*
)
pRes
+
sizeof
(
STopBotRes
));
static
STopBotRes
*
getTopBotOutputInfo
(
SqlFunctionCtx
*
pCtx
)
{
SResultRowEntryInfo
*
pResInfo
=
GET_RES_INFO
(
pCtx
);
STopBotRes
*
pRes
=
GET_ROWCELL_INTERBUF
(
pResInfo
);
pRes
->
pItems
=
(
STopBotResItem
*
)((
char
*
)
pRes
+
sizeof
(
STopBotRes
));
return
pRes
;
}
static
void
doAddIntoResult
(
SqlFunctionCtx
*
pCtx
,
void
*
pData
,
int32_t
rowIndex
,
SSDataBlock
*
pSrcBlock
,
uint
16_t
type
,
uint
64_t
uid
,
SResultRowEntryInfo
*
pEntryInfo
);
static
void
doAddIntoResult
(
SqlFunctionCtx
*
pCtx
,
void
*
pData
,
int32_t
rowIndex
,
SSDataBlock
*
pSrcBlock
,
uint16_t
type
,
uint64_t
uid
,
SResultRowEntryInfo
*
pEntryInfo
);
static
void
saveTupleData
(
SqlFunctionCtx
*
pCtx
,
int32_t
rowIndex
,
const
SSDataBlock
*
pSrcBlock
,
STopBotResItem
*
pItem
);
static
void
copyTupleData
(
SqlFunctionCtx
*
pCtx
,
int32_t
rowIndex
,
const
SSDataBlock
*
pSrcBlock
,
STopBotResItem
*
pItem
);
int32_t
topFunction
(
SqlFunctionCtx
*
pCtx
)
{
int32_t
numOfElems
=
0
;
SResultRowEntryInfo
*
pResInfo
=
GET_RES_INFO
(
pCtx
);
int32_t
topFunction
(
SqlFunctionCtx
*
pCtx
)
{
int32_t
numOfElems
=
0
;
SResultRowEntryInfo
*
pResInfo
=
GET_RES_INFO
(
pCtx
);
// if ((void *)pRes->res[0] != (void *)((char *)pRes + sizeof(STopBotRes) + POINTER_BYTES * pCtx->param[0].i)) {
// buildTopBotStruct(pRes, pCtx);
// }
// if ((void *)pRes->res[0] != (void *)((char *)pRes + sizeof(STopBotRes) + POINTER_BYTES * pCtx->param[0].i)) {
// buildTopBotStruct(pRes, pCtx);
// }
SInputColumnInfoData
*
pInput
=
&
pCtx
->
input
;
SColumnInfoData
*
pCol
=
pInput
->
pData
[
0
];
SColumnInfoData
*
pCol
=
pInput
->
pData
[
0
];
int32_t
type
=
pInput
->
pData
[
0
]
->
info
.
type
;
...
...
@@ -1432,11 +1440,11 @@ int32_t topFunction(SqlFunctionCtx *pCtx) {
return
TSDB_CODE_SUCCESS
;
}
static
int32_t
topBotResComparFn
(
const
void
*
p1
,
const
void
*
p2
,
const
void
*
param
)
{
uint16_t
type
=
*
(
uint16_t
*
)
param
;
static
int32_t
topBotResComparFn
(
const
void
*
p1
,
const
void
*
p2
,
const
void
*
param
)
{
uint16_t
type
=
*
(
uint16_t
*
)
param
;
STopBotResItem
*
val1
=
(
STopBotResItem
*
)
p1
;
STopBotResItem
*
val2
=
(
STopBotResItem
*
)
p2
;
STopBotResItem
*
val1
=
(
STopBotResItem
*
)
p1
;
STopBotResItem
*
val2
=
(
STopBotResItem
*
)
p2
;
if
(
IS_SIGNED_NUMERIC_TYPE
(
type
))
{
if
(
val1
->
v
.
i
==
val2
->
v
.
i
)
{
...
...
@@ -1461,19 +1469,19 @@ static int32_t topBotResComparFn(const void *p1, const void *p2, const void *par
void
doAddIntoResult
(
SqlFunctionCtx
*
pCtx
,
void
*
pData
,
int32_t
rowIndex
,
SSDataBlock
*
pSrcBlock
,
uint16_t
type
,
uint64_t
uid
,
SResultRowEntryInfo
*
pEntryInfo
)
{
STopBotRes
*
pRes
=
getTopBotOutputInfo
(
pCtx
);
int32_t
maxSize
=
pCtx
->
param
[
1
].
param
.
i
;
STopBotRes
*
pRes
=
getTopBotOutputInfo
(
pCtx
);
int32_t
maxSize
=
pCtx
->
param
[
1
].
param
.
i
;
SVariant
val
=
{
0
};
taosVariantCreateFromBinary
(
&
val
,
pData
,
tDataTypes
[
type
].
bytes
,
type
);
STopBotResItem
*
pItems
=
pRes
->
pItems
;
STopBotResItem
*
pItems
=
pRes
->
pItems
;
assert
(
pItems
!=
NULL
);
// not full yet
if
(
pEntryInfo
->
numOfRes
<
maxSize
)
{
STopBotResItem
*
pItem
=
&
pItems
[
pEntryInfo
->
numOfRes
];
pItem
->
v
=
val
;
pItem
->
v
=
val
;
pItem
->
uid
=
uid
;
// save the data of this tuple
...
...
@@ -1481,20 +1489,21 @@ void doAddIntoResult(SqlFunctionCtx* pCtx, void* pData, int32_t rowIndex, SSData
// allocate the buffer and keep the data of this row into the new allocated buffer
pEntryInfo
->
numOfRes
++
;
taosheapsort
((
void
*
)
pItems
,
sizeof
(
STopBotResItem
),
pEntryInfo
->
numOfRes
,
(
const
void
*
)
&
type
,
topBotResComparFn
,
false
);
}
else
{
// replace the minimum value in the result
taosheapsort
((
void
*
)
pItems
,
sizeof
(
STopBotResItem
),
pEntryInfo
->
numOfRes
,
(
const
void
*
)
&
type
,
topBotResComparFn
,
false
);
}
else
{
// replace the minimum value in the result
if
((
IS_SIGNED_NUMERIC_TYPE
(
type
)
&&
val
.
i
>
pItems
[
0
].
v
.
i
)
||
(
IS_UNSIGNED_NUMERIC_TYPE
(
type
)
&&
val
.
u
>
pItems
[
0
].
v
.
u
)
||
(
IS_FLOAT_TYPE
(
type
)
&&
val
.
d
>
pItems
[
0
].
v
.
d
))
{
(
IS_UNSIGNED_NUMERIC_TYPE
(
type
)
&&
val
.
u
>
pItems
[
0
].
v
.
u
)
||
(
IS_FLOAT_TYPE
(
type
)
&&
val
.
d
>
pItems
[
0
].
v
.
d
))
{
// replace the old data and the coresponding tuple data
STopBotResItem
*
pItem
=
&
pItems
[
0
];
pItem
->
v
=
val
;
pItem
->
v
=
val
;
pItem
->
uid
=
uid
;
// save the data of this tuple by over writing the old data
copyTupleData
(
pCtx
,
rowIndex
,
pSrcBlock
,
pItem
);
taosheapadjust
((
void
*
)
pItems
,
sizeof
(
STopBotResItem
),
0
,
pEntryInfo
->
numOfRes
-
1
,
(
const
void
*
)
&
type
,
topBotResComparFn
,
NULL
,
false
);
taosheapadjust
((
void
*
)
pItems
,
sizeof
(
STopBotResItem
),
0
,
pEntryInfo
->
numOfRes
-
1
,
(
const
void
*
)
&
type
,
topBotResComparFn
,
NULL
,
false
);
}
}
}
...
...
@@ -1553,7 +1562,7 @@ void copyTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pS
char
*
pStart
=
(
char
*
)(
nullList
+
pSrcBlock
->
info
.
numOfCols
*
sizeof
(
bool
));
int32_t
offset
=
0
;
for
(
int32_t
i
=
0
;
i
<
pSrcBlock
->
info
.
numOfCols
;
++
i
)
{
for
(
int32_t
i
=
0
;
i
<
pSrcBlock
->
info
.
numOfCols
;
++
i
)
{
SColumnInfoData
*
pCol
=
taosArrayGet
(
pSrcBlock
->
pDataBlock
,
i
);
if
((
nullList
[
i
]
=
colDataIsNull_s
(
pCol
,
rowIndex
))
==
true
)
{
continue
;
...
...
@@ -1574,17 +1583,17 @@ void copyTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pS
}
int32_t
topBotFinalize
(
SqlFunctionCtx
*
pCtx
,
SSDataBlock
*
pBlock
)
{
SResultRowEntryInfo
*
pEntryInfo
=
GET_RES_INFO
(
pCtx
);
STopBotRes
*
pRes
=
GET_ROWCELL_INTERBUF
(
pEntryInfo
);
SResultRowEntryInfo
*
pEntryInfo
=
GET_RES_INFO
(
pCtx
);
STopBotRes
*
pRes
=
GET_ROWCELL_INTERBUF
(
pEntryInfo
);
pEntryInfo
->
complete
=
true
;
int32_t
type
=
pCtx
->
input
.
pData
[
0
]
->
info
.
type
;
int32_t
slotId
=
pCtx
->
pExpr
->
base
.
resSchema
.
slotId
;
int32_t
type
=
pCtx
->
input
.
pData
[
0
]
->
info
.
type
;
int32_t
slotId
=
pCtx
->
pExpr
->
base
.
resSchema
.
slotId
;
SColumnInfoData
*
pCol
=
taosArrayGet
(
pBlock
->
pDataBlock
,
slotId
);
// todo assign the tag value and the corresponding row data
int32_t
currentRow
=
pBlock
->
info
.
rows
;
switch
(
type
)
{
switch
(
type
)
{
case
TSDB_DATA_TYPE_INT
:
{
for
(
int32_t
i
=
0
;
i
<
pEntryInfo
->
numOfRes
;
++
i
)
{
STopBotResItem
*
pItem
=
&
pRes
->
pItems
[
i
];
...
...
@@ -1602,12 +1611,12 @@ int32_t topBotFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
for
(
int32_t
j
=
0
;
j
<
pCtx
->
subsidiaries
.
num
;
++
j
)
{
SqlFunctionCtx
*
pc
=
pCtx
->
subsidiaries
.
pCtx
[
j
];
SFunctParam
*
pFuncParam
=
&
pc
->
pExpr
->
base
.
pParam
[
0
];
int32_t
srcSlotId
=
pFuncParam
->
pCol
->
slotId
;
int32_t
dstSlotId
=
pCtx
->
pExpr
->
base
.
resSchema
.
slotId
;
SFunctParam
*
pFuncParam
=
&
pc
->
pExpr
->
base
.
pParam
[
0
];
int32_t
srcSlotId
=
pFuncParam
->
pCol
->
slotId
;
int32_t
dstSlotId
=
pCtx
->
pExpr
->
base
.
resSchema
.
slotId
;
int32_t
ps
=
0
;
for
(
int32_t
k
=
0
;
k
<
srcSlotId
;
++
k
)
{
for
(
int32_t
k
=
0
;
k
<
srcSlotId
;
++
k
)
{
SColumnInfoData
*
pSrcCol
=
taosArrayGet
(
pCtx
->
pSrcBlock
->
pDataBlock
,
k
);
ps
+=
pSrcCol
->
info
.
bytes
;
}
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录