Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
3d0838d1
TDengine
项目概览
taosdata
/
TDengine
大约 2 年 前同步成功
通知
1192
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
3d0838d1
编写于
2月 25, 2022
作者:
wmmhello
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
add tail function
上级
f882b496
变更
3
隐藏空白更改
内联
并排
Showing
3 changed file
with
251 addition
and
46 deletion
+251
-46
src/query/inc/qAggMain.h
src/query/inc/qAggMain.h
+2
-1
src/query/src/qAggMain.c
src/query/src/qAggMain.c
+245
-40
src/query/src/qExecutor.c
src/query/src/qExecutor.c
+4
-5
未找到文件。
src/query/inc/qAggMain.h
浏览文件 @
3d0838d1
...
@@ -80,8 +80,9 @@ extern "C" {
...
@@ -80,8 +80,9 @@ extern "C" {
#define TSDB_FUNC_HISTOGRAM 38
#define TSDB_FUNC_HISTOGRAM 38
#define TSDB_FUNC_UNIQUE 39
#define TSDB_FUNC_UNIQUE 39
#define TSDB_FUNC_MODE 40
#define TSDB_FUNC_MODE 40
#define TSDB_FUNC_TAIL 41
#define TSDB_FUNC_MAX_NUM 4
1
#define TSDB_FUNC_MAX_NUM 4
2
#define TSDB_FUNCSTATE_SO 0x1u // single output
#define TSDB_FUNCSTATE_SO 0x1u // single output
#define TSDB_FUNCSTATE_MO 0x2u // dynamic number of output, not multinumber of output e.g., TOP/BOTTOM
#define TSDB_FUNCSTATE_MO 0x2u // dynamic number of output, not multinumber of output e.g., TOP/BOTTOM
...
...
src/query/src/qAggMain.c
浏览文件 @
3d0838d1
...
@@ -243,6 +243,17 @@ typedef struct {
...
@@ -243,6 +243,17 @@ typedef struct {
char
res
[];
char
res
[];
}
SModeFuncInfo
;
}
SModeFuncInfo
;
typedef
struct
{
int64_t
timestamp
;
char
data
[];
}
TailUnit
;
typedef
struct
STailInfo
{
int32_t
offset
;
int32_t
num
;
TailUnit
**
res
;
}
STailInfo
;
int32_t
getResultDataInfo
(
int32_t
dataType
,
int32_t
dataBytes
,
int32_t
functionId
,
int32_t
param
,
int16_t
*
type
,
int32_t
getResultDataInfo
(
int32_t
dataType
,
int32_t
dataBytes
,
int32_t
functionId
,
int32_t
param
,
int16_t
*
type
,
int32_t
*
bytes
,
int32_t
*
interBytes
,
int16_t
extLength
,
bool
isSuperTable
,
SUdfInfo
*
pUdfInfo
)
{
int32_t
*
bytes
,
int32_t
*
interBytes
,
int16_t
extLength
,
bool
isSuperTable
,
SUdfInfo
*
pUdfInfo
)
{
if
(
!
isValidDataType
(
dataType
))
{
if
(
!
isValidDataType
(
dataType
))
{
...
@@ -387,17 +398,23 @@ int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionI
...
@@ -387,17 +398,23 @@ int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionI
return
TSDB_CODE_SUCCESS
;
return
TSDB_CODE_SUCCESS
;
}
else
if
(
functionId
==
TSDB_FUNC_MODE
)
{
}
else
if
(
functionId
==
TSDB_FUNC_MODE
)
{
*
type
=
TSDB_DATA_TYPE_BINARY
;
*
type
=
TSDB_DATA_TYPE_BINARY
;
int64_t
size
=
sizeof
(
ModeUnit
)
+
dataBytes
;
int64_t
size
=
sizeof
(
ModeUnit
)
+
dataBytes
;
size
*=
MAX_MODE_INNER_RESULT_ROWS
;
size
*=
MAX_MODE_INNER_RESULT_ROWS
;
size
+=
sizeof
(
SModeFuncInfo
);
size
+=
sizeof
(
SModeFuncInfo
);
if
(
size
>
MAX_MODE_INNER_RESULT_SIZE
){
if
(
size
>
MAX_MODE_INNER_RESULT_SIZE
){
size
=
MAX_MODE_INNER_RESULT_SIZE
;
size
=
MAX_MODE_INNER_RESULT_SIZE
;
}
}
*
bytes
=
(
int32_t
)
size
;
*
bytes
=
(
int32_t
)
size
;
*
interBytes
=
*
bytes
;
*
interBytes
=
*
bytes
;
return
TSDB_CODE_SUCCESS
;
return
TSDB_CODE_SUCCESS
;
}
else
if
(
functionId
==
TSDB_FUNC_TAIL
)
{
*
type
=
TSDB_DATA_TYPE_BINARY
;
*
bytes
=
(
sizeof
(
STailInfo
)
+
(
sizeof
(
TailUnit
)
+
dataBytes
+
POINTER_BYTES
+
extLength
)
*
param
);
*
interBytes
=
*
bytes
;
return
TSDB_CODE_SUCCESS
;
}
else
if
(
functionId
==
TSDB_FUNC_SAMPLE
)
{
}
else
if
(
functionId
==
TSDB_FUNC_SAMPLE
)
{
*
type
=
TSDB_DATA_TYPE_BINARY
;
*
type
=
TSDB_DATA_TYPE_BINARY
;
*
bytes
=
(
sizeof
(
SSampleFuncInfo
)
+
dataBytes
*
param
+
sizeof
(
int64_t
)
*
param
+
extLength
*
param
);
*
bytes
=
(
sizeof
(
SSampleFuncInfo
)
+
dataBytes
*
param
+
sizeof
(
int64_t
)
*
param
+
extLength
*
param
);
...
@@ -521,7 +538,7 @@ int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionI
...
@@ -521,7 +538,7 @@ int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionI
*
type
=
(
int16_t
)
dataType
;
*
type
=
(
int16_t
)
dataType
;
*
bytes
=
dataBytes
;
*
bytes
=
dataBytes
;
size_t
size
=
sizeof
(
STopBotInfo
)
+
(
sizeof
(
tValuePair
)
+
extLength
)
*
param
;
size_t
size
=
sizeof
(
STopBotInfo
)
+
(
sizeof
(
tValuePair
)
+
POINTER_BYTES
+
extLength
)
*
param
;
// the output column may be larger than sizeof(STopBotInfo)
// the output column may be larger than sizeof(STopBotInfo)
*
interBytes
=
(
int32_t
)
size
;
*
interBytes
=
(
int32_t
)
size
;
...
@@ -545,8 +562,15 @@ int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionI
...
@@ -545,8 +562,15 @@ int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionI
size
=
MAX_MODE_INNER_RESULT_SIZE
;
size
=
MAX_MODE_INNER_RESULT_SIZE
;
}
}
*
interBytes
=
(
int32_t
)
size
;
*
interBytes
=
(
int32_t
)
size
;
return
TSDB_CODE_SUCCESS
;
}
else
if
(
functionId
==
TSDB_FUNC_TAIL
)
{
}
else
if
(
functionId
==
TSDB_FUNC_SAMPLE
)
{
*
type
=
(
int16_t
)
dataType
;
*
bytes
=
dataBytes
;
size_t
size
=
(
sizeof
(
STailInfo
)
+
(
sizeof
(
TailUnit
)
+
dataBytes
+
POINTER_BYTES
+
extLength
)
*
param
);
// the output column may be larger than sizeof(STopBotInfo)
*
interBytes
=
(
int32_t
)
size
;
}
else
if
(
functionId
==
TSDB_FUNC_SAMPLE
)
{
*
type
=
(
int16_t
)
dataType
;
*
type
=
(
int16_t
)
dataType
;
*
bytes
=
dataBytes
;
*
bytes
=
dataBytes
;
size_t
size
=
sizeof
(
SSampleFuncInfo
)
+
dataBytes
*
param
+
sizeof
(
int64_t
)
*
param
+
extLength
*
param
;
size_t
size
=
sizeof
(
SSampleFuncInfo
)
+
dataBytes
*
param
+
sizeof
(
int64_t
)
*
param
+
extLength
*
param
;
...
@@ -2506,11 +2530,11 @@ static void top_bottom_func_finalizer(SQLFunctionCtx *pCtx) {
...
@@ -2506,11 +2530,11 @@ static void top_bottom_func_finalizer(SQLFunctionCtx *pCtx) {
tValuePair
**
tvp
=
pRes
->
res
;
tValuePair
**
tvp
=
pRes
->
res
;
// user specify the order of output by sort the result according to timestamp
// user specify the order of output by sort the result according to timestamp
if
(
pCtx
->
param
[
1
].
i64
==
PRIMARYKEY_TIMESTAMP_COL_INDEX
)
{
if
(
pCtx
->
param
[
2
].
i64
==
PRIMARYKEY_TIMESTAMP_COL_INDEX
)
{
__compar_fn_t
comparator
=
(
pCtx
->
param
[
2
].
i64
==
TSDB_ORDER_ASC
)
?
resAscComparFn
:
resDescComparFn
;
__compar_fn_t
comparator
=
(
pCtx
->
param
[
3
].
i64
==
TSDB_ORDER_ASC
)
?
resAscComparFn
:
resDescComparFn
;
qsort
(
tvp
,
(
size_t
)
pResInfo
->
numOfRes
,
POINTER_BYTES
,
comparator
);
qsort
(
tvp
,
(
size_t
)
pResInfo
->
numOfRes
,
POINTER_BYTES
,
comparator
);
}
else
/*if (pCtx->param[
1
].i64 > PRIMARYKEY_TIMESTAMP_COL_INDEX)*/
{
}
else
/*if (pCtx->param[
2
].i64 > PRIMARYKEY_TIMESTAMP_COL_INDEX)*/
{
__compar_fn_t
comparator
=
(
pCtx
->
param
[
2
].
i64
==
TSDB_ORDER_ASC
)
?
resDataAscComparFn
:
resDataDescComparFn
;
__compar_fn_t
comparator
=
(
pCtx
->
param
[
3
].
i64
==
TSDB_ORDER_ASC
)
?
resDataAscComparFn
:
resDataDescComparFn
;
qsort
(
tvp
,
(
size_t
)
pResInfo
->
numOfRes
,
POINTER_BYTES
,
comparator
);
qsort
(
tvp
,
(
size_t
)
pResInfo
->
numOfRes
,
POINTER_BYTES
,
comparator
);
}
}
...
@@ -5110,21 +5134,18 @@ static void histogram_func_finalizer(SQLFunctionCtx *pCtx) {
...
@@ -5110,21 +5134,18 @@ static void histogram_func_finalizer(SQLFunctionCtx *pCtx) {
doFinalizer
(
pCtx
);
doFinalizer
(
pCtx
);
}
}
// unique
// unique&tail copy
static
void
copyUniqueRes
(
SQLFunctionCtx
*
pCtx
,
int32_t
bytes
)
{
static
void
copyRes
(
SQLFunctionCtx
*
pCtx
,
void
*
data
,
int32_t
bytes
)
{
SResultRowCellInfo
*
pResInfo
=
GET_RES_INFO
(
pCtx
);
size_t
size
=
sizeof
(
int64_t
)
+
bytes
+
pCtx
->
tagInfo
.
tagsLen
;
SUniqueFuncInfo
*
pRes
=
GET_ROWCELL_INTERBUF
(
pResInfo
);
size_t
size
=
sizeof
(
UniqueUnit
)
+
bytes
+
pCtx
->
tagInfo
.
tagsLen
;
int32_t
len
=
(
int32_t
)(
GET_RES_INFO
(
pCtx
)
->
numOfRes
);
int32_t
len
=
(
int32_t
)(
GET_RES_INFO
(
pCtx
)
->
numOfRes
);
char
*
tsOutput
=
pCtx
->
ptsOutputBuf
;
char
*
tsOutput
=
pCtx
->
ptsOutputBuf
;
char
*
output
=
pCtx
->
pOutput
;
char
*
output
=
pCtx
->
pOutput
;
int32_t
step
=
GET_FORWARD_DIRECTION_FACTOR
(
pCtx
->
param
[
2
].
i64
);
int32_t
step
=
GET_FORWARD_DIRECTION_FACTOR
(
pCtx
->
param
[
3
].
i64
);
char
*
tvp
=
pRes
->
res
+
(
size
*
((
pCtx
->
param
[
2
].
i64
==
TSDB_ORDER_ASC
)
?
0
:
len
-
1
));
char
*
tvp
=
data
+
(
size
*
((
pCtx
->
param
[
3
].
i64
==
TSDB_ORDER_ASC
)
?
0
:
len
-
1
));
for
(
int32_t
i
=
0
;
i
<
len
;
++
i
)
{
for
(
int32_t
i
=
0
;
i
<
len
;
++
i
)
{
memcpy
(
tsOutput
,
tvp
,
sizeof
(
int64_t
));
memcpy
(
tsOutput
,
tvp
,
sizeof
(
int64_t
));
memcpy
(
output
,
tvp
+
sizeof
(
UniqueUni
t
),
bytes
);
memcpy
(
output
,
tvp
+
sizeof
(
int64_
t
),
bytes
);
tvp
+=
(
step
*
size
);
tvp
+=
(
step
*
size
);
tsOutput
+=
sizeof
(
int64_t
);
tsOutput
+=
sizeof
(
int64_t
);
output
+=
bytes
;
output
+=
bytes
;
...
@@ -5141,9 +5162,9 @@ static void copyUniqueRes(SQLFunctionCtx *pCtx, int32_t bytes) {
...
@@ -5141,9 +5162,9 @@ static void copyUniqueRes(SQLFunctionCtx *pCtx, int32_t bytes) {
pData
[
i
]
=
pCtx
->
tagInfo
.
pTagCtxList
[
i
]
->
pOutput
;
pData
[
i
]
=
pCtx
->
tagInfo
.
pTagCtxList
[
i
]
->
pOutput
;
}
}
tvp
=
pRes
->
res
+
(
size
*
((
pCtx
->
param
[
2
].
i64
==
TSDB_ORDER_ASC
)
?
0
:
len
-
1
));
tvp
=
data
+
(
size
*
((
pCtx
->
param
[
3
].
i64
==
TSDB_ORDER_ASC
)
?
0
:
len
-
1
));
for
(
int32_t
i
=
0
;
i
<
len
;
++
i
)
{
for
(
int32_t
i
=
0
;
i
<
len
;
++
i
)
{
int32_t
offset
=
(
int32_t
)
sizeof
(
UniqueUni
t
)
+
bytes
;
int32_t
offset
=
(
int32_t
)
sizeof
(
int64_
t
)
+
bytes
;
for
(
int32_t
j
=
0
;
j
<
pCtx
->
tagInfo
.
numOfTagCols
;
++
j
)
{
for
(
int32_t
j
=
0
;
j
<
pCtx
->
tagInfo
.
numOfTagCols
;
++
j
)
{
memcpy
(
pData
[
j
],
tvp
+
offset
,
(
size_t
)
pCtx
->
tagInfo
.
pTagCtxList
[
j
]
->
outputBytes
);
memcpy
(
pData
[
j
],
tvp
+
offset
,
(
size_t
)
pCtx
->
tagInfo
.
pTagCtxList
[
j
]
->
outputBytes
);
offset
+=
pCtx
->
tagInfo
.
pTagCtxList
[
j
]
->
outputBytes
;
offset
+=
pCtx
->
tagInfo
.
pTagCtxList
[
j
]
->
outputBytes
;
...
@@ -5251,10 +5272,10 @@ static void unique_function_merge(SQLFunctionCtx *pCtx) {
...
@@ -5251,10 +5272,10 @@ static void unique_function_merge(SQLFunctionCtx *pCtx) {
typedef
struct
{
typedef
struct
{
int32_t
dataOffset
;
int32_t
dataOffset
;
__compar_fn_t
comparFn
;
__compar_fn_t
comparFn
;
}
Uique
Supporter
;
}
Sort
Supporter
;
static
int32_t
unique
CompareFn
(
const
void
*
p1
,
const
void
*
p2
,
const
void
*
param
)
{
static
int32_t
sort
CompareFn
(
const
void
*
p1
,
const
void
*
p2
,
const
void
*
param
)
{
UiqueSupporter
*
support
=
(
Uique
Supporter
*
)
param
;
SortSupporter
*
support
=
(
Sort
Supporter
*
)
param
;
return
support
->
comparFn
((
const
char
*
)
p1
+
support
->
dataOffset
,
(
const
char
*
)
p2
+
support
->
dataOffset
);
return
support
->
comparFn
((
const
char
*
)
p1
+
support
->
dataOffset
,
(
const
char
*
)
p2
+
support
->
dataOffset
);
}
}
...
@@ -5272,19 +5293,19 @@ static void unique_func_finalizer(SQLFunctionCtx *pCtx) {
...
@@ -5272,19 +5293,19 @@ static void unique_func_finalizer(SQLFunctionCtx *pCtx) {
bytes
=
pCtx
->
inputBytes
;
bytes
=
pCtx
->
inputBytes
;
type
=
pCtx
->
inputType
;
type
=
pCtx
->
inputType
;
}
}
Uique
Supporter
support
=
{
0
};
Sort
Supporter
support
=
{
0
};
// user specify the order of output by sort the result according to timestamp
// user specify the order of output by sort the result according to timestamp
if
(
pCtx
->
param
[
1
].
i64
==
PRIMARYKEY_TIMESTAMP_COL_INDEX
)
{
if
(
pCtx
->
param
[
2
].
i64
==
PRIMARYKEY_TIMESTAMP_COL_INDEX
)
{
support
.
dataOffset
=
0
;
support
.
dataOffset
=
0
;
support
.
comparFn
=
compareInt64Val
;
support
.
comparFn
=
compareInt64Val
;
}
else
{
}
else
{
support
.
dataOffset
=
sizeof
(
UniqueUni
t
);
support
.
dataOffset
=
sizeof
(
int64_
t
);
support
.
comparFn
=
getComparFunc
(
type
,
0
);
support
.
comparFn
=
getComparFunc
(
type
,
0
);
}
}
size_t
size
=
sizeof
(
UniqueUni
t
)
+
bytes
+
pCtx
->
tagInfo
.
tagsLen
;
size_t
size
=
sizeof
(
int64_
t
)
+
bytes
+
pCtx
->
tagInfo
.
tagsLen
;
taosqsort
(
pInfo
->
res
,
(
size_t
)
GET_RES_INFO
(
pCtx
)
->
numOfRes
,
size
,
&
support
,
unique
CompareFn
);
taosqsort
(
pInfo
->
res
,
(
size_t
)
GET_RES_INFO
(
pCtx
)
->
numOfRes
,
size
,
&
support
,
sort
CompareFn
);
copy
UniqueRes
(
pCtx
,
bytes
);
copy
Res
(
pCtx
,
pInfo
->
res
,
bytes
);
doFinalizer
(
pCtx
);
doFinalizer
(
pCtx
);
}
}
...
@@ -5396,6 +5417,178 @@ static void mode_func_finalizer(SQLFunctionCtx *pCtx) {
...
@@ -5396,6 +5417,178 @@ static void mode_func_finalizer(SQLFunctionCtx *pCtx) {
doFinalizer
(
pCtx
);
doFinalizer
(
pCtx
);
}
}
static
void
buildTailStruct
(
STailInfo
*
pTailInfo
,
SQLFunctionCtx
*
pCtx
)
{
char
*
tmp
=
(
char
*
)
pTailInfo
+
sizeof
(
STailInfo
);
pTailInfo
->
res
=
(
TailUnit
**
)
tmp
;
tmp
+=
POINTER_BYTES
*
pCtx
->
param
[
0
].
i64
;
int32_t
bytes
=
0
;
if
(
pCtx
->
currentStage
==
MERGE_STAGE
)
{
bytes
=
pCtx
->
outputBytes
;
}
else
{
bytes
=
pCtx
->
inputBytes
;
}
size_t
size
=
sizeof
(
TailUnit
)
+
bytes
+
pCtx
->
tagInfo
.
tagsLen
;
for
(
int32_t
i
=
0
;
i
<
pCtx
->
param
[
0
].
i64
;
++
i
)
{
pTailInfo
->
res
[
i
]
=
(
TailUnit
*
)
tmp
;
tmp
+=
size
;
}
}
static
void
valueTailAssign
(
TailUnit
*
dst
,
int32_t
bytes
,
const
char
*
val
,
int64_t
tsKey
,
SExtTagsInfo
*
pTagInfo
,
char
*
pTags
,
int16_t
stage
)
{
dst
->
timestamp
=
tsKey
;
memcpy
(
dst
->
data
,
val
,
bytes
);
if
(
stage
==
MERGE_STAGE
)
{
memcpy
(
dst
->
data
+
bytes
,
pTags
,
(
size_t
)
pTagInfo
->
tagsLen
);
}
else
{
// the tags are dumped from the ctx tag fields
int32_t
size
=
0
;
for
(
int32_t
i
=
0
;
i
<
pTagInfo
->
numOfTagCols
;
++
i
)
{
SQLFunctionCtx
*
ctx
=
pTagInfo
->
pTagCtxList
[
i
];
if
(
ctx
->
functionId
==
TSDB_FUNC_TS_DUMMY
)
{
ctx
->
tag
.
nType
=
TSDB_DATA_TYPE_BIGINT
;
ctx
->
tag
.
i64
=
tsKey
;
}
tVariantDump
(
&
ctx
->
tag
,
ctx
->
pOutput
,
ctx
->
tag
.
nType
,
true
);
memcpy
(
dst
->
data
+
bytes
+
size
,
ctx
->
pOutput
,
ctx
->
outputBytes
);
size
+=
ctx
->
outputBytes
;
}
}
}
static
int32_t
tailComparFn
(
const
void
*
p1
,
const
void
*
p2
,
const
void
*
param
)
{
TailUnit
*
d1
=
*
(
TailUnit
**
)
p1
;
TailUnit
*
d2
=
*
(
TailUnit
**
)
p2
;
return
compareInt64Val
(
d1
,
d2
);
}
static
void
tailSwapFn
(
void
*
dst
,
void
*
src
,
const
void
*
param
)
{
tValuePair
**
vdst
=
(
tValuePair
**
)
dst
;
tValuePair
**
vsrc
=
(
tValuePair
**
)
src
;
tValuePair
*
tmp
=
*
vdst
;
*
vdst
=
*
vsrc
;
*
vsrc
=
tmp
;
}
static
void
do_tail_function_add
(
STailInfo
*
pInfo
,
int32_t
maxLen
,
void
*
pData
,
int64_t
ts
,
int32_t
bytes
,
SExtTagsInfo
*
pTagInfo
,
char
*
pTags
,
int16_t
stage
)
{
TailUnit
**
pList
=
pInfo
->
res
;
if
(
pInfo
->
num
<
maxLen
)
{
valueTailAssign
(
pList
[
pInfo
->
num
],
bytes
,
pData
,
ts
,
pTagInfo
,
pTags
,
stage
);
taosheapsort
((
void
*
)
pList
,
sizeof
(
TailUnit
**
),
pInfo
->
num
+
1
,
NULL
,
tailComparFn
,
NULL
,
tailSwapFn
,
0
);
pInfo
->
num
++
;
}
else
{
valueTailAssign
(
pList
[
0
],
bytes
,
pData
,
ts
,
pTagInfo
,
pTags
,
stage
);
taosheapadjust
((
void
*
)
pList
,
sizeof
(
TailUnit
**
),
0
,
maxLen
-
1
,
NULL
,
tailComparFn
,
NULL
,
tailSwapFn
,
0
);
}
}
static
bool
tail_function_setup
(
SQLFunctionCtx
*
pCtx
,
SResultRowCellInfo
*
pResInfo
)
{
if
(
!
function_setup
(
pCtx
,
pResInfo
))
{
return
false
;
}
STailInfo
*
pInfo
=
getOutputInfo
(
pCtx
);
buildTailStruct
(
pInfo
,
pCtx
);
return
true
;
}
static
void
tail_function
(
SQLFunctionCtx
*
pCtx
)
{
STailInfo
*
pRes
=
getOutputInfo
(
pCtx
);
for
(
int32_t
i
=
0
;
i
<
pCtx
->
size
;
++
i
)
{
if
(
pRes
->
offset
++
<
(
int32_t
)
pCtx
->
param
[
1
].
i64
){
continue
;
}
if
(
pRes
->
num
>=
(
int32_t
)
pCtx
->
param
[
0
].
i64
){
break
;
}
char
*
data
=
GET_INPUT_DATA
(
pCtx
,
i
);
TSKEY
ts
=
(
pCtx
->
ptsList
!=
NULL
)
?
GET_TS_DATA
(
pCtx
,
i
)
:
0
;
valueTailAssign
(
pRes
->
res
[
pRes
->
num
],
pCtx
->
inputBytes
,
data
,
ts
,
&
pCtx
->
tagInfo
,
NULL
,
pCtx
->
currentStage
);
pRes
->
num
++
;
}
// treat the result as only one result
GET_RES_INFO
(
pCtx
)
->
numOfRes
=
1
;
}
static
void
tail_func_merge
(
SQLFunctionCtx
*
pCtx
)
{
STailInfo
*
pInput
=
(
STailInfo
*
)
GET_INPUT_DATA_LIST
(
pCtx
);
// construct the input data struct from binary data
buildTailStruct
(
pInput
,
pCtx
);
STailInfo
*
pOutput
=
getOutputInfo
(
pCtx
);
// the intermediate result is binary, we only use the output data type
for
(
int32_t
i
=
0
;
i
<
pInput
->
num
;
++
i
)
{
do_tail_function_add
(
pOutput
,
(
int32_t
)
pCtx
->
param
[
0
].
i64
,
pInput
->
res
[
i
]
->
data
,
pInput
->
res
[
i
]
->
timestamp
,
pCtx
->
outputBytes
,
&
pCtx
->
tagInfo
,
pInput
->
res
[
i
]
->
data
+
pCtx
->
outputBytes
,
pCtx
->
currentStage
);
}
GET_RES_INFO
(
pCtx
)
->
numOfRes
=
pOutput
->
num
;
}
static
void
tail_func_finalizer
(
SQLFunctionCtx
*
pCtx
)
{
SResultRowCellInfo
*
pResInfo
=
GET_RES_INFO
(
pCtx
);
// data in temporary list is less than the required number of results, not enough qualified number of results
STailInfo
*
pRes
=
GET_ROWCELL_INTERBUF
(
pResInfo
);
if
(
pCtx
->
stableQuery
){
GET_RES_INFO
(
pCtx
)
->
numOfRes
=
pRes
->
num
-
pCtx
->
param
[
1
].
i64
;
}
else
{
GET_RES_INFO
(
pCtx
)
->
numOfRes
=
pRes
->
num
;
}
int32_t
bytes
=
0
;
int32_t
type
=
0
;
if
(
pCtx
->
currentStage
==
MERGE_STAGE
)
{
bytes
=
pCtx
->
outputBytes
;
type
=
pCtx
->
outputType
;
assert
(
pCtx
->
inputType
==
TSDB_DATA_TYPE_BINARY
);
}
else
{
bytes
=
pCtx
->
inputBytes
;
type
=
pCtx
->
inputType
;
}
SortSupporter
support
=
{
0
};
// user specify the order of output by sort the result according to timestamp
if
(
pCtx
->
param
[
2
].
i64
==
PRIMARYKEY_TIMESTAMP_COL_INDEX
)
{
support
.
dataOffset
=
0
;
support
.
comparFn
=
compareInt64Val
;
}
else
{
support
.
dataOffset
=
sizeof
(
int64_t
);
support
.
comparFn
=
getComparFunc
(
type
,
0
);
}
size_t
size
=
sizeof
(
int64_t
)
+
bytes
+
pCtx
->
tagInfo
.
tagsLen
;
void
*
data
=
calloc
(
size
,
GET_RES_INFO
(
pCtx
)
->
numOfRes
);
if
(
!
data
){
qError
(
"calloc error in tail_func_finalizer: size:%d, num:%d"
,
(
int32_t
)
size
,
GET_RES_INFO
(
pCtx
)
->
numOfRes
);
return
;
}
for
(
int32_t
start
=
pCtx
->
param
[
1
].
i64
,
i
=
0
;
start
<
pRes
->
num
;
start
++
,
i
++
){
memcpy
(
data
+
i
*
size
,
pRes
->
res
[
start
],
size
);
}
taosqsort
(
data
,
(
size_t
)
GET_RES_INFO
(
pCtx
)
->
numOfRes
,
size
,
&
support
,
sortCompareFn
);
copyRes
(
pCtx
,
data
,
bytes
);
free
(
data
);
doFinalizer
(
pCtx
);
}
/////////////////////////////////////////////////////////////////////////////////////////////
/////////////////////////////////////////////////////////////////////////////////////////////
/*
/*
* function compatible list.
* function compatible list.
...
@@ -5416,8 +5609,8 @@ int32_t functionCompatList[] = {
...
@@ -5416,8 +5609,8 @@ int32_t functionCompatList[] = {
1
,
1
,
1
,
1
,
-
1
,
1
,
1
,
1
,
5
,
1
,
1
,
1
,
1
,
1
,
1
,
-
1
,
1
,
1
,
1
,
5
,
1
,
1
,
// tid_tag, deriv, csum, mavg, sample,
// tid_tag, deriv, csum, mavg, sample,
6
,
8
,
-
1
,
-
1
,
-
1
,
6
,
8
,
-
1
,
-
1
,
-
1
,
// block_info,elapsed,histogram,unique,mode
// block_info,elapsed,histogram,unique,mode
,tail
7
,
1
,
-
1
,
-
1
,
1
7
,
1
,
-
1
,
-
1
,
1
,
-
1
};
};
SAggFunctionInfo
aAggs
[
TSDB_FUNC_MAX_NUM
]
=
{{
SAggFunctionInfo
aAggs
[
TSDB_FUNC_MAX_NUM
]
=
{{
...
@@ -5914,5 +6107,17 @@ SAggFunctionInfo aAggs[TSDB_FUNC_MAX_NUM] = {{
...
@@ -5914,5 +6107,17 @@ SAggFunctionInfo aAggs[TSDB_FUNC_MAX_NUM] = {{
mode_func_finalizer
,
mode_func_finalizer
,
mode_function_merge
,
mode_function_merge
,
dataBlockRequired
,
dataBlockRequired
,
}
},
{
// 41
"tail"
,
TSDB_FUNC_TAIL
,
TSDB_FUNC_TAIL
,
TSDB_BASE_FUNC_MO
|
TSDB_FUNCSTATE_SELECTIVITY
,
tail_function_setup
,
tail_function
,
tail_func_finalizer
,
tail_func_merge
,
dataBlockRequired
,
}
};
};
src/query/src/qExecutor.c
浏览文件 @
3d0838d1
...
@@ -2001,16 +2001,15 @@ static SQLFunctionCtx* createSQLFunctionCtx(SQueryRuntimeEnv* pRuntimeEnv, SExpr
...
@@ -2001,16 +2001,15 @@ static SQLFunctionCtx* createSQLFunctionCtx(SQueryRuntimeEnv* pRuntimeEnv, SExpr
int32_t
functionId
=
pCtx
->
functionId
;
int32_t
functionId
=
pCtx
->
functionId
;
if
(
functionId
==
TSDB_FUNC_TOP
||
functionId
==
TSDB_FUNC_BOTTOM
if
(
functionId
==
TSDB_FUNC_TOP
||
functionId
==
TSDB_FUNC_BOTTOM
||
functionId
==
TSDB_FUNC_DIFF
||
functionId
==
TSDB_FUNC_UNIQUE
)
{
||
functionId
==
TSDB_FUNC_DIFF
||
functionId
==
TSDB_FUNC_UNIQUE
||
functionId
==
TSDB_FUNC_TAIL
)
{
int32_t
f
=
pExpr
[
i
-
1
].
base
.
functionId
;
int32_t
f
=
pExpr
[
i
-
1
].
base
.
functionId
;
assert
(
f
==
TSDB_FUNC_TS
||
f
==
TSDB_FUNC_TS_DUMMY
);
assert
(
f
==
TSDB_FUNC_TS
||
f
==
TSDB_FUNC_TS_DUMMY
);
pCtx
->
param
[
2
].
i64
=
pQueryAttr
->
order
.
order
;
pCtx
->
param
[
3
].
i64
=
pQueryAttr
->
order
.
order
;
pCtx
->
param
[
2
].
nType
=
TSDB_DATA_TYPE_BIGINT
;
pCtx
->
param
[
3
].
i64
=
functionId
;
pCtx
->
param
[
3
].
nType
=
TSDB_DATA_TYPE_BIGINT
;
pCtx
->
param
[
3
].
nType
=
TSDB_DATA_TYPE_BIGINT
;
pCtx
->
param
[
1
].
i64
=
pQueryAttr
->
order
.
orderColId
;
pCtx
->
param
[
2
].
i64
=
pQueryAttr
->
order
.
orderColId
;
}
else
if
(
functionId
==
TSDB_FUNC_INTERP
)
{
}
else
if
(
functionId
==
TSDB_FUNC_INTERP
)
{
pCtx
->
param
[
2
].
i64
=
(
int8_t
)
pQueryAttr
->
fillType
;
pCtx
->
param
[
2
].
i64
=
(
int8_t
)
pQueryAttr
->
fillType
;
if
(
pQueryAttr
->
fillVal
!=
NULL
)
{
if
(
pQueryAttr
->
fillVal
!=
NULL
)
{
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录