Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
9a038322
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1187
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
9a038322
编写于
4月 21, 2022
作者:
H
Haojun Liao
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
refactor(query): do some internal refactor.
上级
a6e8ad6e
变更
6
隐藏空白更改
内联
并排
Showing
6 changed file
with
205 addition
and
318 deletion
+205
-318
include/libs/function/function.h
include/libs/function/function.h
+1
-1
source/libs/executor/src/executorimpl.c
source/libs/executor/src/executorimpl.c
+48
-54
source/libs/function/inc/builtinsimpl.h
source/libs/function/inc/builtinsimpl.h
+3
-0
source/libs/function/src/builtins.c
source/libs/function/src/builtins.c
+5
-4
source/libs/function/src/builtinsimpl.c
source/libs/function/src/builtinsimpl.c
+28
-17
source/libs/function/src/taggfunction.c
source/libs/function/src/taggfunction.c
+120
-242
未找到文件。
include/libs/function/function.h
浏览文件 @
9a038322
...
...
@@ -192,7 +192,7 @@ typedef struct SqlFunctionCtx {
int16_t
functionId
;
// function id
char
*
pOutput
;
// final result output buffer, point to sdata->data
int32_t
numOfParams
;
S
Variant
param
[
4
];
// input parameter, e.g., top(k, 20), the number of results for top query is kept in param
S
FunctParam
*
param
;
// input parameter, e.g., top(k, 20), the number of results for top query is kept in param
int64_t
*
ptsList
;
// corresponding timestamp array list
SColumnInfoData
*
pTsOutput
;
// corresponding output buffer for timestamp of each result, e.g., top/bottom*/
int32_t
offset
;
...
...
source/libs/executor/src/executorimpl.c
浏览文件 @
9a038322
...
...
@@ -1087,6 +1087,7 @@ static int32_t doSetInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCt
pCtx
[
i
].
currentStage
=
MAIN_SCAN
;
SInputColumnInfoData
*
pInput
=
&
pCtx
[
i
].
input
;
pInput
->
uid
=
pBlock
->
info
.
uid
;
SExprInfo
*
pOneExpr
=
&
pOperator
->
pExpr
[
i
];
for
(
int32_t
j
=
0
;
j
<
pOneExpr
->
base
.
numOfParams
;
++
j
)
{
...
...
@@ -1101,7 +1102,9 @@ static int32_t doSetInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCt
pInput
->
pPTS
=
taosArrayGet
(
pBlock
->
pDataBlock
,
0
);
// todo set the correct timestamp column
ASSERT
(
pInput
->
pData
[
j
]
!=
NULL
);
}
else
if
(
pFuncParam
->
type
==
FUNC_PARAM_TYPE_VALUE
)
{
if
(
createDummyCol
)
{
// todo avoid case: top(k, 12), 12 is the value parameter.
// sum(11), 11 is also the value parameter.
if
(
createDummyCol
&&
pOneExpr
->
base
.
numOfParams
==
1
)
{
code
=
doCreateConstantValColumnInfo
(
pInput
,
pFuncParam
,
pFuncParam
->
param
.
nType
,
j
,
pBlock
->
info
.
rows
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
return
code
;
...
...
@@ -1876,67 +1879,58 @@ SqlFunctionCtx* createSqlFunctionCtx(SExprInfo* pExprInfo, int32_t numOfOutput,
}
}
pCtx
->
resDataInfo
.
interBufSize
=
env
.
calcMemSize
;
}
else
if
(
pExpr
->
pExpr
->
nodeType
==
QUERY_NODE_COLUMN
||
pExpr
->
pExpr
->
nodeType
==
QUERY_NODE_OPERATOR
||
pExpr
->
pExpr
->
nodeType
==
QUERY_NODE_VALUE
)
{
pCtx
->
resDataInfo
.
interBufSize
=
pFunct
->
resSchema
.
bytes
;
// for simple column, the intermediate buffer needs to hold one element.
}
else
if
(
pExpr
->
pExpr
->
nodeType
==
QUERY_NODE_COLUMN
||
pExpr
->
pExpr
->
nodeType
==
QUERY_NODE_OPERATOR
||
pExpr
->
pExpr
->
nodeType
==
QUERY_NODE_VALUE
)
{
// for simple column, the intermediate buffer needs to hold one element.
pCtx
->
resDataInfo
.
interBufSize
=
pFunct
->
resSchema
.
bytes
;
}
pCtx
->
input
.
numOfInputCols
=
pFunct
->
numOfParams
;
pCtx
->
input
.
pData
=
taosMemoryCalloc
(
pFunct
->
numOfParams
,
POINTER_BYTES
);
pCtx
->
input
.
pColumnDataAgg
=
taosMemoryCalloc
(
pFunct
->
numOfParams
,
POINTER_BYTES
);
pCtx
->
pTsOutput
=
NULL
;
pCtx
->
pTsOutput
=
NULL
;
pCtx
->
resDataInfo
.
bytes
=
pFunct
->
resSchema
.
bytes
;
pCtx
->
resDataInfo
.
type
=
pFunct
->
resSchema
.
type
;
pCtx
->
order
=
TSDB_ORDER_ASC
;
pCtx
->
start
.
key
=
INT64_MIN
;
pCtx
->
end
.
key
=
INT64_MIN
;
#if 0
for (int32_t j = 0; j < pCtx->numOfParams; ++j) {
// int16_t type = pFunct->param[j].nType;
// int16_t bytes = pFunct->param[j].nLen;
// if (type == TSDB_DATA_TYPE_BINARY || type == TSDB_DATA_TYPE_NCHAR) {
// taosVariantCreateFromBinary(&pCtx->param[j], pFunct->param[j].pz, bytes, type);
// } else {
// taosVariantCreateFromBinary(&pCtx->param[j], (char *)&pFunct->param[j].i, bytes, type);
// }
}
pCtx
->
resDataInfo
.
type
=
pFunct
->
resSchema
.
type
;
pCtx
->
order
=
TSDB_ORDER_ASC
;
pCtx
->
start
.
key
=
INT64_MIN
;
pCtx
->
end
.
key
=
INT64_MIN
;
pCtx
->
numOfParams
=
pExpr
->
base
.
numOfParams
;
// set the order information for top/bottom query
int32_t functionId = pCtx->functionId;
if (functionId == FUNCTION_TOP || functionId == FUNCTION_BOTTOM || functionId == FUNCTION_DIFF) {
int32_t f
= getExprFunctionId(&pExpr[0])
;
assert(f == FUNCTION_TS || f == FUNCTION_TS_DUMMY);
// pCtx->param[2].i = pQueryAttr->order.order
;
pCtx->param[2].nType = TSDB_DATA_TYPE_BIGINT;
pCtx->param[3].i = functionId
;
pCtx->param[3
].nType = TSDB_DATA_TYPE_BIGINT;
// pCtx->param[1].i = pQueryAttr->order.col.info.colId
;
} else if (functionId == FUNCTION_INTERP) {
// pCtx->param[2].i = (int8_t)pQueryAttr->fillType
;
// if (pQueryAttr->fillVal != NULL
) {
// if (isNull((const char *)&pQueryAttr->fillVal[i], pCtx->inputType)) {
// pCtx->param[1].nType = TSDB_DATA_TYPE_NULL;
// } else { // todo refactor, taosVariantCreateFromBinary should handle the NULL value
// if (pCtx->inputType != TSDB_DATA_TYPE_BINARY && pCtx->inputType != TSDB_DATA_TYPE_NCHAR) {
// taosVariantCreateFromBinary(&pCtx->param[1], (char *)&pQueryAttr->fillVal[i], pCtx->inputBytes, pCtx->inputType);
// }
// }
//
}
} else if (functionId == FUNCTION_TS_COMP) {
// pCtx->param[0].i = pQueryAttr->vgId; //TODO this should be the parameter from client
pCtx->param[0].nType = TSDB_DATA_TYPE_BIGINT;
} else if (functionId == FUNCTION_TWA) {
// pCtx->param[1].i = pQueryAttr->window.skey
;
pCtx->param[1].nType = TSDB_DATA_TYPE_BIGINT
;
// pCtx->param[2].i = pQueryAttr->window.ekey
;
pCtx->param[2].nType = TSDB_DATA_TYPE_BIGINT;
} else if (functionId == FUNCTION_ARITHM) {
// pCtx->param[1].pz = (char*) getScalarFuncSupport(pRuntimeEnv->scalarSup, i);
pCtx
->
param
=
pFunct
->
pParam
;
for
(
int32_t
j
=
0
;
j
<
pCtx
->
numOfParams
;
++
j
)
{
// set the order information for top/bottom query
int32_t
f
unctionId
=
pCtx
->
functionId
;
if
(
functionId
==
FUNCTION_TOP
||
functionId
==
FUNCTION_BOTTOM
||
functionId
==
FUNCTION_DIFF
)
{
int32_t
f
=
getExprFunctionId
(
&
pExpr
[
0
]);
assert
(
f
==
FUNCTION_TS
||
f
==
FUNCTION_TS_DUMMY
)
;
// pCtx->param[2].i = pQueryAttr->order.order
;
// pCtx->param[2
].nType = TSDB_DATA_TYPE_BIGINT;
// pCtx->param[3].i = functionId;
// pCtx->param[3].nType = TSDB_DATA_TYPE_BIGINT
;
// pCtx->param[1].i = pQueryAttr->order.col.info.colId
;
}
else
if
(
functionId
==
FUNCTION_INTERP
)
{
// pCtx->param[2].i = (int8_t)pQueryAttr->fillType;
// if (pQueryAttr->fillVal != NULL) {
// if (isNull((const char *)&pQueryAttr->fillVal[i], pCtx->inputType)) {
// pCtx->param[1].nType = TSDB_DATA_TYPE_NULL;
// } else { // todo refactor, taosVariantCreateFromBinary should handle the NULL value
// if (pCtx->inputType != TSDB_DATA_TYPE_BINARY && pCtx->inputType != TSDB_DATA_TYPE_NCHAR) {
// taosVariantCreateFromBinary(&pCtx->param[1], (char *)&pQueryAttr->fillVal[i], pCtx->inputBytes, pCtx->inputType);
//
}
// }
// }
}
else
if
(
functionId
==
FUNCTION_TWA
)
{
// pCtx->param[1].i = pQueryAttr->window.skey;
// pCtx->param[1].nType = TSDB_DATA_TYPE_BIGINT
;
// pCtx->param[2].i = pQueryAttr->window.ekey
;
// pCtx->param[2].nType = TSDB_DATA_TYPE_BIGINT
;
}
else
if
(
functionId
==
FUNCTION_ARITHM
)
{
// pCtx->param[1].pz = (char*) getScalarFuncSupport(pRuntimeEnv->scalarSup, i);
}
}
#endif
}
for
(
int32_t
i
=
1
;
i
<
numOfOutput
;
++
i
)
{
...
...
@@ -1955,7 +1949,7 @@ static void* destroySqlFunctionCtx(SqlFunctionCtx* pCtx, int32_t numOfOutput) {
for
(
int32_t
i
=
0
;
i
<
numOfOutput
;
++
i
)
{
for
(
int32_t
j
=
0
;
j
<
pCtx
[
i
].
numOfParams
;
++
j
)
{
taosVariantDestroy
(
&
pCtx
[
i
].
param
[
j
]);
taosVariantDestroy
(
&
pCtx
[
i
].
param
[
j
]
.
param
);
}
taosVariantDestroy
(
&
pCtx
[
i
].
tag
);
...
...
source/libs/function/inc/builtinsimpl.h
浏览文件 @
9a038322
...
...
@@ -58,6 +58,9 @@ bool getFirstLastFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
int32_t
firstFunction
(
SqlFunctionCtx
*
pCtx
);
int32_t
lastFunction
(
SqlFunctionCtx
*
pCtx
);
bool
getTopBotFuncEnv
(
SFunctionNode
*
UNUSED_PARAM
(
pFunc
),
SFuncExecEnv
*
pEnv
);
int32_t
topFunction
(
SqlFunctionCtx
*
pCtx
);
#ifdef __cplusplus
}
#endif
...
...
source/libs/function/src/builtins.c
浏览文件 @
9a038322
...
...
@@ -200,7 +200,8 @@ static int32_t translateTbnameColumn(SFunctionNode* pFunc, char* pErrBuf, int32_
}
static
int32_t
translateTop
(
SFunctionNode
*
pFunc
,
char
*
pErrBuf
,
int32_t
len
)
{
// todo
SDataType
*
pType
=
&
((
SExprNode
*
)
nodesListGetNode
(
pFunc
->
pParameterList
,
0
))
->
resType
;
pFunc
->
node
.
resType
=
(
SDataType
){.
bytes
=
pType
->
bytes
,
.
type
=
pType
->
type
};
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -499,9 +500,9 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
.
type
=
FUNCTION_TYPE_TOP
,
.
classification
=
FUNC_MGT_AGG_FUNC
,
.
translateFunc
=
translateTop
,
.
getEnvFunc
=
get
Minmax
FuncEnv
,
.
initFunc
=
maxF
unctionSetup
,
.
processFunc
=
max
Function
,
.
getEnvFunc
=
get
TopBot
FuncEnv
,
.
initFunc
=
f
unctionSetup
,
.
processFunc
=
top
Function
,
.
finalizeFunc
=
functionFinalize
},
{
...
...
source/libs/function/src/builtinsimpl.c
浏览文件 @
9a038322
...
...
@@ -14,10 +14,11 @@
*/
#include "builtinsimpl.h"
#include
"tpercentile.h"
#include
<libs/nodes/querynodes.h>
#include "querynodes.h"
#include "taggfunction.h"
#include "tdatablock.h"
#include "tpercentile.h"
#define SET_VAL(_info, numOfElem, res) \
do { \
...
...
@@ -738,9 +739,9 @@ int32_t percentileFunction(SqlFunctionCtx *pCtx) {
return
TSDB_CODE_SUCCESS
;
}
// TODO set the correct parameter.
void
percentileFinalize
(
SqlFunctionCtx
*
pCtx
)
{
double
v
=
50
;
//pCtx->param[0].nType == TSDB_DATA_TYPE_INT ? pCtx->param[0].i64 : pCtx->param[0].dKey;
SVariant
*
pVal
=
&
pCtx
->
param
[
1
].
param
;
double
v
=
pVal
->
nType
==
TSDB_DATA_TYPE_INT
?
pVal
->
i
:
pVal
->
d
;
SResultRowEntryInfo
*
pResInfo
=
GET_RES_INFO
(
pCtx
);
SPercentileInfo
*
ppInfo
=
(
SPercentileInfo
*
)
GET_ROWCELL_INTERBUF
(
pResInfo
);
...
...
@@ -1180,17 +1181,22 @@ bool getTopBotFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) {
SColumnNode
*
pColNode
=
(
SColumnNode
*
)
nodesListGetNode
(
pFunc
->
pParameterList
,
0
);
int32_t
bytes
=
pColNode
->
node
.
resType
.
bytes
;
SValueNode
*
pkNode
=
(
SValueNode
*
)
nodesListGetNode
(
pFunc
->
pParameterList
,
1
);
pEnv
->
calcMemSize
=
sizeof
(
STopBotRes
)
+
pkNode
->
datum
.
i
*
bytes
;
return
true
;
}
static
STopBotRes
*
getTopBotOutputInfo
(
SqlFunctionCtx
*
pCtx
)
{
SResultRowEntryInfo
*
pResInfo
=
GET_RES_INFO
(
pCtx
);
return
GET_ROWCELL_INTERBUF
(
pResInfo
);
STopBotRes
*
pRes
=
GET_ROWCELL_INTERBUF
(
pResInfo
);
pRes
->
pItems
=
(
STopBotResItem
*
)((
char
*
)
pRes
+
sizeof
(
STopBotRes
));
return
pRes
;
}
static
void
doAddIntoResult
(
STopBotRes
*
pRes
,
int32_t
maxSize
,
void
*
pData
,
uint16_t
type
,
uint64_t
uid
);
static
void
topFunction
(
SqlFunctionCtx
*
pCtx
)
{
int32_t
topFunction
(
SqlFunctionCtx
*
pCtx
)
{
int32_t
numOfElems
=
0
;
STopBotRes
*
pRes
=
getTopBotOutputInfo
(
pCtx
);
...
...
@@ -1215,11 +1221,12 @@ static void topFunction(SqlFunctionCtx *pCtx) {
numOfElems
++
;
char
*
data
=
colDataGetData
(
pCol
,
i
);
doAddIntoResult
(
pRes
,
pCtx
->
param
[
0
]
.
i
,
data
,
type
,
pInput
->
uid
);
doAddIntoResult
(
pRes
,
pCtx
->
param
[
1
].
param
.
i
,
data
,
type
,
pInput
->
uid
);
}
// treat the result as only one result
SET_VAL
(
GET_RES_INFO
(
pCtx
),
numOfElems
,
1
);
return
TSDB_CODE_SUCCESS
;
}
static
int32_t
topBotResComparFn
(
const
void
*
p1
,
const
void
*
p2
,
const
void
*
param
)
{
...
...
@@ -1266,16 +1273,20 @@ void doAddIntoResult(STopBotRes *pRes, int32_t maxSize, void *pData, uint16_t ty
pRes
->
num
++
;
taosheapsort
((
void
*
)
pItem
,
sizeof
(
STopBotResItem
),
pRes
->
num
,
(
const
void
*
)
&
type
,
topBotResComparFn
,
false
);
}
else
{
// replace the minimum value in the result
// if ((IS_SIGNED_NUMERIC_TYPE(type) && val.i > pList[0]->v.i) ||
// (IS_UNSIGNED_NUMERIC_TYPE(type) && val.u > pList[0]->v.u) ||
// (IS_FLOAT_TYPE(type) && val.d > pList[0]->v.d)) {
//
// STopBotResItem* pItem = &pItems[0];
// pItem->v = val;
// pItem->uid = uid;
// pItem->tuplePos.pageId = -1; // todo set the corresponding tuple data in the disk-based buffer
//
// taosheapadjust((void *) pItem, sizeof(STopBotResItem), 0, pRes->num - 1, (const void *) &type, topBotResComparFn, NULL, false);
// }
if
((
IS_SIGNED_NUMERIC_TYPE
(
type
)
&&
val
.
i
>
pItems
[
0
].
v
.
i
)
||
(
IS_UNSIGNED_NUMERIC_TYPE
(
type
)
&&
val
.
u
>
pItems
[
0
].
v
.
u
)
||
(
IS_FLOAT_TYPE
(
type
)
&&
val
.
d
>
pItems
[
0
].
v
.
d
))
{
STopBotResItem
*
pItem
=
&
pItems
[
pRes
->
num
];
pItem
->
v
=
val
;
pItem
->
uid
=
uid
;
pItem
->
tuplePos
.
pageId
=
-
1
;
// todo set the corresponding tuple data in the disk-based buffer
taosheapadjust
((
void
*
)
pItem
,
sizeof
(
STopBotResItem
),
0
,
pRes
->
num
-
1
,
(
const
void
*
)
&
type
,
topBotResComparFn
,
NULL
,
false
);
}
}
}
void
topBotFinalize
(
SqlFunctionCtx
*
pCtx
)
{
functionFinalize
(
pCtx
);
}
\ No newline at end of file
source/libs/function/src/taggfunction.c
浏览文件 @
9a038322
...
...
@@ -765,9 +765,9 @@ static int32_t firstFuncRequired(SqlFunctionCtx *pCtx, STimeWindow* w, int32_t c
}
static
int32_t
lastFuncRequired
(
SqlFunctionCtx
*
pCtx
,
STimeWindow
*
w
,
int32_t
colId
)
{
if
(
pCtx
->
order
!=
pCtx
->
param
[
0
]
.
i
)
{
return
BLK_DATA_NOT_LOAD
;
}
// if (pCtx->order != pCtx->param[0].param
.i) {
//
return BLK_DATA_NOT_LOAD;
//
}
if
(
GET_RES_INFO
(
pCtx
)
==
NULL
||
GET_RES_INFO
(
pCtx
)
->
numOfRes
<=
0
)
{
return
BLK_DATA_DATA_LOAD
;
...
...
@@ -797,9 +797,9 @@ static int32_t firstDistFuncRequired(SqlFunctionCtx *pCtx, STimeWindow* w, int32
}
static
int32_t
lastDistFuncRequired
(
SqlFunctionCtx
*
pCtx
,
STimeWindow
*
w
,
int32_t
colId
)
{
if
(
pCtx
->
order
!=
pCtx
->
param
[
0
]
.
i
)
{
return
BLK_DATA_NOT_LOAD
;
}
// if (pCtx->order != pCtx->param[0].param
.i) {
//
return BLK_DATA_NOT_LOAD;
//
}
// not initialized yet, it is the first block, load it.
if
(
pCtx
->
pOutput
==
NULL
)
{
...
...
@@ -1261,128 +1261,6 @@ int32_t tsCompare(const void* p1, const void* p2) {
}
}
static
void
stddev_dst_function
(
SqlFunctionCtx
*
pCtx
)
{
SStddevdstInfo
*
pStd
=
GET_ROWCELL_INTERBUF
(
GET_RES_INFO
(
pCtx
));
// the second stage to calculate standard deviation
double
*
retVal
=
&
pStd
->
res
;
// all data are null, no need to proceed
SArray
*
resList
=
(
SArray
*
)
pCtx
->
param
[
0
].
pz
;
if
(
resList
==
NULL
)
{
return
;
}
// find the correct group average results according to the tag value
int32_t
len
=
(
int32_t
)
taosArrayGetSize
(
resList
);
assert
(
len
>
0
);
double
avg
=
0
;
if
(
len
==
1
)
{
SResPair
*
p
=
taosArrayGet
(
resList
,
0
);
avg
=
p
->
avg
;
}
else
{
// todo opt performance by using iterator since the timestamp lsit is matched with the output result
SResPair
*
p
=
bsearch
(
&
pCtx
->
startTs
,
resList
->
pData
,
len
,
sizeof
(
SResPair
),
tsCompare
);
if
(
p
==
NULL
)
{
return
;
}
avg
=
p
->
avg
;
}
void
*
pData
=
GET_INPUT_DATA_LIST
(
pCtx
);
int32_t
num
=
0
;
switch
(
pCtx
->
inputType
)
{
case
TSDB_DATA_TYPE_INT
:
{
for
(
int32_t
i
=
0
;
i
<
pCtx
->
size
;
++
i
)
{
if
(
pCtx
->
hasNull
&&
isNull
((
const
char
*
)
(
&
((
int32_t
*
)
pData
)[
i
]),
pCtx
->
inputType
))
{
continue
;
}
num
+=
1
;
*
retVal
+=
TPOW2
(((
int32_t
*
)
pData
)[
i
]
-
avg
);
}
break
;
}
case
TSDB_DATA_TYPE_FLOAT
:
{
LOOP_STDDEV_IMPL
(
float
,
*
retVal
,
pData
,
pCtx
,
avg
,
pCtx
->
inputType
,
num
);
break
;
}
case
TSDB_DATA_TYPE_DOUBLE
:
{
LOOP_STDDEV_IMPL
(
double
,
*
retVal
,
pData
,
pCtx
,
avg
,
pCtx
->
inputType
,
num
);
break
;
}
case
TSDB_DATA_TYPE_TINYINT
:
{
LOOP_STDDEV_IMPL
(
int8_t
,
*
retVal
,
pData
,
pCtx
,
avg
,
pCtx
->
inputType
,
num
);
break
;
}
case
TSDB_DATA_TYPE_UTINYINT
:
{
LOOP_STDDEV_IMPL
(
int8_t
,
*
retVal
,
pData
,
pCtx
,
avg
,
pCtx
->
inputType
,
num
);
break
;
}
case
TSDB_DATA_TYPE_SMALLINT
:
{
LOOP_STDDEV_IMPL
(
int16_t
,
*
retVal
,
pData
,
pCtx
,
avg
,
pCtx
->
inputType
,
num
);
break
;
}
case
TSDB_DATA_TYPE_USMALLINT
:
{
LOOP_STDDEV_IMPL
(
uint16_t
,
*
retVal
,
pData
,
pCtx
,
avg
,
pCtx
->
inputType
,
num
);
break
;
}
case
TSDB_DATA_TYPE_UINT
:
{
LOOP_STDDEV_IMPL
(
uint32_t
,
*
retVal
,
pData
,
pCtx
,
avg
,
pCtx
->
inputType
,
num
);
break
;
}
case
TSDB_DATA_TYPE_BIGINT
:
{
LOOP_STDDEV_IMPL
(
int64_t
,
*
retVal
,
pData
,
pCtx
,
avg
,
pCtx
->
inputType
,
num
);
break
;
}
case
TSDB_DATA_TYPE_UBIGINT
:
{
LOOP_STDDEV_IMPL
(
uint64_t
,
*
retVal
,
pData
,
pCtx
,
avg
,
pCtx
->
inputType
,
num
);
break
;
}
default:
assert
(
0
);
// qError("stddev function not support data type:%d", pCtx->inputType);
}
pStd
->
num
+=
num
;
SET_VAL
(
pCtx
,
num
,
1
);
// copy to the final output buffer for super table
memcpy
(
pCtx
->
pOutput
,
GET_ROWCELL_INTERBUF
(
GET_RES_INFO
(
pCtx
)),
sizeof
(
SAvgInfo
));
}
static
void
stddev_dst_merge
(
SqlFunctionCtx
*
pCtx
)
{
SResultRowEntryInfo
*
pResInfo
=
GET_RES_INFO
(
pCtx
);
SStddevdstInfo
*
pRes
=
GET_ROWCELL_INTERBUF
(
pResInfo
);
char
*
input
=
GET_INPUT_DATA_LIST
(
pCtx
);
for
(
int32_t
i
=
0
;
i
<
pCtx
->
size
;
++
i
,
input
+=
pCtx
->
inputBytes
)
{
SStddevdstInfo
*
pInput
=
(
SStddevdstInfo
*
)
input
;
if
(
pInput
->
num
==
0
)
{
// current input is null
continue
;
}
pRes
->
num
+=
pInput
->
num
;
pRes
->
res
+=
pInput
->
res
;
}
}
static
void
stddev_dst_finalizer
(
SqlFunctionCtx
*
pCtx
)
{
SStddevdstInfo
*
pStd
=
GET_ROWCELL_INTERBUF
(
GET_RES_INFO
(
pCtx
));
if
(
pStd
->
num
<=
0
)
{
setNull
(
pCtx
->
pOutput
,
pCtx
->
resDataInfo
.
type
,
pCtx
->
resDataInfo
.
bytes
);
}
else
{
double
*
retValue
=
(
double
*
)
pCtx
->
pOutput
;
SET_DOUBLE_VAL
(
retValue
,
sqrt
(
pStd
->
res
/
pStd
->
num
));
SET_VAL
(
pCtx
,
1
,
1
);
}
doFinalizer
(
pCtx
);
}
//////////////////////////////////////////////////////////////////////////////////////
static
bool
first_last_function_setup
(
SqlFunctionCtx
*
pCtx
,
SResultRowEntryInfo
*
pResInfo
)
{
if
(
!
function_setup
(
pCtx
,
pResInfo
))
{
...
...
@@ -1390,8 +1268,8 @@ static bool first_last_function_setup(SqlFunctionCtx *pCtx, SResultRowEntryInfo*
}
// used to keep the timestamp for comparison
pCtx
->
param
[
1
]
.
nType
=
0
;
pCtx
->
param
[
1
]
.
i
=
0
;
// pCtx->param[1].param
.nType = 0;
// pCtx->param[1].param
.i = 0;
return
true
;
}
...
...
@@ -1487,13 +1365,13 @@ static void first_dist_func_merge(SqlFunctionCtx *pCtx) {
}
// The param[1] is used to keep the initial value of max ts value
if
(
pCtx
->
param
[
1
].
nType
!=
pCtx
->
resDataInfo
.
type
||
pCtx
->
param
[
1
]
.
i
>
pInput
->
ts
)
{
memcpy
(
pCtx
->
pOutput
,
pData
,
pCtx
->
resDataInfo
.
bytes
);
pCtx
->
param
[
1
]
.
i
=
pInput
->
ts
;
pCtx
->
param
[
1
]
.
nType
=
pCtx
->
resDataInfo
.
type
;
// DO_UPDATE_TAG_COLUMNS(pCtx, pInput->ts);
}
// if (pCtx->param[1].param.nType != pCtx->resDataInfo.type || pCtx->param[1].param
.i > pInput->ts) {
//
memcpy(pCtx->pOutput, pData, pCtx->resDataInfo.bytes);
// pCtx->param[1].param
.i = pInput->ts;
// pCtx->param[1].param
.nType = pCtx->resDataInfo.type;
//
//
//
DO_UPDATE_TAG_COLUMNS(pCtx, pInput->ts);
//
}
SET_VAL
(
pCtx
,
1
,
1
);
// GET_RES_INFO(pCtx)->hasResult = DATA_SET_FLAG;
...
...
@@ -1508,9 +1386,9 @@ static void first_dist_func_merge(SqlFunctionCtx *pCtx) {
* least one data in this block that is not null.(TODO opt for this case)
*/
static
void
last_function
(
SqlFunctionCtx
*
pCtx
)
{
if
(
pCtx
->
order
!=
pCtx
->
param
[
0
]
.
i
)
{
return
;
}
// if (pCtx->order != pCtx->param[0].param
.i) {
//
return;
//
}
SResultRowEntryInfo
*
pResInfo
=
GET_RES_INFO
(
pCtx
);
...
...
@@ -1582,9 +1460,9 @@ static void last_dist_function(SqlFunctionCtx *pCtx) {
* 1. for scan data is not the required order
* 2. for data blocks that are not loaded, no need to check data
*/
if
(
pCtx
->
order
!=
pCtx
->
param
[
0
]
.
i
)
{
return
;
}
// if (pCtx->order != pCtx->param[0].param
.i) {
//
return;
//
}
int32_t
notNullElems
=
0
;
for
(
int32_t
i
=
pCtx
->
size
-
1
;
i
>=
0
;
--
i
)
{
...
...
@@ -1624,10 +1502,10 @@ static void last_dist_func_merge(SqlFunctionCtx *pCtx) {
* param[1] used to keep the corresponding timestamp to decide if current result is
* the true last result
*/
if
(
pCtx
->
param
[
1
].
nType
!=
pCtx
->
resDataInfo
.
type
||
pCtx
->
param
[
1
]
.
i
<
pInput
->
ts
)
{
if
(
pCtx
->
param
[
1
].
param
.
nType
!=
pCtx
->
resDataInfo
.
type
||
pCtx
->
param
[
1
].
param
.
i
<
pInput
->
ts
)
{
memcpy
(
pCtx
->
pOutput
,
pData
,
pCtx
->
resDataInfo
.
bytes
);
pCtx
->
param
[
1
].
i
=
pInput
->
ts
;
pCtx
->
param
[
1
].
nType
=
pCtx
->
resDataInfo
.
type
;
pCtx
->
param
[
1
].
param
.
i
=
pInput
->
ts
;
pCtx
->
param
[
1
].
param
.
nType
=
pCtx
->
resDataInfo
.
type
;
// DO_UPDATE_TAG_COLUMNS(pCtx, pInput->ts);
}
...
...
@@ -1955,11 +1833,11 @@ static STopBotInfo *getTopBotOutputInfo(SqlFunctionCtx *pCtx) {
static
void
buildTopBotStruct
(
STopBotInfo
*
pTopBotInfo
,
SqlFunctionCtx
*
pCtx
)
{
char
*
tmp
=
(
char
*
)
pTopBotInfo
+
sizeof
(
STopBotInfo
);
pTopBotInfo
->
res
=
(
tValuePair
**
)
tmp
;
tmp
+=
POINTER_BYTES
*
pCtx
->
param
[
0
]
.
i
;
// tmp += POINTER_BYTES * pCtx->param[0].param
.i;
// size_t size = sizeof(tValuePair) + pCtx->tagInfo.tagsLen;
// for (int32_t i = 0; i < pCtx->param[0].i; ++i) {
// for (int32_t i = 0; i < pCtx->param[0].
param.
i; ++i) {
// pTopBotInfo->res[i] = (tValuePair*) tmp;
// pTopBotInfo->res[i]->pTags = tmp + sizeof(tValuePair);
// tmp += size;
...
...
@@ -1975,13 +1853,13 @@ bool topbot_datablock_filter(SqlFunctionCtx *pCtx, const char *minval, const cha
STopBotInfo
*
pTopBotInfo
=
getTopBotOutputInfo
(
pCtx
);
// required number of results are not reached, continue load data block
if
(
pTopBotInfo
->
num
<
pCtx
->
param
[
0
]
.
i
)
{
return
true
;
}
// if (pTopBotInfo->num < pCtx->param[0].param
.i) {
//
return true;
//
}
if
((
void
*
)
pTopBotInfo
->
res
[
0
]
!=
(
void
*
)((
char
*
)
pTopBotInfo
+
sizeof
(
STopBotInfo
)
+
POINTER_BYTES
*
pCtx
->
param
[
0
]
.
i
))
{
buildTopBotStruct
(
pTopBotInfo
,
pCtx
);
}
// if ((void *)pTopBotInfo->res[0] != (void *)((char *)pTopBotInfo + sizeof(STopBotInfo) + POINTER_BYTES * pCtx->param[0].param
.i)) {
//
buildTopBotStruct(pTopBotInfo, pCtx);
//
}
tValuePair
**
pRes
=
(
tValuePair
**
)
pTopBotInfo
->
res
;
...
...
@@ -2038,9 +1916,9 @@ static void top_function(SqlFunctionCtx *pCtx) {
STopBotInfo
*
pRes
=
getTopBotOutputInfo
(
pCtx
);
assert
(
pRes
->
num
>=
0
);
if
((
void
*
)
pRes
->
res
[
0
]
!=
(
void
*
)((
char
*
)
pRes
+
sizeof
(
STopBotInfo
)
+
POINTER_BYTES
*
pCtx
->
param
[
0
]
.
i
))
{
buildTopBotStruct
(
pRes
,
pCtx
);
}
// if ((void *)pRes->res[0] != (void *)((char *)pRes + sizeof(STopBotInfo) + POINTER_BYTES * pCtx->param[0].param
.i)) {
//
buildTopBotStruct(pRes, pCtx);
//
}
for
(
int32_t
i
=
0
;
i
<
pCtx
->
size
;
++
i
)
{
char
*
data
=
GET_INPUT_DATA
(
pCtx
,
i
);
...
...
@@ -2052,7 +1930,7 @@ static void top_function(SqlFunctionCtx *pCtx) {
// NOTE: Set the default timestamp if it is missing [todo refactor]
TSKEY
ts
=
(
pCtx
->
ptsList
!=
NULL
)
?
GET_TS_DATA
(
pCtx
,
i
)
:
0
;
// do_top_function_add(pRes, (int32_t)pCtx->param[0].i, data, ts, pCtx->inputType, &pCtx->tagInfo, NULL, 0);
// do_top_function_add(pRes, (int32_t)pCtx->param[0].
param.
i, data, ts, pCtx->inputType, &pCtx->tagInfo, NULL, 0);
}
if
(
!
pCtx
->
hasNull
)
{
...
...
@@ -2079,7 +1957,7 @@ static void top_func_merge(SqlFunctionCtx *pCtx) {
// the intermediate result is binary, we only use the output data type
for
(
int32_t
i
=
0
;
i
<
pInput
->
num
;
++
i
)
{
int16_t
type
=
(
pCtx
->
resDataInfo
.
type
==
TSDB_DATA_TYPE_FLOAT
)
?
TSDB_DATA_TYPE_DOUBLE
:
pCtx
->
resDataInfo
.
type
;
// do_top_function_add(pOutput, (int32_t)pCtx->param[0].i, &pInput->res[i]->v.i, pInput->res[i]->timestamp,
// do_top_function_add(pOutput, (int32_t)pCtx->param[0].
param.
i, &pInput->res[i]->v.i, pInput->res[i]->timestamp,
// type, &pCtx->tagInfo, pInput->res[i]->pTags, pCtx->currentStage);
}
...
...
@@ -2096,9 +1974,9 @@ static void bottom_function(SqlFunctionCtx *pCtx) {
STopBotInfo
*
pRes
=
getTopBotOutputInfo
(
pCtx
);
if
((
void
*
)
pRes
->
res
[
0
]
!=
(
void
*
)((
char
*
)
pRes
+
sizeof
(
STopBotInfo
)
+
POINTER_BYTES
*
pCtx
->
param
[
0
]
.
i
))
{
buildTopBotStruct
(
pRes
,
pCtx
);
}
// if ((void *)pRes->res[0] != (void *)((char *)pRes + sizeof(STopBotInfo) + POINTER_BYTES * pCtx->param[0].param
.i)) {
//
buildTopBotStruct(pRes, pCtx);
//
}
for
(
int32_t
i
=
0
;
i
<
pCtx
->
size
;
++
i
)
{
char
*
data
=
GET_INPUT_DATA
(
pCtx
,
i
);
...
...
@@ -2109,7 +1987,7 @@ static void bottom_function(SqlFunctionCtx *pCtx) {
notNullElems
++
;
// NOTE: Set the default timestamp if it is missing [todo refactor]
TSKEY
ts
=
(
pCtx
->
ptsList
!=
NULL
)
?
GET_TS_DATA
(
pCtx
,
i
)
:
0
;
// do_bottom_function_add(pRes, (int32_t)pCtx->param[0].i, data, ts, pCtx->inputType, &pCtx->tagInfo, NULL, 0);
// do_bottom_function_add(pRes, (int32_t)pCtx->param[0].
param.
i, data, ts, pCtx->inputType, &pCtx->tagInfo, NULL, 0);
}
if
(
!
pCtx
->
hasNull
)
{
...
...
@@ -2136,7 +2014,7 @@ static void bottom_func_merge(SqlFunctionCtx *pCtx) {
// the intermediate result is binary, we only use the output data type
for
(
int32_t
i
=
0
;
i
<
pInput
->
num
;
++
i
)
{
int16_t
type
=
(
pCtx
->
resDataInfo
.
type
==
TSDB_DATA_TYPE_FLOAT
)
?
TSDB_DATA_TYPE_DOUBLE
:
pCtx
->
resDataInfo
.
type
;
// do_bottom_function_add(pOutput, (int32_t)pCtx->param[0].i, &pInput->res[i]->v.i, pInput->res[i]->timestamp, type,
// do_bottom_function_add(pOutput, (int32_t)pCtx->param[0].
param.
i, &pInput->res[i]->v.i, pInput->res[i]->timestamp, type,
// &pCtx->tagInfo, pInput->res[i]->pTags, pCtx->currentStage);
}
...
...
@@ -2162,11 +2040,11 @@ static void top_bottom_func_finalizer(SqlFunctionCtx *pCtx) {
tValuePair
**
tvp
=
pRes
->
res
;
// user specify the order of output by sort the result according to timestamp
if
(
pCtx
->
param
[
1
].
i
==
PRIMARYKEY_TIMESTAMP_COL_ID
)
{
__compar_fn_t
comparator
=
(
pCtx
->
param
[
2
].
i
==
TSDB_ORDER_ASC
)
?
resAscComparFn
:
resDescComparFn
;
if
(
pCtx
->
param
[
1
].
param
.
i
==
PRIMARYKEY_TIMESTAMP_COL_ID
)
{
__compar_fn_t
comparator
=
(
pCtx
->
param
[
2
].
param
.
i
==
TSDB_ORDER_ASC
)
?
resAscComparFn
:
resDescComparFn
;
qsort
(
tvp
,
(
size_t
)
pResInfo
->
numOfRes
,
POINTER_BYTES
,
comparator
);
}
else
/*if (pCtx->param[1].i > PRIMARYKEY_TIMESTAMP_COL_ID)*/
{
__compar_fn_t
comparator
=
(
pCtx
->
param
[
2
].
i
==
TSDB_ORDER_ASC
)
?
resDataAscComparFn
:
resDataDescComparFn
;
}
else
/*if (pCtx->param[1].
param.
i > PRIMARYKEY_TIMESTAMP_COL_ID)*/
{
__compar_fn_t
comparator
=
(
pCtx
->
param
[
2
].
param
.
i
==
TSDB_ORDER_ASC
)
?
resDataAscComparFn
:
resDataDescComparFn
;
qsort
(
tvp
,
(
size_t
)
pResInfo
->
numOfRes
,
POINTER_BYTES
,
comparator
);
}
...
...
@@ -2277,7 +2155,7 @@ static void percentile_function(SqlFunctionCtx *pCtx) {
}
static
void
percentile_finalizer
(
SqlFunctionCtx
*
pCtx
)
{
double
v
=
pCtx
->
param
[
0
].
nType
==
TSDB_DATA_TYPE_INT
?
pCtx
->
param
[
0
].
i
:
pCtx
->
param
[
0
]
.
d
;
// double v = pCtx->param[0].param.nType == TSDB_DATA_TYPE_INT ? pCtx->param[0].param.i : pCtx->param[0].param
.d;
SResultRowEntryInfo
*
pResInfo
=
GET_RES_INFO
(
pCtx
);
SPercentileInfo
*
ppInfo
=
(
SPercentileInfo
*
)
GET_ROWCELL_INTERBUF
(
pResInfo
);
...
...
@@ -2287,7 +2165,7 @@ static void percentile_finalizer(SqlFunctionCtx *pCtx) {
assert
(
ppInfo
->
numOfElems
==
0
);
setNull
(
pCtx
->
pOutput
,
pCtx
->
resDataInfo
.
type
,
pCtx
->
resDataInfo
.
bytes
);
}
else
{
SET_DOUBLE_VAL
((
double
*
)
pCtx
->
pOutput
,
getPercentile
(
pMemBucket
,
v
));
//
SET_DOUBLE_VAL((double *)pCtx->pOutput, getPercentile(pMemBucket, v));
}
tMemBucketDestroy
(
pMemBucket
);
...
...
@@ -2389,7 +2267,7 @@ static void apercentile_func_merge(SqlFunctionCtx *pCtx) {
}
static
void
apercentile_finalizer
(
SqlFunctionCtx
*
pCtx
)
{
double
v
=
(
pCtx
->
param
[
0
].
nType
==
TSDB_DATA_TYPE_INT
)
?
pCtx
->
param
[
0
].
i
:
pCtx
->
param
[
0
]
.
d
;
double
v
=
(
pCtx
->
param
[
0
].
param
.
nType
==
TSDB_DATA_TYPE_INT
)
?
pCtx
->
param
[
0
].
param
.
i
:
pCtx
->
param
[
0
].
param
.
d
;
SResultRowEntryInfo
*
pResInfo
=
GET_RES_INFO
(
pCtx
);
SAPercentileInfo
*
pOutput
=
GET_ROWCELL_INTERBUF
(
pResInfo
);
...
...
@@ -2432,7 +2310,7 @@ static bool leastsquares_function_setup(SqlFunctionCtx *pCtx, SResultRowEntryInf
SLeastsquaresInfo
*
pInfo
=
GET_ROWCELL_INTERBUF
(
pResInfo
);
// 2*3 matrix
pInfo
->
startVal
=
pCtx
->
param
[
0
]
.
d
;
// pInfo->startVal = pCtx->param[0].param
.d;
return
true
;
}
...
...
@@ -2478,54 +2356,54 @@ static void leastsquares_function(SqlFunctionCtx *pCtx) {
param
[
0
][
2
]
+=
x
*
p
[
i
];
param
[
1
][
2
]
+=
p
[
i
];
x
+=
pCtx
->
param
[
1
].
d
;
x
+=
pCtx
->
param
[
1
].
param
.
d
;
numOfElem
++
;
}
break
;
}
case
TSDB_DATA_TYPE_BIGINT
:
{
int64_t
*
p
=
pData
;
LEASTSQR_CAL_LOOP
(
pCtx
,
param
,
x
,
p
,
pCtx
->
inputType
,
numOfElem
,
pCtx
->
param
[
1
].
d
);
LEASTSQR_CAL_LOOP
(
pCtx
,
param
,
x
,
p
,
pCtx
->
inputType
,
numOfElem
,
pCtx
->
param
[
1
].
param
.
d
);
break
;
}
case
TSDB_DATA_TYPE_DOUBLE
:
{
double
*
p
=
pData
;
LEASTSQR_CAL_LOOP
(
pCtx
,
param
,
x
,
p
,
pCtx
->
inputType
,
numOfElem
,
pCtx
->
param
[
1
].
d
);
LEASTSQR_CAL_LOOP
(
pCtx
,
param
,
x
,
p
,
pCtx
->
inputType
,
numOfElem
,
pCtx
->
param
[
1
].
param
.
d
);
break
;
}
case
TSDB_DATA_TYPE_FLOAT
:
{
float
*
p
=
pData
;
LEASTSQR_CAL_LOOP
(
pCtx
,
param
,
x
,
p
,
pCtx
->
inputType
,
numOfElem
,
pCtx
->
param
[
1
].
d
);
LEASTSQR_CAL_LOOP
(
pCtx
,
param
,
x
,
p
,
pCtx
->
inputType
,
numOfElem
,
pCtx
->
param
[
1
].
param
.
d
);
break
;
};
case
TSDB_DATA_TYPE_SMALLINT
:
{
int16_t
*
p
=
pData
;
LEASTSQR_CAL_LOOP
(
pCtx
,
param
,
x
,
p
,
pCtx
->
inputType
,
numOfElem
,
pCtx
->
param
[
1
].
d
);
LEASTSQR_CAL_LOOP
(
pCtx
,
param
,
x
,
p
,
pCtx
->
inputType
,
numOfElem
,
pCtx
->
param
[
1
].
param
.
d
);
break
;
}
case
TSDB_DATA_TYPE_TINYINT
:
{
int8_t
*
p
=
pData
;
LEASTSQR_CAL_LOOP
(
pCtx
,
param
,
x
,
p
,
pCtx
->
inputType
,
numOfElem
,
pCtx
->
param
[
1
].
d
);
LEASTSQR_CAL_LOOP
(
pCtx
,
param
,
x
,
p
,
pCtx
->
inputType
,
numOfElem
,
pCtx
->
param
[
1
].
param
.
d
);
break
;
}
case
TSDB_DATA_TYPE_UTINYINT
:
{
uint8_t
*
p
=
pData
;
LEASTSQR_CAL_LOOP
(
pCtx
,
param
,
x
,
p
,
pCtx
->
inputType
,
numOfElem
,
pCtx
->
param
[
1
].
d
);
LEASTSQR_CAL_LOOP
(
pCtx
,
param
,
x
,
p
,
pCtx
->
inputType
,
numOfElem
,
pCtx
->
param
[
1
].
param
.
d
);
break
;
}
case
TSDB_DATA_TYPE_USMALLINT
:
{
uint16_t
*
p
=
pData
;
LEASTSQR_CAL_LOOP
(
pCtx
,
param
,
x
,
p
,
pCtx
->
inputType
,
numOfElem
,
pCtx
->
param
[
1
].
d
);
LEASTSQR_CAL_LOOP
(
pCtx
,
param
,
x
,
p
,
pCtx
->
inputType
,
numOfElem
,
pCtx
->
param
[
1
].
param
.
d
);
break
;
}
case
TSDB_DATA_TYPE_UINT
:
{
uint32_t
*
p
=
pData
;
LEASTSQR_CAL_LOOP
(
pCtx
,
param
,
x
,
p
,
pCtx
->
inputType
,
numOfElem
,
pCtx
->
param
[
1
].
d
);
LEASTSQR_CAL_LOOP
(
pCtx
,
param
,
x
,
p
,
pCtx
->
inputType
,
numOfElem
,
pCtx
->
param
[
1
].
param
.
d
);
break
;
}
case
TSDB_DATA_TYPE_UBIGINT
:
{
uint64_t
*
p
=
pData
;
LEASTSQR_CAL_LOOP
(
pCtx
,
param
,
x
,
p
,
pCtx
->
inputType
,
numOfElem
,
pCtx
->
param
[
1
].
d
);
LEASTSQR_CAL_LOOP
(
pCtx
,
param
,
x
,
p
,
pCtx
->
inputType
,
numOfElem
,
pCtx
->
param
[
1
].
param
.
d
);
break
;
}
}
...
...
@@ -2584,16 +2462,16 @@ static void col_project_function(SqlFunctionCtx *pCtx) {
}
// only one row is required.
if
(
pCtx
->
param
[
0
]
.
i
==
1
)
{
SET_VAL
(
pCtx
,
pCtx
->
size
,
1
);
}
else
{
INC_INIT_VAL
(
pCtx
,
pCtx
->
size
);
}
// if (pCtx->param[0].param
.i == 1) {
//
SET_VAL(pCtx, pCtx->size, 1);
//
} else {
//
INC_INIT_VAL(pCtx, pCtx->size);
//
}
char
*
pData
=
GET_INPUT_DATA_LIST
(
pCtx
);
if
(
pCtx
->
order
==
TSDB_ORDER_ASC
)
{
int32_t
numOfRows
=
(
pCtx
->
param
[
0
]
.
i
==
1
)
?
1
:
pCtx
->
size
;
memcpy
(
pCtx
->
pOutput
,
pData
,
(
size_t
)
numOfRows
*
pCtx
->
inputBytes
);
// int32_t numOfRows = (pCtx->param[0].param
.i == 1)? 1:pCtx->size;
//
memcpy(pCtx->pOutput, pData, (size_t) numOfRows * pCtx->inputBytes);
}
else
{
for
(
int32_t
i
=
0
;
i
<
pCtx
->
size
;
++
i
)
{
memcpy
(
pCtx
->
pOutput
+
(
pCtx
->
size
-
1
-
i
)
*
pCtx
->
inputBytes
,
pData
+
i
*
pCtx
->
inputBytes
,
...
...
@@ -2658,7 +2536,7 @@ static bool diff_function_setup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResI
}
// diff function require the value is set to -1
pCtx
->
param
[
1
].
nType
=
INITIAL_VALUE_NOT_ASSIGNED
;
pCtx
->
param
[
1
].
param
.
nType
=
INITIAL_VALUE_NOT_ASSIGNED
;
return
false
;
}
...
...
@@ -2670,9 +2548,9 @@ static bool deriv_function_setup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pRes
// diff function require the value is set to -1
SDerivInfo
*
pDerivInfo
=
GET_ROWCELL_INTERBUF
(
pResultInfo
);
pDerivInfo
->
ignoreNegative
=
pCtx
->
param
[
1
]
.
i
;
// pDerivInfo->ignoreNegative = pCtx->param[1].param
.i;
pDerivInfo
->
prevTs
=
-
1
;
pDerivInfo
->
tsWindow
=
pCtx
->
param
[
0
]
.
i
;
// pDerivInfo->tsWindow = pCtx->param[0].param
.i;
pDerivInfo
->
valueSet
=
false
;
return
false
;
}
...
...
@@ -2861,12 +2739,12 @@ static void deriv_function(SqlFunctionCtx *pCtx) {
#define DIFF_IMPL(ctx, d, type) \
do { \
if ((ctx)->param[1].nType == INITIAL_VALUE_NOT_ASSIGNED) { \
(ctx)->param[1].nType = (ctx)->inputType; \
*(type *)&(ctx)->param[1].i = *(type *)(d); \
if ((ctx)->param[1].
param.
nType == INITIAL_VALUE_NOT_ASSIGNED) { \
(ctx)->param[1].
param.
nType = (ctx)->inputType; \
*(type *)&(ctx)->param[1].
param.
i = *(type *)(d); \
} else { \
*(type *)(ctx)->pOutput = *(type *)(d) - (*(type *)(&(ctx)->param[1].i)); \
*(type *)(&(ctx)->param[1].i) = *(type *)(d); \
*(type *)(ctx)->pOutput = *(type *)(d) - (*(type *)(&(ctx)->param[1].
param.
i)); \
*(type *)(&(ctx)->param[1].
param.
i) = *(type *)(d); \
*(int64_t *)(ctx)->pTsOutput = GET_TS_DATA(ctx, index); \
} \
} while (0);
...
...
@@ -2874,7 +2752,7 @@ static void deriv_function(SqlFunctionCtx *pCtx) {
// TODO difference in date column
static
void
diff_function
(
SqlFunctionCtx
*
pCtx
)
{
void
*
data
=
GET_INPUT_DATA_LIST
(
pCtx
);
bool
isFirstBlock
=
(
pCtx
->
param
[
1
].
nType
==
INITIAL_VALUE_NOT_ASSIGNED
);
bool
isFirstBlock
=
(
pCtx
->
param
[
1
].
param
.
nType
==
INITIAL_VALUE_NOT_ASSIGNED
);
int32_t
notNullElems
=
0
;
...
...
@@ -2894,15 +2772,15 @@ static void diff_function(SqlFunctionCtx *pCtx) {
continue
;
}
if
(
pCtx
->
param
[
1
].
nType
!=
INITIAL_VALUE_NOT_ASSIGNED
)
{
// initial value is not set yet
*
pOutput
=
(
int32_t
)(
pData
[
i
]
-
pCtx
->
param
[
1
].
i
);
// direct previous may be null
if
(
pCtx
->
param
[
1
].
param
.
nType
!=
INITIAL_VALUE_NOT_ASSIGNED
)
{
// initial value is not set yet
*
pOutput
=
(
int32_t
)(
pData
[
i
]
-
pCtx
->
param
[
1
].
param
.
i
);
// direct previous may be null
*
pTimestamp
=
(
tsList
!=
NULL
)
?
tsList
[
i
]
:
0
;
pOutput
+=
1
;
pTimestamp
+=
1
;
}
pCtx
->
param
[
1
].
i
=
pData
[
i
];
pCtx
->
param
[
1
].
nType
=
pCtx
->
inputType
;
pCtx
->
param
[
1
].
param
.
i
=
pData
[
i
];
pCtx
->
param
[
1
].
param
.
nType
=
pCtx
->
inputType
;
notNullElems
++
;
}
break
;
...
...
@@ -2916,15 +2794,15 @@ static void diff_function(SqlFunctionCtx *pCtx) {
continue
;
}
if
(
pCtx
->
param
[
1
].
nType
!=
INITIAL_VALUE_NOT_ASSIGNED
)
{
// initial value is not set yet
*
pOutput
=
pData
[
i
]
-
pCtx
->
param
[
1
].
i
;
// direct previous may be null
if
(
pCtx
->
param
[
1
].
param
.
nType
!=
INITIAL_VALUE_NOT_ASSIGNED
)
{
// initial value is not set yet
*
pOutput
=
pData
[
i
]
-
pCtx
->
param
[
1
].
param
.
i
;
// direct previous may be null
*
pTimestamp
=
(
tsList
!=
NULL
)
?
tsList
[
i
]
:
0
;
pOutput
+=
1
;
pTimestamp
+=
1
;
}
pCtx
->
param
[
1
].
i
=
pData
[
i
];
pCtx
->
param
[
1
].
nType
=
pCtx
->
inputType
;
pCtx
->
param
[
1
].
param
.
i
=
pData
[
i
];
pCtx
->
param
[
1
].
param
.
nType
=
pCtx
->
inputType
;
notNullElems
++
;
}
break
;
...
...
@@ -2938,15 +2816,15 @@ static void diff_function(SqlFunctionCtx *pCtx) {
continue
;
}
if
(
pCtx
->
param
[
1
].
nType
!=
INITIAL_VALUE_NOT_ASSIGNED
)
{
// initial value is not set yet
SET_DOUBLE_VAL
(
pOutput
,
pData
[
i
]
-
pCtx
->
param
[
1
].
d
);
// direct previous may be null
if
(
pCtx
->
param
[
1
].
param
.
nType
!=
INITIAL_VALUE_NOT_ASSIGNED
)
{
// initial value is not set yet
SET_DOUBLE_VAL
(
pOutput
,
pData
[
i
]
-
pCtx
->
param
[
1
].
param
.
d
);
// direct previous may be null
*
pTimestamp
=
(
tsList
!=
NULL
)
?
tsList
[
i
]
:
0
;
pOutput
+=
1
;
pTimestamp
+=
1
;
}
pCtx
->
param
[
1
].
d
=
pData
[
i
];
pCtx
->
param
[
1
].
nType
=
pCtx
->
inputType
;
pCtx
->
param
[
1
].
param
.
d
=
pData
[
i
];
pCtx
->
param
[
1
].
param
.
nType
=
pCtx
->
inputType
;
notNullElems
++
;
}
break
;
...
...
@@ -2960,15 +2838,15 @@ static void diff_function(SqlFunctionCtx *pCtx) {
continue
;
}
if
(
pCtx
->
param
[
1
].
nType
!=
INITIAL_VALUE_NOT_ASSIGNED
)
{
// initial value is not set yet
*
pOutput
=
(
float
)(
pData
[
i
]
-
pCtx
->
param
[
1
].
d
);
// direct previous may be null
if
(
pCtx
->
param
[
1
].
param
.
nType
!=
INITIAL_VALUE_NOT_ASSIGNED
)
{
// initial value is not set yet
*
pOutput
=
(
float
)(
pData
[
i
]
-
pCtx
->
param
[
1
].
param
.
d
);
// direct previous may be null
*
pTimestamp
=
(
tsList
!=
NULL
)
?
tsList
[
i
]
:
0
;
pOutput
+=
1
;
pTimestamp
+=
1
;
}
pCtx
->
param
[
1
].
d
=
pData
[
i
];
pCtx
->
param
[
1
].
nType
=
pCtx
->
inputType
;
pCtx
->
param
[
1
].
param
.
d
=
pData
[
i
];
pCtx
->
param
[
1
].
param
.
nType
=
pCtx
->
inputType
;
notNullElems
++
;
}
break
;
...
...
@@ -2982,15 +2860,15 @@ static void diff_function(SqlFunctionCtx *pCtx) {
continue
;
}
if
(
pCtx
->
param
[
1
].
nType
!=
INITIAL_VALUE_NOT_ASSIGNED
)
{
// initial value is not set yet
*
pOutput
=
(
int16_t
)(
pData
[
i
]
-
pCtx
->
param
[
1
].
i
);
// direct previous may be null
if
(
pCtx
->
param
[
1
].
param
.
nType
!=
INITIAL_VALUE_NOT_ASSIGNED
)
{
// initial value is not set yet
*
pOutput
=
(
int16_t
)(
pData
[
i
]
-
pCtx
->
param
[
1
].
param
.
i
);
// direct previous may be null
*
pTimestamp
=
(
tsList
!=
NULL
)
?
tsList
[
i
]
:
0
;
pOutput
+=
1
;
pTimestamp
+=
1
;
}
pCtx
->
param
[
1
].
i
=
pData
[
i
];
pCtx
->
param
[
1
].
nType
=
pCtx
->
inputType
;
pCtx
->
param
[
1
].
param
.
i
=
pData
[
i
];
pCtx
->
param
[
1
].
param
.
nType
=
pCtx
->
inputType
;
notNullElems
++
;
}
break
;
...
...
@@ -3005,15 +2883,15 @@ static void diff_function(SqlFunctionCtx *pCtx) {
continue
;
}
if
(
pCtx
->
param
[
1
].
nType
!=
INITIAL_VALUE_NOT_ASSIGNED
)
{
// initial value is not set yet
*
pOutput
=
(
int8_t
)(
pData
[
i
]
-
pCtx
->
param
[
1
].
i
);
// direct previous may be null
if
(
pCtx
->
param
[
1
].
param
.
nType
!=
INITIAL_VALUE_NOT_ASSIGNED
)
{
// initial value is not set yet
*
pOutput
=
(
int8_t
)(
pData
[
i
]
-
pCtx
->
param
[
1
].
param
.
i
);
// direct previous may be null
*
pTimestamp
=
(
tsList
!=
NULL
)
?
tsList
[
i
]
:
0
;
pOutput
+=
1
;
pTimestamp
+=
1
;
}
pCtx
->
param
[
1
].
i
=
pData
[
i
];
pCtx
->
param
[
1
].
nType
=
pCtx
->
inputType
;
pCtx
->
param
[
1
].
param
.
i
=
pData
[
i
];
pCtx
->
param
[
1
].
param
.
nType
=
pCtx
->
inputType
;
notNullElems
++
;
}
break
;
...
...
@@ -3024,7 +2902,7 @@ static void diff_function(SqlFunctionCtx *pCtx) {
}
// initial value is not set yet
if
(
pCtx
->
param
[
1
].
nType
==
INITIAL_VALUE_NOT_ASSIGNED
||
notNullElems
<=
0
)
{
if
(
pCtx
->
param
[
1
].
param
.
nType
==
INITIAL_VALUE_NOT_ASSIGNED
||
notNullElems
<=
0
)
{
/*
* 1. current block and blocks before are full of null
* 2. current block may be null value
...
...
@@ -3091,8 +2969,8 @@ static bool spread_function_setup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pRe
// this is the server-side setup function in client-side, the secondary merge do not need this procedure
if
(
pCtx
->
currentStage
==
MERGE_STAGE
)
{
pCtx
->
param
[
0
]
.
d
=
DBL_MAX
;
pCtx
->
param
[
3
]
.
d
=
-
DBL_MAX
;
// pCtx->param[0].param
.d = DBL_MAX;
// pCtx->param[3].param
.d = -DBL_MAX;
}
else
{
pInfo
->
min
=
DBL_MAX
;
pInfo
->
max
=
-
DBL_MAX
;
...
...
@@ -3192,13 +3070,13 @@ void spread_func_merge(SqlFunctionCtx *pCtx) {
return
;
}
if
(
pCtx
->
param
[
0
]
.
d
>
pData
->
min
)
{
pCtx
->
param
[
0
]
.
d
=
pData
->
min
;
}
// if (pCtx->param[0].param
.d > pData->min) {
// pCtx->param[0].param
.d = pData->min;
//
}
if
(
pCtx
->
param
[
3
]
.
d
<
pData
->
max
)
{
pCtx
->
param
[
3
]
.
d
=
pData
->
max
;
}
// if (pCtx->param[3].param
.d < pData->max) {
// pCtx->param[3].param
.d = pData->max;
//
}
// GET_RES_INFO(pCtx)->hasResult = DATA_SET_FLAG;
}
...
...
@@ -3218,7 +3096,7 @@ void spread_function_finalizer(SqlFunctionCtx *pCtx) {
// return;
// }
SET_DOUBLE_VAL
((
double
*
)
pCtx
->
pOutput
,
pCtx
->
param
[
3
].
d
-
pCtx
->
param
[
0
]
.
d
);
// SET_DOUBLE_VAL((double *)pCtx->pOutput, pCtx->param[3].param.d - pCtx->param[0].param
.d);
}
else
{
assert
(
IS_NUMERIC_TYPE
(
pCtx
->
inputType
)
||
(
pCtx
->
inputType
==
TSDB_DATA_TYPE_TIMESTAMP
));
...
...
@@ -3571,7 +3449,7 @@ void twa_function_finalizer(SqlFunctionCtx *pCtx) {
*/
static
void
interp_function_impl
(
SqlFunctionCtx
*
pCtx
)
{
int32_t
type
=
(
int32_t
)
pCtx
->
param
[
2
].
i
;
int32_t
type
=
(
int32_t
)
pCtx
->
param
[
2
].
param
.
i
;
if
(
type
==
TSDB_FILL_NONE
)
{
return
;
}
...
...
@@ -3583,7 +3461,7 @@ static void interp_function_impl(SqlFunctionCtx *pCtx) {
}
else
if
(
type
==
TSDB_FILL_NULL
)
{
setNull
(
pCtx
->
pOutput
,
pCtx
->
resDataInfo
.
type
,
pCtx
->
resDataInfo
.
bytes
);
}
else
if
(
type
==
TSDB_FILL_SET_VALUE
)
{
taosVariantDump
(
&
pCtx
->
param
[
1
],
pCtx
->
pOutput
,
pCtx
->
inputType
,
true
);
//
taosVariantDump(&pCtx->param[1], pCtx->pOutput, pCtx->inputType, true);
}
else
{
if
(
pCtx
->
start
.
key
!=
INT64_MIN
&&
((
ascQuery
&&
pCtx
->
start
.
key
<=
pCtx
->
startTs
&&
pCtx
->
end
.
key
>=
pCtx
->
startTs
)
||
((
!
ascQuery
)
&&
pCtx
->
start
.
key
>=
pCtx
->
startTs
&&
pCtx
->
end
.
key
<=
pCtx
->
startTs
)))
{
if
(
type
==
TSDB_FILL_PREV
)
{
...
...
@@ -3755,11 +3633,11 @@ static void ts_comp_function(SqlFunctionCtx *pCtx) {
// primary ts must be existed, so no need to check its existance
if
(
pCtx
->
order
==
TSDB_ORDER_ASC
)
{
tsBufAppend
(
pTSbuf
,
(
int32_t
)
pCtx
->
param
[
0
]
.
i
,
&
pCtx
->
tag
,
input
,
pCtx
->
size
*
TSDB_KEYSIZE
);
// tsBufAppend(pTSbuf, (int32_t)pCtx->param[0].param
.i, &pCtx->tag, input, pCtx->size * TSDB_KEYSIZE);
}
else
{
for
(
int32_t
i
=
pCtx
->
size
-
1
;
i
>=
0
;
--
i
)
{
char
*
d
=
GET_INPUT_DATA
(
pCtx
,
i
);
tsBufAppend
(
pTSbuf
,
(
int32_t
)
pCtx
->
param
[
0
]
.
i
,
&
pCtx
->
tag
,
d
,
(
int32_t
)
TSDB_KEYSIZE
);
// tsBufAppend(pTSbuf, (int32_t)pCtx->param[0].param
.i, &pCtx->tag, d, (int32_t)TSDB_KEYSIZE);
}
}
...
...
@@ -3911,7 +3789,7 @@ static void rate_finalizer(SqlFunctionCtx *pCtx) {
return
;
}
SET_DOUBLE_VAL
((
double
*
)
pCtx
->
pOutput
,
do_calc_rate
(
pRateInfo
,
(
double
)
TSDB_TICK_PER_SECOND
(
pCtx
->
param
[
0
]
.
i
)));
// SET_DOUBLE_VAL((double*) pCtx->pOutput, do_calc_rate(pRateInfo, (double) TSDB_TICK_PER_SECOND(pCtx->param[0].param
.i)));
// cannot set the numOfIteratedElems again since it is set during previous iteration
pResInfo
->
numOfRes
=
1
;
...
...
@@ -4008,7 +3886,7 @@ static void blockInfo_func(SqlFunctionCtx* pCtx) {
int32_t
len
=
*
(
int32_t
*
)
pCtx
->
pInput
;
blockDistInfoFromBinary
((
char
*
)
pCtx
->
pInput
+
sizeof
(
int32_t
),
len
,
pDist
);
pDist
->
rowSize
=
(
uint16_t
)
pCtx
->
param
[
0
]
.
i
;
// pDist->rowSize = (uint16_t)pCtx->param[0].param
.i;
memcpy
(
pCtx
->
pOutput
,
pCtx
->
pInput
,
sizeof
(
int32_t
)
+
len
);
...
...
@@ -4160,7 +4038,7 @@ void blockinfo_func_finalizer(SqlFunctionCtx* pCtx) {
SResultRowEntryInfo
*
pResInfo
=
GET_RES_INFO
(
pCtx
);
STableBlockDistInfo
*
pDist
=
(
STableBlockDistInfo
*
)
GET_ROWCELL_INTERBUF
(
pResInfo
);
pDist
->
rowSize
=
(
uint16_t
)
pCtx
->
param
[
0
]
.
i
;
// pDist->rowSize = (uint16_t)pCtx->param[0].param
.i;
generateBlockDistResult
(
pDist
,
pCtx
->
pOutput
);
if
(
pDist
->
dataBlockInfos
!=
NULL
)
{
...
...
@@ -4557,9 +4435,9 @@ SAggFunctionInfo aggFunc[35] = {{
FUNCTION_AVG
,
FUNCSTATE_SO
|
FUNCSTATE_STABLE
,
function_setup
,
stddev_dst_function
,
stddev_dst_finalizer
,
stddev_dst_merge
,
NULL
,
NULL
,
NULL
,
dataBlockRequired
,
},
{
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录