Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
c4b3da50
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
c4b3da50
编写于
10月 13, 2022
作者:
H
Hongze Cheng
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
more code format
上级
c79cc7e4
变更
17
显示空白变更内容
内联
并排
Showing
17 changed file
with
574 addition
and
623 deletion
+574
-623
source/libs/function/inc/builtinsimpl.h
source/libs/function/inc/builtinsimpl.h
+81
-81
source/libs/function/inc/tfunctionInt.h
source/libs/function/inc/tfunctionInt.h
+5
-5
source/libs/function/inc/thistogram.h
source/libs/function/inc/thistogram.h
+8
-8
source/libs/function/inc/tpercentile.h
source/libs/function/inc/tpercentile.h
+3
-3
source/libs/function/inc/tscript.h
source/libs/function/inc/tscript.h
+4
-4
source/libs/function/inc/tudfInt.h
source/libs/function/inc/tudfInt.h
+16
-18
source/libs/function/inc/udfc.h
source/libs/function/inc/udfc.h
+37
-41
source/libs/function/src/builtinsimpl.c
source/libs/function/src/builtinsimpl.c
+4
-3
source/libs/function/src/tfunctionInt.c
source/libs/function/src/tfunctionInt.c
+7
-11
source/libs/function/src/thistogram.c
source/libs/function/src/thistogram.c
+7
-7
source/libs/function/src/tpercentile.c
source/libs/function/src/tpercentile.c
+79
-80
source/libs/function/src/tscript.c
source/libs/function/src/tscript.c
+2
-2
source/libs/function/src/tudf.c
source/libs/function/src/tudf.c
+292
-322
source/libs/function/test/runUdf.c
source/libs/function/test/runUdf.c
+6
-7
source/libs/function/test/udf1.c
source/libs/function/test/udf1.c
+8
-13
source/libs/function/test/udf2.c
source/libs/function/test/udf2.c
+12
-17
tools/scripts/codeFormat.sh
tools/scripts/codeFormat.sh
+3
-1
未找到文件。
source/libs/function/inc/builtinsimpl.h
浏览文件 @
c4b3da50
...
...
@@ -28,32 +28,32 @@ bool dummyInit(SqlFunctionCtx* UNUSED_PARAM(pCtx), SResultRowEntryInfo* UNUSED_P
int32_t
dummyProcess
(
SqlFunctionCtx
*
UNUSED_PARAM
(
pCtx
));
int32_t
dummyFinalize
(
SqlFunctionCtx
*
UNUSED_PARAM
(
pCtx
),
SSDataBlock
*
UNUSED_PARAM
(
pBlock
));
bool
functionSetup
(
SqlFunctionCtx
*
pCtx
,
SResultRowEntryInfo
*
pResultInfo
);
bool
functionSetup
(
SqlFunctionCtx
*
pCtx
,
SResultRowEntryInfo
*
pResultInfo
);
int32_t
functionFinalize
(
SqlFunctionCtx
*
pCtx
,
SSDataBlock
*
pBlock
);
int32_t
functionFinalizeWithResultBuf
(
SqlFunctionCtx
*
pCtx
,
SSDataBlock
*
pBlock
,
char
*
finalResult
);
int32_t
combineFunction
(
SqlFunctionCtx
*
pDestCtx
,
SqlFunctionCtx
*
pSourceCtx
);
EFuncDataRequired
countDataRequired
(
SFunctionNode
*
pFunc
,
STimeWindow
*
pTimeWindow
);
bool
getCountFuncEnv
(
struct
SFunctionNode
*
pFunc
,
SFuncExecEnv
*
pEnv
);
int32_t
countFunction
(
SqlFunctionCtx
*
pCtx
);
int32_t
countInvertFunction
(
SqlFunctionCtx
*
pCtx
);
int32_t
countFunction
(
SqlFunctionCtx
*
pCtx
);
int32_t
countInvertFunction
(
SqlFunctionCtx
*
pCtx
);
EFuncDataRequired
statisDataRequired
(
SFunctionNode
*
pFunc
,
STimeWindow
*
pTimeWindow
);
bool
getSumFuncEnv
(
struct
SFunctionNode
*
pFunc
,
SFuncExecEnv
*
pEnv
);
int32_t
sumFunction
(
SqlFunctionCtx
*
pCtx
);
int32_t
sumInvertFunction
(
SqlFunctionCtx
*
pCtx
);
int32_t
sumFunction
(
SqlFunctionCtx
*
pCtx
);
int32_t
sumInvertFunction
(
SqlFunctionCtx
*
pCtx
);
int32_t
sumCombine
(
SqlFunctionCtx
*
pDestCtx
,
SqlFunctionCtx
*
pSourceCtx
);
bool
minmaxFunctionSetup
(
SqlFunctionCtx
*
pCtx
,
SResultRowEntryInfo
*
pResultInfo
);
bool
minmaxFunctionSetup
(
SqlFunctionCtx
*
pCtx
,
SResultRowEntryInfo
*
pResultInfo
);
bool
getMinmaxFuncEnv
(
struct
SFunctionNode
*
pFunc
,
SFuncExecEnv
*
pEnv
);
int32_t
minFunction
(
SqlFunctionCtx
*
pCtx
);
int32_t
maxFunction
(
SqlFunctionCtx
*
pCtx
);
int32_t
maxFunction
(
SqlFunctionCtx
*
pCtx
);
int32_t
minmaxFunctionFinalize
(
SqlFunctionCtx
*
pCtx
,
SSDataBlock
*
pBlock
);
int32_t
minCombine
(
SqlFunctionCtx
*
pDestCtx
,
SqlFunctionCtx
*
pSourceCtx
);
int32_t
maxCombine
(
SqlFunctionCtx
*
pDestCtx
,
SqlFunctionCtx
*
pSourceCtx
);
bool
getAvgFuncEnv
(
struct
SFunctionNode
*
pFunc
,
SFuncExecEnv
*
pEnv
);
bool
avgFunctionSetup
(
SqlFunctionCtx
*
pCtx
,
SResultRowEntryInfo
*
pResultInfo
);
bool
avgFunctionSetup
(
SqlFunctionCtx
*
pCtx
,
SResultRowEntryInfo
*
pResultInfo
);
int32_t
avgFunction
(
SqlFunctionCtx
*
pCtx
);
int32_t
avgFunctionMerge
(
SqlFunctionCtx
*
pCtx
);
int32_t
avgFinalize
(
SqlFunctionCtx
*
pCtx
,
SSDataBlock
*
pBlock
);
...
...
@@ -63,7 +63,7 @@ int32_t avgCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx);
int32_t
getAvgInfoSize
();
bool
getStddevFuncEnv
(
struct
SFunctionNode
*
pFunc
,
SFuncExecEnv
*
pEnv
);
bool
stddevFunctionSetup
(
SqlFunctionCtx
*
pCtx
,
SResultRowEntryInfo
*
pResultInfo
);
bool
stddevFunctionSetup
(
SqlFunctionCtx
*
pCtx
,
SResultRowEntryInfo
*
pResultInfo
);
int32_t
stddevFunction
(
SqlFunctionCtx
*
pCtx
);
int32_t
stddevFunctionMerge
(
SqlFunctionCtx
*
pCtx
);
int32_t
stddevFinalize
(
SqlFunctionCtx
*
pCtx
,
SSDataBlock
*
pBlock
);
...
...
@@ -73,20 +73,20 @@ int32_t stddevCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx);
int32_t
getStddevInfoSize
();
bool
getLeastSQRFuncEnv
(
struct
SFunctionNode
*
pFunc
,
SFuncExecEnv
*
pEnv
);
bool
leastSQRFunctionSetup
(
SqlFunctionCtx
*
pCtx
,
SResultRowEntryInfo
*
pResultInfo
);
bool
leastSQRFunctionSetup
(
SqlFunctionCtx
*
pCtx
,
SResultRowEntryInfo
*
pResultInfo
);
int32_t
leastSQRFunction
(
SqlFunctionCtx
*
pCtx
);
int32_t
leastSQRFinalize
(
SqlFunctionCtx
*
pCtx
,
SSDataBlock
*
pBlock
);
int32_t
leastSQRInvertFunction
(
SqlFunctionCtx
*
pCtx
);
int32_t
leastSQRCombine
(
SqlFunctionCtx
*
pDestCtx
,
SqlFunctionCtx
*
pSourceCtx
);
bool
getPercentileFuncEnv
(
struct
SFunctionNode
*
pFunc
,
SFuncExecEnv
*
pEnv
);
bool
percentileFunctionSetup
(
SqlFunctionCtx
*
pCtx
,
SResultRowEntryInfo
*
pResultInfo
);
int32_t
percentileFunction
(
SqlFunctionCtx
*
pCtx
);
bool
percentileFunctionSetup
(
SqlFunctionCtx
*
pCtx
,
SResultRowEntryInfo
*
pResultInfo
);
int32_t
percentileFunction
(
SqlFunctionCtx
*
pCtx
);
int32_t
percentileFinalize
(
SqlFunctionCtx
*
pCtx
,
SSDataBlock
*
pBlock
);
bool
getApercentileFuncEnv
(
struct
SFunctionNode
*
pFunc
,
SFuncExecEnv
*
pEnv
);
bool
apercentileFunctionSetup
(
SqlFunctionCtx
*
pCtx
,
SResultRowEntryInfo
*
pResultInfo
);
int32_t
apercentileFunction
(
SqlFunctionCtx
*
pCtx
);
bool
apercentileFunctionSetup
(
SqlFunctionCtx
*
pCtx
,
SResultRowEntryInfo
*
pResultInfo
);
int32_t
apercentileFunction
(
SqlFunctionCtx
*
pCtx
);
int32_t
apercentileFunctionMerge
(
SqlFunctionCtx
*
pCtx
);
int32_t
apercentileFinalize
(
SqlFunctionCtx
*
pCtx
,
SSDataBlock
*
pBlock
);
int32_t
apercentilePartialFinalize
(
SqlFunctionCtx
*
pCtx
,
SSDataBlock
*
pBlock
);
...
...
@@ -94,25 +94,25 @@ int32_t apercentileCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx)
int32_t
getApercentileMaxSize
();
bool
getDiffFuncEnv
(
struct
SFunctionNode
*
pFunc
,
SFuncExecEnv
*
pEnv
);
bool
diffFunctionSetup
(
SqlFunctionCtx
*
pCtx
,
SResultRowEntryInfo
*
pResInfo
);
int32_t
diffFunction
(
SqlFunctionCtx
*
pCtx
);
bool
diffFunctionSetup
(
SqlFunctionCtx
*
pCtx
,
SResultRowEntryInfo
*
pResInfo
);
int32_t
diffFunction
(
SqlFunctionCtx
*
pCtx
);
bool
getDerivativeFuncEnv
(
struct
SFunctionNode
*
pFunc
,
SFuncExecEnv
*
pEnv
);
bool
derivativeFuncSetup
(
SqlFunctionCtx
*
pCtx
,
SResultRowEntryInfo
*
pResInfo
);
int32_t
derivativeFunction
(
SqlFunctionCtx
*
pCtx
);
bool
derivativeFuncSetup
(
SqlFunctionCtx
*
pCtx
,
SResultRowEntryInfo
*
pResInfo
);
int32_t
derivativeFunction
(
SqlFunctionCtx
*
pCtx
);
bool
getIrateFuncEnv
(
struct
SFunctionNode
*
pFunc
,
SFuncExecEnv
*
pEnv
);
bool
irateFuncSetup
(
SqlFunctionCtx
*
pCtx
,
SResultRowEntryInfo
*
pResInfo
);
int32_t
irateFunction
(
SqlFunctionCtx
*
pCtx
);
bool
irateFuncSetup
(
SqlFunctionCtx
*
pCtx
,
SResultRowEntryInfo
*
pResInfo
);
int32_t
irateFunction
(
SqlFunctionCtx
*
pCtx
);
int32_t
irateFinalize
(
SqlFunctionCtx
*
pCtx
,
SSDataBlock
*
pBlock
);
int32_t
cachedLastRowFunction
(
SqlFunctionCtx
*
pCtx
);
bool
getFirstLastFuncEnv
(
struct
SFunctionNode
*
pFunc
,
SFuncExecEnv
*
pEnv
);
int32_t
firstFunction
(
SqlFunctionCtx
*
pCtx
);
int32_t
firstFunctionMerge
(
SqlFunctionCtx
*
pCtx
);
int32_t
lastFunction
(
SqlFunctionCtx
*
pCtx
);
int32_t
lastFunctionMerge
(
SqlFunctionCtx
*
pCtx
);
int32_t
firstFunction
(
SqlFunctionCtx
*
pCtx
);
int32_t
firstFunctionMerge
(
SqlFunctionCtx
*
pCtx
);
int32_t
lastFunction
(
SqlFunctionCtx
*
pCtx
);
int32_t
lastFunctionMerge
(
SqlFunctionCtx
*
pCtx
);
int32_t
firstLastFinalize
(
SqlFunctionCtx
*
pCtx
,
SSDataBlock
*
pBlock
);
int32_t
firstLastPartialFinalize
(
SqlFunctionCtx
*
pCtx
,
SSDataBlock
*
pBlock
);
int32_t
firstCombine
(
SqlFunctionCtx
*
pDestCtx
,
SqlFunctionCtx
*
pSourceCtx
);
...
...
@@ -120,15 +120,15 @@ int32_t lastCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx);
int32_t
getFirstLastInfoSize
(
int32_t
resBytes
);
EFuncDataRequired
lastDynDataReq
(
void
*
pRes
,
STimeWindow
*
pTimeWindow
);
int32_t
lastRowFunction
(
SqlFunctionCtx
*
pCtx
);
int32_t
lastRowFunction
(
SqlFunctionCtx
*
pCtx
);
bool
getTopBotFuncEnv
(
SFunctionNode
*
UNUSED_PARAM
(
pFunc
),
SFuncExecEnv
*
pEnv
);
bool
getTopBotMergeFuncEnv
(
SFunctionNode
*
UNUSED_PARAM
(
pFunc
),
SFuncExecEnv
*
pEnv
);
bool
topBotFunctionSetup
(
SqlFunctionCtx
*
pCtx
,
SResultRowEntryInfo
*
pResultInfo
);
int32_t
topFunction
(
SqlFunctionCtx
*
pCtx
);
int32_t
topFunctionMerge
(
SqlFunctionCtx
*
pCtx
);
int32_t
bottomFunction
(
SqlFunctionCtx
*
pCtx
);
int32_t
bottomFunctionMerge
(
SqlFunctionCtx
*
pCtx
);
bool
topBotFunctionSetup
(
SqlFunctionCtx
*
pCtx
,
SResultRowEntryInfo
*
pResultInfo
);
int32_t
topFunction
(
SqlFunctionCtx
*
pCtx
);
int32_t
topFunctionMerge
(
SqlFunctionCtx
*
pCtx
);
int32_t
bottomFunction
(
SqlFunctionCtx
*
pCtx
);
int32_t
bottomFunctionMerge
(
SqlFunctionCtx
*
pCtx
);
int32_t
topBotFinalize
(
SqlFunctionCtx
*
pCtx
,
SSDataBlock
*
pBlock
);
int32_t
topBotPartialFinalize
(
SqlFunctionCtx
*
pCtx
,
SSDataBlock
*
pBlock
);
int32_t
topBotMergeFinalize
(
SqlFunctionCtx
*
pCtx
,
SSDataBlock
*
pBlock
);
...
...
@@ -137,7 +137,7 @@ int32_t bottomCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx);
int32_t
getTopBotInfoSize
(
int64_t
numOfItems
);
bool
getSpreadFuncEnv
(
struct
SFunctionNode
*
pFunc
,
SFuncExecEnv
*
pEnv
);
bool
spreadFunctionSetup
(
SqlFunctionCtx
*
pCtx
,
SResultRowEntryInfo
*
pResultInfo
);
bool
spreadFunctionSetup
(
SqlFunctionCtx
*
pCtx
,
SResultRowEntryInfo
*
pResultInfo
);
int32_t
spreadFunction
(
SqlFunctionCtx
*
pCtx
);
int32_t
spreadFunctionMerge
(
SqlFunctionCtx
*
pCtx
);
int32_t
spreadFinalize
(
SqlFunctionCtx
*
pCtx
,
SSDataBlock
*
pBlock
);
...
...
@@ -146,7 +146,7 @@ int32_t getSpreadInfoSize();
int32_t
spreadCombine
(
SqlFunctionCtx
*
pDestCtx
,
SqlFunctionCtx
*
pSourceCtx
);
bool
getElapsedFuncEnv
(
struct
SFunctionNode
*
pFunc
,
SFuncExecEnv
*
pEnv
);
bool
elapsedFunctionSetup
(
SqlFunctionCtx
*
pCtx
,
SResultRowEntryInfo
*
pResultInfo
);
bool
elapsedFunctionSetup
(
SqlFunctionCtx
*
pCtx
,
SResultRowEntryInfo
*
pResultInfo
);
int32_t
elapsedFunction
(
SqlFunctionCtx
*
pCtx
);
int32_t
elapsedFunctionMerge
(
SqlFunctionCtx
*
pCtx
);
int32_t
elapsedFinalize
(
SqlFunctionCtx
*
pCtx
,
SSDataBlock
*
pBlock
);
...
...
@@ -155,7 +155,7 @@ int32_t getElapsedInfoSize();
int32_t
elapsedCombine
(
SqlFunctionCtx
*
pDestCtx
,
SqlFunctionCtx
*
pSourceCtx
);
bool
getHistogramFuncEnv
(
struct
SFunctionNode
*
pFunc
,
SFuncExecEnv
*
pEnv
);
bool
histogramFunctionSetup
(
SqlFunctionCtx
*
pCtx
,
SResultRowEntryInfo
*
pResultInfo
);
bool
histogramFunctionSetup
(
SqlFunctionCtx
*
pCtx
,
SResultRowEntryInfo
*
pResultInfo
);
int32_t
histogramFunction
(
SqlFunctionCtx
*
pCtx
);
int32_t
histogramFunctionPartial
(
SqlFunctionCtx
*
pCtx
);
int32_t
histogramFunctionMerge
(
SqlFunctionCtx
*
pCtx
);
...
...
@@ -173,7 +173,7 @@ int32_t getHLLInfoSize();
int32_t
hllCombine
(
SqlFunctionCtx
*
pDestCtx
,
SqlFunctionCtx
*
pSourceCtx
);
bool
getStateFuncEnv
(
struct
SFunctionNode
*
pFunc
,
SFuncExecEnv
*
pEnv
);
bool
stateFunctionSetup
(
SqlFunctionCtx
*
pCtx
,
SResultRowEntryInfo
*
pResultInfo
);
bool
stateFunctionSetup
(
SqlFunctionCtx
*
pCtx
,
SResultRowEntryInfo
*
pResultInfo
);
int32_t
stateCountFunction
(
SqlFunctionCtx
*
pCtx
);
int32_t
stateDurationFunction
(
SqlFunctionCtx
*
pCtx
);
...
...
@@ -181,36 +181,36 @@ bool getCsumFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
int32_t
csumFunction
(
SqlFunctionCtx
*
pCtx
);
bool
getMavgFuncEnv
(
struct
SFunctionNode
*
pFunc
,
SFuncExecEnv
*
pEnv
);
bool
mavgFunctionSetup
(
SqlFunctionCtx
*
pCtx
,
SResultRowEntryInfo
*
pResultInfo
);
bool
mavgFunctionSetup
(
SqlFunctionCtx
*
pCtx
,
SResultRowEntryInfo
*
pResultInfo
);
int32_t
mavgFunction
(
SqlFunctionCtx
*
pCtx
);
bool
getSampleFuncEnv
(
struct
SFunctionNode
*
pFunc
,
SFuncExecEnv
*
pEnv
);
bool
sampleFunctionSetup
(
SqlFunctionCtx
*
pCtx
,
SResultRowEntryInfo
*
pResultInfo
);
bool
sampleFunctionSetup
(
SqlFunctionCtx
*
pCtx
,
SResultRowEntryInfo
*
pResultInfo
);
int32_t
sampleFunction
(
SqlFunctionCtx
*
pCtx
);
int32_t
sampleFinalize
(
SqlFunctionCtx
*
pCtx
,
SSDataBlock
*
pBlock
);
bool
getTailFuncEnv
(
struct
SFunctionNode
*
pFunc
,
SFuncExecEnv
*
pEnv
);
bool
tailFunctionSetup
(
SqlFunctionCtx
*
pCtx
,
SResultRowEntryInfo
*
pResultInfo
);
bool
tailFunctionSetup
(
SqlFunctionCtx
*
pCtx
,
SResultRowEntryInfo
*
pResultInfo
);
int32_t
tailFunction
(
SqlFunctionCtx
*
pCtx
);
bool
getUniqueFuncEnv
(
struct
SFunctionNode
*
pFunc
,
SFuncExecEnv
*
pEnv
);
bool
uniqueFunctionSetup
(
SqlFunctionCtx
*
pCtx
,
SResultRowEntryInfo
*
pResultInfo
);
int32_t
uniqueFunction
(
SqlFunctionCtx
*
pCtx
);
bool
uniqueFunctionSetup
(
SqlFunctionCtx
*
pCtx
,
SResultRowEntryInfo
*
pResultInfo
);
int32_t
uniqueFunction
(
SqlFunctionCtx
*
pCtx
);
bool
getModeFuncEnv
(
struct
SFunctionNode
*
pFunc
,
SFuncExecEnv
*
pEnv
);
bool
modeFunctionSetup
(
SqlFunctionCtx
*
pCtx
,
SResultRowEntryInfo
*
pResultInfo
);
int32_t
modeFunction
(
SqlFunctionCtx
*
pCtx
);
bool
modeFunctionSetup
(
SqlFunctionCtx
*
pCtx
,
SResultRowEntryInfo
*
pResultInfo
);
int32_t
modeFunction
(
SqlFunctionCtx
*
pCtx
);
int32_t
modeFinalize
(
SqlFunctionCtx
*
pCtx
,
SSDataBlock
*
pBlock
);
bool
getTwaFuncEnv
(
struct
SFunctionNode
*
pFunc
,
SFuncExecEnv
*
pEnv
);
bool
twaFunctionSetup
(
SqlFunctionCtx
*
pCtx
,
SResultRowEntryInfo
*
pResultInfo
);
int32_t
twaFunction
(
SqlFunctionCtx
*
pCtx
);
int32_t
twaFinalize
(
struct
SqlFunctionCtx
*
pCtx
,
SSDataBlock
*
pBlock
);
bool
twaFunctionSetup
(
SqlFunctionCtx
*
pCtx
,
SResultRowEntryInfo
*
pResultInfo
);
int32_t
twaFunction
(
SqlFunctionCtx
*
pCtx
);
int32_t
twaFinalize
(
struct
SqlFunctionCtx
*
pCtx
,
SSDataBlock
*
pBlock
);
bool
getSelectivityFuncEnv
(
SFunctionNode
*
pFunc
,
SFuncExecEnv
*
pEnv
);
bool
blockDistSetup
(
SqlFunctionCtx
*
pCtx
,
SResultRowEntryInfo
*
pResultInfo
);
int32_t
blockDistFunction
(
SqlFunctionCtx
*
pCtx
);
bool
blockDistSetup
(
SqlFunctionCtx
*
pCtx
,
SResultRowEntryInfo
*
pResultInfo
);
int32_t
blockDistFunction
(
SqlFunctionCtx
*
pCtx
);
int32_t
blockDistFinalize
(
SqlFunctionCtx
*
pCtx
,
SSDataBlock
*
pBlock
);
bool
getGroupKeyFuncEnv
(
SFunctionNode
*
pFunc
,
SFuncExecEnv
*
pEnv
);
...
...
source/libs/function/inc/tfunctionInt.h
浏览文件 @
c4b3da50
...
...
@@ -22,11 +22,11 @@ extern "C" {
#include "os.h"
#include "tname.h"
#include "taosdef.h"
#include "tvariant.h"
#include "function.h"
#include "taosdef.h"
#include "tname.h"
#include "tudf.h"
#include "tvariant.h"
bool
topbot_datablock_filter
(
SqlFunctionCtx
*
pCtx
,
const
char
*
minval
,
const
char
*
maxval
);
...
...
source/libs/function/inc/thistogram.h
浏览文件 @
c4b3da50
source/libs/function/inc/tpercentile.h
浏览文件 @
c4b3da50
...
...
@@ -61,10 +61,10 @@ typedef struct tMemBucket {
MinMaxEntry
range
;
// value range
int32_t
times
;
// count that has been checked for deciding the correct data value buckets.
__compar_fn_t
comparFn
;
tMemBucketSlot
*
pSlots
;
SDiskbasedBuf
*
pBuffer
;
tMemBucketSlot
*
pSlots
;
SDiskbasedBuf
*
pBuffer
;
__perc_hash_func_t
hashFunc
;
SHashObj
*
groupPagesMap
;
// disk page map for different groups;
SHashObj
*
groupPagesMap
;
// disk page map for different groups;
}
tMemBucket
;
tMemBucket
*
tMemBucketCreate
(
int16_t
nElemSize
,
int16_t
dataType
,
double
minval
,
double
maxval
);
...
...
source/libs/function/inc/tscript.h
浏览文件 @
c4b3da50
...
...
@@ -17,14 +17,14 @@
#define TDENGINE_QSCRIPT_H
#if 0
#include <lua.h>
#include <lauxlib.h>
#include <lua.h>
#include <lualib.h>
#include "tutil.h"
#include "hash.h"
#include "tlist.h"
#include "tudf.h"
#include "tutil.h"
#define MAX_FUNC_NAME 64
...
...
@@ -81,4 +81,4 @@ void scriptEnvPoolCleanup();
bool isValidScript(char *script, int32_t len);
#endif
#endif
//TDENGINE_QSCRIPT_H
#endif
// TDENGINE_QSCRIPT_H
source/libs/function/inc/tudfInt.h
浏览文件 @
c4b3da50
...
...
@@ -61,12 +61,10 @@ typedef struct SUdfCallResponse {
SUdfInterBuf
resultBuf
;
}
SUdfCallResponse
;
typedef
struct
SUdfTeardownRequest
{
int64_t
udfHandle
;
}
SUdfTeardownRequest
;
typedef
struct
SUdfTeardownResponse
{
#ifdef WINDOWS
size_t
avoidCompilationErrors
;
...
...
@@ -98,20 +96,20 @@ typedef struct SUdfResponse {
};
}
SUdfResponse
;
int32_t
encodeUdfRequest
(
void
**
buf
,
const
SUdfRequest
*
request
);
void
*
decodeUdfRequest
(
const
void
*
buf
,
SUdfRequest
*
request
);
int32_t
encodeUdfRequest
(
void
**
buf
,
const
SUdfRequest
*
request
);
void
*
decodeUdfRequest
(
const
void
*
buf
,
SUdfRequest
*
request
);
int32_t
encodeUdfResponse
(
void
**
buf
,
const
SUdfResponse
*
response
);
void
*
decodeUdfResponse
(
const
void
*
buf
,
SUdfResponse
*
response
);
void
*
decodeUdfResponse
(
const
void
*
buf
,
SUdfResponse
*
response
);
void
freeUdfColumnData
(
SUdfColumnData
*
data
,
SUdfColumnMeta
*
meta
);
void
freeUdfColumn
(
SUdfColumn
*
col
);
void
freeUdfColumn
(
SUdfColumn
*
col
);
void
freeUdfDataDataBlock
(
SUdfDataBlock
*
block
);
int32_t
convertDataBlockToUdfDataBlock
(
SSDataBlock
*
block
,
SUdfDataBlock
*
udfBlock
);
int32_t
convertUdfColumnToDataBlock
(
SUdfColumn
*
udfCol
,
SSDataBlock
*
block
);
int32_t
getUdfdPipeName
(
char
*
pipeName
,
int32_t
size
);
int32_t
getUdfdPipeName
(
char
*
pipeName
,
int32_t
size
);
#ifdef __cplusplus
}
#endif
...
...
source/libs/function/inc/udfc.h
浏览文件 @
c4b3da50
...
...
@@ -20,7 +20,7 @@ enum {
};
typedef
struct
SSDataBlock
{
typedef
struct
SSDataBlock
{
char
*
data
;
int32_t
size
;
}
SSDataBlock
;
...
...
@@ -36,9 +36,9 @@ int32_t createUdfdProxy();
int32_t
destroyUdfdProxy
();
//int32_t setupUdf(SUdfInfo *udf, int32_t numOfUdfs, UdfcFuncHandle *handles);
//
int32_t setupUdf(SUdfInfo *udf, int32_t numOfUdfs, UdfcFuncHandle *handles);
int32_t
setupUdf
(
SUdfInfo
*
udf
,
UdfcFuncHandle
*
handle
);
int32_t
setupUdf
(
SUdfInfo
*
udf
,
UdfcFuncHandle
*
handle
);
int32_t
callUdf
(
UdfcFuncHandle
handle
,
int8_t
step
,
char
*
state
,
int32_t
stateSize
,
SSDataBlock
input
,
char
**
newstate
,
int32_t
*
newStateSize
,
SSDataBlock
*
output
);
...
...
@@ -48,7 +48,7 @@ int32_t doTeardownUdf(UdfcFuncHandle handle);
typedef
struct
SUdfSetupRequest
{
char
udfName
[
16
];
//
int8_t
scriptType
;
// 0:c, 1: lua, 2:js
int8_t
udfType
;
//
udaf, udf, udtf
int8_t
udfType
;
//
udaf, udf, udtf
int16_t
pathSize
;
char
*
path
;
}
SUdfSetupRequest
;
...
...
@@ -57,7 +57,6 @@ typedef struct SUdfSetupResponse {
int64_t
udfHandle
;
}
SUdfSetupResponse
;
typedef
struct
SUdfCallRequest
{
int64_t
udfHandle
;
int8_t
step
;
...
...
@@ -69,7 +68,6 @@ typedef struct SUdfCallRequest {
char
*
state
;
}
SUdfCallRequest
;
typedef
struct
SUdfCallResponse
{
int32_t
outputBytes
;
char
*
output
;
...
...
@@ -77,12 +75,10 @@ typedef struct SUdfCallResponse {
char
*
newState
;
}
SUdfCallResponse
;
typedef
struct
SUdfTeardownRequest
{
int64_t
udfHandle
;
}
SUdfTeardownRequest
;
typedef
struct
SUdfTeardownResponse
{
#ifdef WINDOWS
size_t
avoidCompilationErrors
;
...
...
@@ -110,4 +106,4 @@ int32_t decodeRequest(char *buf, int32_t bufLen, SUdfRequest **pRequest);
int32_t
encodeResponse
(
char
**
buf
,
int32_t
*
bufLen
,
SUdfResponse
*
response
);
int32_t
encodeRequest
(
char
**
buf
,
int32_t
*
bufLen
,
SUdfRequest
*
request
);
int32_t
decodeResponse
(
char
*
buf
,
int32_t
bufLen
,
SUdfResponse
**
pResponse
);
#endif
//
UDF_UDF_H
#endif
//
UDF_UDF_H
source/libs/function/src/builtinsimpl.c
浏览文件 @
c4b3da50
...
...
@@ -2977,7 +2977,8 @@ int32_t lastFunction(SqlFunctionCtx* pCtx) {
return
TSDB_CODE_SUCCESS
;
}
static
void
firstLastTransferInfo
(
SqlFunctionCtx
*
pCtx
,
SFirstLastRes
*
pInput
,
SFirstLastRes
*
pOutput
,
bool
isFirst
,
int32_t
rowIndex
)
{
static
void
firstLastTransferInfo
(
SqlFunctionCtx
*
pCtx
,
SFirstLastRes
*
pInput
,
SFirstLastRes
*
pOutput
,
bool
isFirst
,
int32_t
rowIndex
)
{
SInputColumnInfoData
*
pColInfo
=
&
pCtx
->
input
;
if
(
pOutput
->
hasResult
)
{
...
...
source/libs/function/src/tfunctionInt.c
浏览文件 @
c4b3da50
...
...
@@ -15,8 +15,8 @@
#include "os.h"
#include "taosdef.h"
#include "tmsg.h"
#include "thash.h"
#include "tmsg.h"
#include "ttypes.h"
#include "function.h"
...
...
@@ -29,15 +29,13 @@
#include "ttszip.h"
#include "tudf.h"
void
cleanupResultRowEntry
(
struct
SResultRowEntryInfo
*
pCell
)
{
pCell
->
initialized
=
false
;
}
void
cleanupResultRowEntry
(
struct
SResultRowEntryInfo
*
pCell
)
{
pCell
->
initialized
=
false
;
}
int32_t
getNumOfResult
(
SqlFunctionCtx
*
pCtx
,
int32_t
num
,
SSDataBlock
*
pResBlock
)
{
int32_t
maxRows
=
0
;
for
(
int32_t
j
=
0
;
j
<
num
;
++
j
)
{
SResultRowEntryInfo
*
pResInfo
=
GET_RES_INFO
(
&
pCtx
[
j
]);
SResultRowEntryInfo
*
pResInfo
=
GET_RES_INFO
(
&
pCtx
[
j
]);
if
(
pResInfo
!=
NULL
&&
maxRows
<
pResInfo
->
numOfRes
)
{
maxRows
=
pResInfo
->
numOfRes
;
}
...
...
@@ -46,12 +44,12 @@ int32_t getNumOfResult(SqlFunctionCtx* pCtx, int32_t num, SSDataBlock* pResBlock
assert
(
maxRows
>=
0
);
blockDataEnsureCapacity
(
pResBlock
,
maxRows
);
for
(
int32_t
i
=
0
;
i
<
num
;
++
i
)
{
for
(
int32_t
i
=
0
;
i
<
num
;
++
i
)
{
SColumnInfoData
*
pCol
=
taosArrayGet
(
pResBlock
->
pDataBlock
,
i
);
SResultRowEntryInfo
*
pResInfo
=
GET_RES_INFO
(
&
pCtx
[
i
]);
SResultRowEntryInfo
*
pResInfo
=
GET_RES_INFO
(
&
pCtx
[
i
]);
if
(
pResInfo
->
numOfRes
==
0
)
{
for
(
int32_t
j
=
0
;
j
<
pResInfo
->
numOfRes
;
++
j
)
{
for
(
int32_t
j
=
0
;
j
<
pResInfo
->
numOfRes
;
++
j
)
{
colDataAppend
(
pCol
,
j
,
NULL
,
true
);
// TODO add set null data api
}
}
else
{
...
...
@@ -70,6 +68,4 @@ bool isRowEntryCompleted(struct SResultRowEntryInfo* pEntry) {
return
pEntry
->
complete
;
}
bool
isRowEntryInitialized
(
struct
SResultRowEntryInfo
*
pEntry
)
{
return
pEntry
->
initialized
;
}
bool
isRowEntryInitialized
(
struct
SResultRowEntryInfo
*
pEntry
)
{
return
pEntry
->
initialized
;
}
source/libs/function/src/thistogram.c
浏览文件 @
c4b3da50
...
...
@@ -14,10 +14,10 @@
*/
#include "os.h"
#include "thistogram.h"
#include "taosdef.h"
#include "t
msg
.h"
#include "t
histogram
.h"
#include "tlosertree.h"
#include "tmsg.h"
/**
*
...
...
@@ -54,7 +54,7 @@ SHistogramInfo* tHistogramCreateFrom(void* pBuf, int32_t numOfBins) {
SHistogramInfo
*
pHisto
=
(
SHistogramInfo
*
)
pBuf
;
pHisto
->
elems
=
(
SHistBin
*
)((
char
*
)
pBuf
+
sizeof
(
SHistogramInfo
));
for
(
int32_t
i
=
0
;
i
<
numOfBins
;
++
i
)
{
for
(
int32_t
i
=
0
;
i
<
numOfBins
;
++
i
)
{
pHisto
->
elems
[
i
].
val
=
-
DBL_MAX
;
}
...
...
@@ -357,7 +357,7 @@ void tHistogramDestroy(SHistogramInfo** pHisto) {
}
void
tHistogramPrint
(
SHistogramInfo
*
pHisto
)
{
printf
(
"total entries: %d, elements: %"
PRId64
"
\n
"
,
pHisto
->
numOfEntries
,
pHisto
->
numOfElems
);
printf
(
"total entries: %d, elements: %"
PRId64
"
\n
"
,
pHisto
->
numOfEntries
,
pHisto
->
numOfElems
);
#if defined(USE_ARRAYLIST)
for
(
int32_t
i
=
0
;
i
<
pHisto
->
numOfEntries
;
++
i
)
{
printf
(
"%d: (%f, %"
PRId64
")
\n
"
,
i
+
1
,
pHisto
->
elems
[
i
].
val
,
pHisto
->
elems
[
i
].
num
);
...
...
source/libs/function/src/tpercentile.c
浏览文件 @
c4b3da50
...
...
@@ -14,8 +14,8 @@
*/
#include "taoserror.h"
#include "tglobal.h"
#include "tcompare.h"
#include "tglobal.h"
#include "taosdef.h"
#include "tcompare.h"
...
...
@@ -25,21 +25,20 @@
#define DEFAULT_NUM_OF_SLOT 1024
int32_t
getGroupId
(
int32_t
numOfSlots
,
int32_t
slotIndex
,
int32_t
times
)
{
return
(
times
*
numOfSlots
)
+
slotIndex
;
}
int32_t
getGroupId
(
int32_t
numOfSlots
,
int32_t
slotIndex
,
int32_t
times
)
{
return
(
times
*
numOfSlots
)
+
slotIndex
;
}
static
SFilePage
*
loadDataFromFilePage
(
tMemBucket
*
pMemBucket
,
int32_t
slotIdx
)
{
SFilePage
*
buffer
=
(
SFilePage
*
)
taosMemoryCalloc
(
1
,
pMemBucket
->
bytes
*
pMemBucket
->
pSlots
[
slotIdx
].
info
.
size
+
sizeof
(
SFilePage
));
SFilePage
*
buffer
=
(
SFilePage
*
)
taosMemoryCalloc
(
1
,
pMemBucket
->
bytes
*
pMemBucket
->
pSlots
[
slotIdx
].
info
.
size
+
sizeof
(
SFilePage
));
int32_t
groupId
=
getGroupId
(
pMemBucket
->
numOfSlots
,
slotIdx
,
pMemBucket
->
times
);
SArray
*
pIdList
=
*
(
SArray
**
)
taosHashGet
(
pMemBucket
->
groupPagesMap
,
&
groupId
,
sizeof
(
groupId
));
SArray
*
pIdList
=
*
(
SArray
**
)
taosHashGet
(
pMemBucket
->
groupPagesMap
,
&
groupId
,
sizeof
(
groupId
));
int32_t
offset
=
0
;
for
(
int32_t
i
=
0
;
i
<
taosArrayGetSize
(
pIdList
);
++
i
)
{
int32_t
*
pageId
=
taosArrayGet
(
pIdList
,
i
);
for
(
int32_t
i
=
0
;
i
<
taosArrayGetSize
(
pIdList
);
++
i
)
{
int32_t
*
pageId
=
taosArrayGet
(
pIdList
,
i
);
SFilePage
*
pg
=
getBufPage
(
pMemBucket
->
pBuffer
,
*
pageId
);
SFilePage
*
pg
=
getBufPage
(
pMemBucket
->
pBuffer
,
*
pageId
);
memcpy
(
buffer
->
data
+
offset
,
pg
->
data
,
(
size_t
)(
pg
->
num
*
pMemBucket
->
bytes
));
offset
+=
(
int32_t
)(
pg
->
num
*
pMemBucket
->
bytes
);
...
...
@@ -49,7 +48,7 @@ static SFilePage *loadDataFromFilePage(tMemBucket *pMemBucket, int32_t slotIdx)
return
buffer
;
}
static
void
resetBoundingBox
(
MinMaxEntry
*
range
,
int32_t
type
)
{
static
void
resetBoundingBox
(
MinMaxEntry
*
range
,
int32_t
type
)
{
if
(
IS_SIGNED_NUMERIC_TYPE
(
type
))
{
range
->
i64MaxVal
=
INT64_MIN
;
range
->
i64MinVal
=
INT64_MAX
;
...
...
@@ -62,17 +61,17 @@ static void resetBoundingBox(MinMaxEntry* range, int32_t type) {
}
}
static
int32_t
setBoundingBox
(
MinMaxEntry
*
range
,
int16_t
type
,
double
minval
,
double
maxval
)
{
static
int32_t
setBoundingBox
(
MinMaxEntry
*
range
,
int16_t
type
,
double
minval
,
double
maxval
)
{
if
(
minval
>
maxval
)
{
return
-
1
;
}
if
(
IS_SIGNED_NUMERIC_TYPE
(
type
))
{
range
->
i64MinVal
=
(
int64_t
)
minval
;
range
->
i64MaxVal
=
(
int64_t
)
maxval
;
}
else
if
(
IS_UNSIGNED_NUMERIC_TYPE
(
type
)){
range
->
u64MinVal
=
(
uint64_t
)
minval
;
range
->
u64MaxVal
=
(
uint64_t
)
maxval
;
range
->
i64MinVal
=
(
int64_t
)
minval
;
range
->
i64MaxVal
=
(
int64_t
)
maxval
;
}
else
if
(
IS_UNSIGNED_NUMERIC_TYPE
(
type
))
{
range
->
u64MinVal
=
(
uint64_t
)
minval
;
range
->
u64MaxVal
=
(
uint64_t
)
maxval
;
}
else
{
range
->
dMinVal
=
minval
;
range
->
dMaxVal
=
maxval
;
...
...
@@ -81,7 +80,7 @@ static int32_t setBoundingBox(MinMaxEntry* range, int16_t type, double minval, d
return
0
;
}
static
void
resetPosInfo
(
SSlotInfo
*
pInfo
)
{
static
void
resetPosInfo
(
SSlotInfo
*
pInfo
)
{
pInfo
->
size
=
0
;
pInfo
->
pageId
=
-
1
;
pInfo
->
data
=
NULL
;
...
...
@@ -97,11 +96,11 @@ double findOnlyResult(tMemBucket *pMemBucket) {
}
int32_t
groupId
=
getGroupId
(
pMemBucket
->
numOfSlots
,
i
,
pMemBucket
->
times
);
SArray
*
list
=
*
(
SArray
**
)
taosHashGet
(
pMemBucket
->
groupPagesMap
,
&
groupId
,
sizeof
(
groupId
));
SArray
*
list
=
*
(
SArray
**
)
taosHashGet
(
pMemBucket
->
groupPagesMap
,
&
groupId
,
sizeof
(
groupId
));
assert
(
list
->
size
==
1
);
int32_t
*
pageId
=
taosArrayGet
(
list
,
0
);
SFilePage
*
pPage
=
getBufPage
(
pMemBucket
->
pBuffer
,
*
pageId
);
int32_t
*
pageId
=
taosArrayGet
(
list
,
0
);
SFilePage
*
pPage
=
getBufPage
(
pMemBucket
->
pBuffer
,
*
pageId
);
assert
(
pPage
->
num
==
1
);
double
v
=
0
;
...
...
@@ -155,7 +154,7 @@ int32_t tBucketUintHash(tMemBucket *pBucket, const void *value) {
uint64_t
span
=
pBucket
->
range
.
u64MaxVal
-
pBucket
->
range
.
u64MinVal
;
if
(
span
<
pBucket
->
numOfSlots
)
{
int64_t
delta
=
v
-
pBucket
->
range
.
u64MinVal
;
index
=
(
int32_t
)
(
delta
%
pBucket
->
numOfSlots
);
index
=
(
int32_t
)(
delta
%
pBucket
->
numOfSlots
);
}
else
{
double
slotSpan
=
(
double
)
span
/
pBucket
->
numOfSlots
;
index
=
(
int32_t
)((
v
-
pBucket
->
range
.
u64MinVal
)
/
slotSpan
);
...
...
@@ -209,9 +208,9 @@ static __perc_hash_func_t getHashFunc(int32_t type) {
}
}
static
void
resetSlotInfo
(
tMemBucket
*
pBucket
)
{
static
void
resetSlotInfo
(
tMemBucket
*
pBucket
)
{
for
(
int32_t
i
=
0
;
i
<
pBucket
->
numOfSlots
;
++
i
)
{
tMemBucketSlot
*
pSlot
=
&
pBucket
->
pSlots
[
i
];
tMemBucketSlot
*
pSlot
=
&
pBucket
->
pSlots
[
i
];
resetBoundingBox
(
&
pSlot
->
range
,
pBucket
->
type
);
resetPosInfo
(
&
pSlot
->
info
);
...
...
@@ -235,17 +234,17 @@ tMemBucket *tMemBucketCreate(int16_t nElemSize, int16_t dataType, double minval,
pBucket
->
maxCapacity
=
200000
;
pBucket
->
groupPagesMap
=
taosHashInit
(
128
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_INT
),
false
,
HASH_NO_LOCK
);
if
(
setBoundingBox
(
&
pBucket
->
range
,
pBucket
->
type
,
minval
,
maxval
)
!=
0
)
{
// qError("MemBucket:%p, invalid value range: %f-%f", pBucket, minval, maxval);
// qError("MemBucket:%p, invalid value range: %f-%f", pBucket, minval, maxval);
taosMemoryFree
(
pBucket
);
return
NULL
;
}
pBucket
->
elemPerPage
=
(
pBucket
->
bufPageSize
-
sizeof
(
SFilePage
))
/
pBucket
->
bytes
;
pBucket
->
elemPerPage
=
(
pBucket
->
bufPageSize
-
sizeof
(
SFilePage
))
/
pBucket
->
bytes
;
pBucket
->
comparFn
=
getKeyComparFunc
(
pBucket
->
type
,
TSDB_ORDER_ASC
);
pBucket
->
hashFunc
=
getHashFunc
(
pBucket
->
type
);
if
(
pBucket
->
hashFunc
==
NULL
)
{
// qError("MemBucket:%p, not support data type %d, failed", pBucket, pBucket->type);
// qError("MemBucket:%p, not support data type %d, failed", pBucket, pBucket->type);
taosMemoryFree
(
pBucket
);
return
NULL
;
}
...
...
@@ -271,7 +270,7 @@ tMemBucket *tMemBucketCreate(int16_t nElemSize, int16_t dataType, double minval,
return
NULL
;
}
// qDebug("MemBucket:%p, elem size:%d", pBucket, pBucket->bytes);
// qDebug("MemBucket:%p, elem size:%d", pBucket, pBucket->bytes);
return
pBucket
;
}
...
...
@@ -280,9 +279,9 @@ void tMemBucketDestroy(tMemBucket *pBucket) {
return
;
}
void
*
p
=
taosHashIterate
(
pBucket
->
groupPagesMap
,
NULL
);
while
(
p
)
{
SArray
**
p1
=
p
;
void
*
p
=
taosHashIterate
(
pBucket
->
groupPagesMap
,
NULL
);
while
(
p
)
{
SArray
**
p1
=
p
;
p
=
taosHashIterate
(
pBucket
->
groupPagesMap
,
p
);
taosArrayDestroy
(
*
p1
);
}
...
...
@@ -341,7 +340,7 @@ int32_t tMemBucketPut(tMemBucket *pBucket, const void *data, size_t size) {
int32_t
count
=
0
;
int32_t
bytes
=
pBucket
->
bytes
;
for
(
int32_t
i
=
0
;
i
<
size
;
++
i
)
{
char
*
d
=
(
char
*
)
data
+
i
*
bytes
;
char
*
d
=
(
char
*
)
data
+
i
*
bytes
;
int32_t
index
=
(
pBucket
->
hashFunc
)(
pBucket
,
d
);
if
(
index
<
0
)
{
continue
;
...
...
@@ -365,9 +364,9 @@ int32_t tMemBucketPut(tMemBucket *pBucket, const void *data, size_t size) {
pSlot
->
info
.
data
=
NULL
;
}
SArray
*
pPageIdList
=
(
SArray
*
)
taosHashGet
(
pBucket
->
groupPagesMap
,
&
groupId
,
sizeof
(
groupId
));
SArray
*
pPageIdList
=
(
SArray
*
)
taosHashGet
(
pBucket
->
groupPagesMap
,
&
groupId
,
sizeof
(
groupId
));
if
(
pPageIdList
==
NULL
)
{
SArray
*
pList
=
taosArrayInit
(
4
,
sizeof
(
int32_t
));
SArray
*
pList
=
taosArrayInit
(
4
,
sizeof
(
int32_t
));
taosHashPut
(
pBucket
->
groupPagesMap
,
&
groupId
,
sizeof
(
groupId
),
&
pList
,
POINTER_BYTES
);
pPageIdList
=
pList
;
}
...
...
@@ -407,18 +406,18 @@ static MinMaxEntry getMinMaxEntryOfNextSlotWithData(tMemBucket *pMemBucket, int3
static
bool
isIdenticalData
(
tMemBucket
*
pMemBucket
,
int32_t
index
);
static
double
getIdenticalDataVal
(
tMemBucket
*
pMemBucket
,
int32_t
slotIndex
)
{
static
double
getIdenticalDataVal
(
tMemBucket
*
pMemBucket
,
int32_t
slotIndex
)
{
assert
(
isIdenticalData
(
pMemBucket
,
slotIndex
));
tMemBucketSlot
*
pSlot
=
&
pMemBucket
->
pSlots
[
slotIndex
];
double
finalResult
=
0
.
0
;
if
(
IS_SIGNED_NUMERIC_TYPE
(
pMemBucket
->
type
))
{
finalResult
=
(
double
)
pSlot
->
range
.
i64MinVal
;
finalResult
=
(
double
)
pSlot
->
range
.
i64MinVal
;
}
else
if
(
IS_UNSIGNED_NUMERIC_TYPE
(
pMemBucket
->
type
))
{
finalResult
=
(
double
)
pSlot
->
range
.
u64MinVal
;
finalResult
=
(
double
)
pSlot
->
range
.
u64MinVal
;
}
else
{
finalResult
=
(
double
)
pSlot
->
range
.
dMinVal
;
finalResult
=
(
double
)
pSlot
->
range
.
dMinVal
;
}
return
finalResult
;
...
...
@@ -445,14 +444,14 @@ double getPercentileImpl(tMemBucket *pMemBucket, int32_t count, double fraction)
double
maxOfThisSlot
=
0
;
double
minOfNextSlot
=
0
;
if
(
IS_SIGNED_NUMERIC_TYPE
(
pMemBucket
->
type
))
{
maxOfThisSlot
=
(
double
)
pSlot
->
range
.
i64MaxVal
;
minOfNextSlot
=
(
double
)
next
.
i64MinVal
;
maxOfThisSlot
=
(
double
)
pSlot
->
range
.
i64MaxVal
;
minOfNextSlot
=
(
double
)
next
.
i64MinVal
;
}
else
if
(
IS_UNSIGNED_NUMERIC_TYPE
(
pMemBucket
->
type
))
{
maxOfThisSlot
=
(
double
)
pSlot
->
range
.
u64MaxVal
;
minOfNextSlot
=
(
double
)
next
.
u64MinVal
;
maxOfThisSlot
=
(
double
)
pSlot
->
range
.
u64MaxVal
;
minOfNextSlot
=
(
double
)
next
.
u64MinVal
;
}
else
{
maxOfThisSlot
=
(
double
)
pSlot
->
range
.
dMaxVal
;
minOfNextSlot
=
(
double
)
next
.
dMinVal
;
maxOfThisSlot
=
(
double
)
pSlot
->
range
.
dMaxVal
;
minOfNextSlot
=
(
double
)
next
.
dMinVal
;
}
assert
(
minOfNextSlot
>
maxOfThisSlot
);
...
...
@@ -484,7 +483,7 @@ double getPercentileImpl(tMemBucket *pMemBucket, int32_t count, double fraction)
// try next round
pMemBucket
->
times
+=
1
;
// qDebug("MemBucket:%p, start next round data bucketing, time:%d", pMemBucket, pMemBucket->times);
// qDebug("MemBucket:%p, start next round data bucketing, time:%d", pMemBucket, pMemBucket->times);
pMemBucket
->
range
=
pSlot
->
range
;
pMemBucket
->
total
=
0
;
...
...
@@ -527,7 +526,7 @@ double getPercentile(tMemBucket *pMemBucket, double percent) {
// find the min/max value, no need to scan all data in bucket
if
(
fabs
(
percent
-
100
.
0
)
<
DBL_EPSILON
||
(
percent
<
DBL_EPSILON
))
{
MinMaxEntry
*
pRange
=
&
pMemBucket
->
range
;
MinMaxEntry
*
pRange
=
&
pMemBucket
->
range
;
if
(
IS_SIGNED_NUMERIC_TYPE
(
pMemBucket
->
type
))
{
double
v
=
(
double
)(
fabs
(
percent
-
100
)
<
DBL_EPSILON
?
pRange
->
i64MaxVal
:
pRange
->
i64MinVal
);
...
...
@@ -536,7 +535,7 @@ double getPercentile(tMemBucket *pMemBucket, double percent) {
double
v
=
(
double
)(
fabs
(
percent
-
100
)
<
DBL_EPSILON
?
pRange
->
u64MaxVal
:
pRange
->
u64MinVal
);
return
v
;
}
else
{
return
fabs
(
percent
-
100
)
<
DBL_EPSILON
?
pRange
->
dMaxVal
:
pRange
->
dMinVal
;
return
fabs
(
percent
-
100
)
<
DBL_EPSILON
?
pRange
->
dMaxVal
:
pRange
->
dMinVal
;
}
}
...
...
source/libs/function/src/tscript.c
浏览文件 @
c4b3da50
...
...
@@ -13,10 +13,10 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "os.h"
#include "tscript.h"
#include "
ttype
s.h"
#include "
o
s.h"
#include "tstrbuild.h"
#include "ttypes.h"
//#include "queryLog.h"
#include "ttokendef.h"
#if 0
...
...
source/libs/function/src/tudf.c
浏览文件 @
c4b3da50
...
...
@@ -13,16 +13,18 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "uv.h"
#include "os.h"
#include "builtinsimpl.h"
#include "fnLog.h"
#include "
tudf
.h"
#include "
tudfInt
.h"
#include "
functionMgt
.h"
#include "
querynodes
.h"
#include "tarray.h"
#include "tglobal.h"
#include "tdatablock.h"
#include "
querynodes
.h"
#include "
builtinsimpl
.h"
#include "
functionMg
t.h"
#include "
tglobal
.h"
#include "
tudf
.h"
#include "
tudfIn
t.h"
typedef
struct
SUdfdData
{
bool
startCalled
;
...
...
@@ -49,8 +51,8 @@ int32_t udfStopUdfd();
static
int32_t
udfSpawnUdfd
(
SUdfdData
*
pData
);
void
udfUdfdExit
(
uv_process_t
*
process
,
int64_t
exitStatus
,
int
termSignal
);
static
int32_t
udfSpawnUdfd
(
SUdfdData
*
pData
);
static
void
udfUdfdCloseWalkCb
(
uv_handle_t
*
handle
,
void
*
arg
);
static
int32_t
udfSpawnUdfd
(
SUdfdData
*
pData
);
static
void
udfUdfdCloseWalkCb
(
uv_handle_t
*
handle
,
void
*
arg
);
static
void
udfUdfdStopAsyncCb
(
uv_async_t
*
async
);
static
void
udfWatchUdfd
(
void
*
args
);
...
...
@@ -65,27 +67,27 @@ void udfUdfdExit(uv_process_t *process, int64_t exitStatus, int termSignal) {
}
}
static
int32_t
udfSpawnUdfd
(
SUdfdData
*
pData
)
{
static
int32_t
udfSpawnUdfd
(
SUdfdData
*
pData
)
{
fnInfo
(
"start to init udfd"
);
uv_process_options_t
options
=
{
0
};
char
path
[
PATH_MAX
]
=
{
0
};
if
(
tsProcPath
==
NULL
)
{
path
[
0
]
=
'.'
;
#ifdef WINDOWS
#ifdef WINDOWS
GetModuleFileName
(
NULL
,
path
,
PATH_MAX
);
taosDirName
(
path
);
#elif defined(_TD_DARWIN_64)
#elif defined(_TD_DARWIN_64)
uint32_t
pathSize
=
sizeof
(
path
);
_NSGetExecutablePath
(
path
,
&
pathSize
);
taosDirName
(
path
);
#endif
#endif
}
else
{
strncpy
(
path
,
tsProcPath
,
PATH_MAX
);
taosDirName
(
path
);
}
#ifdef WINDOWS
if
(
strlen
(
path
)
==
0
)
{
if
(
strlen
(
path
)
==
0
)
{
strcat
(
path
,
"udfd.exe"
);
}
else
{
strcat
(
path
,
"
\\
udfd.exe"
);
...
...
@@ -93,7 +95,7 @@ static int32_t udfSpawnUdfd(SUdfdData* pData) {
#else
strcat
(
path
,
"/udfd"
);
#endif
char
*
argsUdfd
[]
=
{
path
,
"-c"
,
configDir
,
NULL
};
char
*
argsUdfd
[]
=
{
path
,
"-c"
,
configDir
,
NULL
};
options
.
args
=
argsUdfd
;
options
.
file
=
path
;
...
...
@@ -103,7 +105,7 @@ static int32_t udfSpawnUdfd(SUdfdData* pData) {
uv_stdio_container_t
child_stdio
[
3
];
child_stdio
[
0
].
flags
=
UV_CREATE_PIPE
|
UV_READABLE_PIPE
;
child_stdio
[
0
].
data
.
stream
=
(
uv_stream_t
*
)
&
pData
->
ctrlPipe
;
child_stdio
[
0
].
data
.
stream
=
(
uv_stream_t
*
)
&
pData
->
ctrlPipe
;
child_stdio
[
1
].
flags
=
UV_IGNORE
;
child_stdio
[
2
].
flags
=
UV_INHERIT_FD
;
child_stdio
[
2
].
data
.
fd
=
2
;
...
...
@@ -117,12 +119,12 @@ static int32_t udfSpawnUdfd(SUdfdData* pData) {
snprintf
(
dnodeIdEnvItem
,
32
,
"%s=%d"
,
"DNODE_ID"
,
pData
->
dnodeId
);
float
numCpuCores
=
4
;
taosGetCpuCores
(
&
numCpuCores
);
snprintf
(
thrdPoolSizeEnvItem
,
32
,
"%s=%d"
,
"UV_THREADPOOL_SIZE"
,
(
int
)
numCpuCores
*
2
);
char
*
envUdfd
[]
=
{
dnodeIdEnvItem
,
thrdPoolSizeEnvItem
,
NULL
};
snprintf
(
thrdPoolSizeEnvItem
,
32
,
"%s=%d"
,
"UV_THREADPOOL_SIZE"
,
(
int
)
numCpuCores
*
2
);
char
*
envUdfd
[]
=
{
dnodeIdEnvItem
,
thrdPoolSizeEnvItem
,
NULL
};
options
.
env
=
envUdfd
;
int
err
=
uv_spawn
(
&
pData
->
loop
,
&
pData
->
process
,
&
options
);
pData
->
process
.
data
=
(
void
*
)
pData
;
pData
->
process
.
data
=
(
void
*
)
pData
;
#ifdef WINDOWS
// End udfd.exe by Job.
...
...
@@ -135,7 +137,8 @@ static int32_t udfSpawnUdfd(SUdfdData* pData) {
JOBOBJECT_EXTENDED_LIMIT_INFORMATION
limit_info
;
memset
(
&
limit_info
,
0x0
,
sizeof
(
limit_info
));
limit_info
.
BasicLimitInformation
.
LimitFlags
=
JOB_OBJECT_LIMIT_KILL_ON_JOB_CLOSE
;
bool
set_auto_kill_ok
=
SetInformationJobObject
(
pData
->
jobHandle
,
JobObjectExtendedLimitInformation
,
&
limit_info
,
sizeof
(
limit_info
));
bool
set_auto_kill_ok
=
SetInformationJobObject
(
pData
->
jobHandle
,
JobObjectExtendedLimitInformation
,
&
limit_info
,
sizeof
(
limit_info
));
if
(
!
set_auto_kill_ok
)
{
fnError
(
"Set job auto kill udfd failed."
);
}
...
...
@@ -150,7 +153,7 @@ static int32_t udfSpawnUdfd(SUdfdData* pData) {
return
err
;
}
static
void
udfUdfdCloseWalkCb
(
uv_handle_t
*
handle
,
void
*
arg
)
{
static
void
udfUdfdCloseWalkCb
(
uv_handle_t
*
handle
,
void
*
arg
)
{
if
(
!
uv_is_closing
(
handle
))
{
uv_close
(
handle
,
NULL
);
}
...
...
@@ -180,8 +183,7 @@ static void udfWatchUdfd(void *args) {
int32_t
udfStartUdfd
(
int32_t
startDnodeId
)
{
if
(
!
tsStartUdfd
)
{
fnInfo
(
"start udfd is disabled."
)
return
0
;
fnInfo
(
"start udfd is disabled."
)
return
0
;
}
SUdfdData
*
pData
=
&
udfdGlobal
;
if
(
pData
->
startCalled
)
{
...
...
@@ -212,8 +214,7 @@ int32_t udfStartUdfd(int32_t startDnodeId) {
int32_t
udfStopUdfd
()
{
SUdfdData
*
pData
=
&
udfdGlobal
;
fnInfo
(
"udfd start to stop, need cleanup:%d, spawn err:%d"
,
pData
->
needCleanUp
,
pData
->
spawnErr
);
fnInfo
(
"udfd start to stop, need cleanup:%d, spawn err:%d"
,
pData
->
needCleanUp
,
pData
->
spawnErr
);
if
(
!
pData
->
needCleanUp
||
atomic_load_32
(
&
pData
->
stopCalled
))
{
return
0
;
}
...
...
@@ -237,33 +238,28 @@ int32_t udfStopUdfd() {
typedef
void
*
QUEUE
[
2
];
/* Private macros. */
#define QUEUE_NEXT(q)
(*(QUEUE **)
&((*(q))[0]))
#define QUEUE_PREV(q)
(*(QUEUE **)
&((*(q))[1]))
#define QUEUE_NEXT(q)
(*(QUEUE **)
&((*(q))[0]))
#define QUEUE_PREV(q)
(*(QUEUE **)
&((*(q))[1]))
#define QUEUE_PREV_NEXT(q) (QUEUE_NEXT(QUEUE_PREV(q)))
#define QUEUE_NEXT_PREV(q) (QUEUE_PREV(QUEUE_NEXT(q)))
/* Public macros. */
#define QUEUE_DATA(ptr, type, field) \
((type *) ((char *) (ptr) - offsetof(type, field)))
#define QUEUE_DATA(ptr, type, field) ((type *)((char *)(ptr)-offsetof(type, field)))
/* Important note: mutating the list while QUEUE_FOREACH is
* iterating over its elements results in undefined behavior.
*/
#define QUEUE_FOREACH(q, h) \
for ((q) = QUEUE_NEXT(h); (q) != (h); (q) = QUEUE_NEXT(q))
#define QUEUE_FOREACH(q, h) for ((q) = QUEUE_NEXT(h); (q) != (h); (q) = QUEUE_NEXT(q))
#define QUEUE_EMPTY(q) \
((const QUEUE *) (q) == (const QUEUE *) QUEUE_NEXT(q))
#define QUEUE_EMPTY(q) ((const QUEUE *)(q) == (const QUEUE *)QUEUE_NEXT(q))
#define QUEUE_HEAD(q) \
(QUEUE_NEXT(q))
#define QUEUE_HEAD(q) (QUEUE_NEXT(q))
#define QUEUE_INIT(q) \
do { \
QUEUE_NEXT(q) = (q); \
QUEUE_PREV(q) = (q); \
} \
while (0)
} while (0)
#define QUEUE_ADD(h, n) \
do { \
...
...
@@ -271,8 +267,7 @@ typedef void *QUEUE[2];
QUEUE_NEXT_PREV(n) = QUEUE_PREV(h); \
QUEUE_PREV(h) = QUEUE_PREV(n); \
QUEUE_PREV_NEXT(h) = (h); \
} \
while (0)
} while (0)
#define QUEUE_SPLIT(h, q, n) \
do { \
...
...
@@ -282,19 +277,17 @@ typedef void *QUEUE[2];
QUEUE_PREV(h) = QUEUE_PREV(q); \
QUEUE_PREV_NEXT(h) = (h); \
QUEUE_PREV(q) = (n); \
} \
while (0)
} while (0)
#define QUEUE_MOVE(h, n) \
do { \
if (QUEUE_EMPTY(h)) \
QUEUE_INIT(n); \
else { \
QUEUE
* q = QUEUE_HEAD(h);
\
QUEUE
*q = QUEUE_HEAD(h);
\
QUEUE_SPLIT(h, q, n); \
} \
} \
while (0)
} while (0)
#define QUEUE_INSERT_HEAD(h, q) \
do { \
...
...
@@ -302,8 +295,7 @@ typedef void *QUEUE[2];
QUEUE_PREV(q) = (h); \
QUEUE_NEXT_PREV(q) = (q); \
QUEUE_NEXT(h) = (q); \
} \
while (0)
} while (0)
#define QUEUE_INSERT_TAIL(h, q) \
do { \
...
...
@@ -311,22 +303,15 @@ typedef void *QUEUE[2];
QUEUE_PREV(q) = QUEUE_PREV(h); \
QUEUE_PREV_NEXT(q) = (q); \
QUEUE_PREV(h) = (q); \
} \
while (0)
} while (0)
#define QUEUE_REMOVE(q) \
do { \
QUEUE_PREV_NEXT(q) = QUEUE_NEXT(q); \
QUEUE_NEXT_PREV(q) = QUEUE_PREV(q); \
} \
while (0)
} while (0)
enum
{
UV_TASK_CONNECT
=
0
,
UV_TASK_REQ_RSP
=
1
,
UV_TASK_DISCONNECT
=
2
};
enum
{
UV_TASK_CONNECT
=
0
,
UV_TASK_REQ_RSP
=
1
,
UV_TASK_DISCONNECT
=
2
};
int64_t
gUdfTaskSeqNum
=
0
;
typedef
struct
SUdfcFuncStub
{
...
...
@@ -352,7 +337,7 @@ typedef struct SUdfcProxy {
QUEUE
uvProcTaskQueue
;
uv_mutex_t
udfStubsMutex
;
SArray
*
udfStubs
;
// SUdfcFuncStub
SArray
*
udfStubs
;
// SUdfcFuncStub
int8_t
initialized
;
}
SUdfcProxy
;
...
...
@@ -434,27 +419,27 @@ enum {
UDFC_STATE_STOPPING
,
// stopping after udfcClose
};
int32_t
getUdfdPipeName
(
char
*
pipeName
,
int32_t
size
);
int32_t
getUdfdPipeName
(
char
*
pipeName
,
int32_t
size
);
int32_t
encodeUdfSetupRequest
(
void
**
buf
,
const
SUdfSetupRequest
*
setup
);
void
*
decodeUdfSetupRequest
(
const
void
*
buf
,
SUdfSetupRequest
*
request
);
int32_t
encodeUdfInterBuf
(
void
**
buf
,
const
SUdfInterBuf
*
state
);
void
*
decodeUdfInterBuf
(
const
void
*
buf
,
SUdfInterBuf
*
state
);
void
*
decodeUdfSetupRequest
(
const
void
*
buf
,
SUdfSetupRequest
*
request
);
int32_t
encodeUdfInterBuf
(
void
**
buf
,
const
SUdfInterBuf
*
state
);
void
*
decodeUdfInterBuf
(
const
void
*
buf
,
SUdfInterBuf
*
state
);
int32_t
encodeUdfCallRequest
(
void
**
buf
,
const
SUdfCallRequest
*
call
);
void
*
decodeUdfCallRequest
(
const
void
*
buf
,
SUdfCallRequest
*
call
);
void
*
decodeUdfCallRequest
(
const
void
*
buf
,
SUdfCallRequest
*
call
);
int32_t
encodeUdfTeardownRequest
(
void
**
buf
,
const
SUdfTeardownRequest
*
teardown
);
void
*
decodeUdfTeardownRequest
(
const
void
*
buf
,
SUdfTeardownRequest
*
teardown
);
int32_t
encodeUdfRequest
(
void
**
buf
,
const
SUdfRequest
*
request
);
void
*
decodeUdfRequest
(
const
void
*
buf
,
SUdfRequest
*
request
);
void
*
decodeUdfTeardownRequest
(
const
void
*
buf
,
SUdfTeardownRequest
*
teardown
);
int32_t
encodeUdfRequest
(
void
**
buf
,
const
SUdfRequest
*
request
);
void
*
decodeUdfRequest
(
const
void
*
buf
,
SUdfRequest
*
request
);
int32_t
encodeUdfSetupResponse
(
void
**
buf
,
const
SUdfSetupResponse
*
setupRsp
);
void
*
decodeUdfSetupResponse
(
const
void
*
buf
,
SUdfSetupResponse
*
setupRsp
);
void
*
decodeUdfSetupResponse
(
const
void
*
buf
,
SUdfSetupResponse
*
setupRsp
);
int32_t
encodeUdfCallResponse
(
void
**
buf
,
const
SUdfCallResponse
*
callRsp
);
void
*
decodeUdfCallResponse
(
const
void
*
buf
,
SUdfCallResponse
*
callRsp
);
int32_t
encodeUdfTeardownResponse
(
void
**
buf
,
const
SUdfTeardownResponse
*
teardownRsp
);
void
*
decodeUdfTeardownResponse
(
const
void
*
buf
,
SUdfTeardownResponse
*
teardownResponse
);
int32_t
encodeUdfResponse
(
void
**
buf
,
const
SUdfResponse
*
rsp
);
void
*
decodeUdfResponse
(
const
void
*
buf
,
SUdfResponse
*
rsp
);
void
*
decodeUdfCallResponse
(
const
void
*
buf
,
SUdfCallResponse
*
callRsp
);
int32_t
encodeUdfTeardownResponse
(
void
**
buf
,
const
SUdfTeardownResponse
*
teardownRsp
);
void
*
decodeUdfTeardownResponse
(
const
void
*
buf
,
SUdfTeardownResponse
*
teardownResponse
);
int32_t
encodeUdfResponse
(
void
**
buf
,
const
SUdfResponse
*
rsp
);
void
*
decodeUdfResponse
(
const
void
*
buf
,
SUdfResponse
*
rsp
);
void
freeUdfColumnData
(
SUdfColumnData
*
data
,
SUdfColumnMeta
*
meta
);
void
freeUdfColumn
(
SUdfColumn
*
col
);
void
freeUdfColumn
(
SUdfColumn
*
col
);
void
freeUdfDataDataBlock
(
SUdfDataBlock
*
block
);
void
freeUdfInterBuf
(
SUdfInterBuf
*
buf
);
int32_t
convertDataBlockToUdfDataBlock
(
SSDataBlock
*
block
,
SUdfDataBlock
*
udfBlock
);
...
...
@@ -462,7 +447,7 @@ int32_t convertUdfColumnToDataBlock(SUdfColumn *udfCol, SSDataBlock *block);
int32_t
convertScalarParamToDataBlock
(
SScalarParam
*
input
,
int32_t
numOfCols
,
SSDataBlock
*
output
);
int32_t
convertDataBlockToScalarParm
(
SSDataBlock
*
input
,
SScalarParam
*
output
);
int32_t
getUdfdPipeName
(
char
*
pipeName
,
int32_t
size
)
{
int32_t
getUdfdPipeName
(
char
*
pipeName
,
int32_t
size
)
{
char
dnodeId
[
8
]
=
{
0
};
size_t
dnodeIdSize
=
sizeof
(
dnodeId
);
int32_t
err
=
uv_os_getenv
(
UDF_DNODE_ID_ENV_NAME
,
dnodeId
,
&
dnodeIdSize
);
...
...
@@ -471,7 +456,8 @@ int32_t getUdfdPipeName(char* pipeName, int32_t size) {
dnodeId
[
0
]
=
'1'
;
}
#ifdef _WIN32
snprintf
(
pipeName
,
size
,
"%s.%x.%s"
,
UDF_LISTEN_PIPE_NAME_PREFIX
,
MurmurHash3_32
(
tsDataDir
,
strlen
(
tsDataDir
)),
dnodeId
);
snprintf
(
pipeName
,
size
,
"%s.%x.%s"
,
UDF_LISTEN_PIPE_NAME_PREFIX
,
MurmurHash3_32
(
tsDataDir
,
strlen
(
tsDataDir
)),
dnodeId
);
#else
snprintf
(
pipeName
,
size
,
"%s/%s%s"
,
tsDataDir
,
UDF_LISTEN_PIPE_NAME_PREFIX
,
dnodeId
);
#endif
...
...
@@ -485,12 +471,12 @@ int32_t encodeUdfSetupRequest(void **buf, const SUdfSetupRequest *setup) {
return
len
;
}
void
*
decodeUdfSetupRequest
(
const
void
*
buf
,
SUdfSetupRequest
*
request
)
{
void
*
decodeUdfSetupRequest
(
const
void
*
buf
,
SUdfSetupRequest
*
request
)
{
buf
=
taosDecodeBinaryTo
(
buf
,
request
->
udfName
,
TSDB_FUNC_NAME_LEN
);
return
(
void
*
)
buf
;
return
(
void
*
)
buf
;
}
int32_t
encodeUdfInterBuf
(
void
**
buf
,
const
SUdfInterBuf
*
state
)
{
int32_t
encodeUdfInterBuf
(
void
**
buf
,
const
SUdfInterBuf
*
state
)
{
int32_t
len
=
0
;
len
+=
taosEncodeFixedI8
(
buf
,
state
->
numOfResult
);
len
+=
taosEncodeFixedI32
(
buf
,
state
->
bufLen
);
...
...
@@ -498,11 +484,11 @@ int32_t encodeUdfInterBuf(void **buf, const SUdfInterBuf* state) {
return
len
;
}
void
*
decodeUdfInterBuf
(
const
void
*
buf
,
SUdfInterBuf
*
state
)
{
void
*
decodeUdfInterBuf
(
const
void
*
buf
,
SUdfInterBuf
*
state
)
{
buf
=
taosDecodeFixedI8
(
buf
,
&
state
->
numOfResult
);
buf
=
taosDecodeFixedI32
(
buf
,
&
state
->
bufLen
);
buf
=
taosDecodeBinary
(
buf
,
(
void
**
)
&
state
->
buf
,
state
->
bufLen
);
return
(
void
*
)
buf
;
buf
=
taosDecodeBinary
(
buf
,
(
void
**
)
&
state
->
buf
,
state
->
bufLen
);
return
(
void
*
)
buf
;
}
int32_t
encodeUdfCallRequest
(
void
**
buf
,
const
SUdfCallRequest
*
call
)
{
...
...
@@ -525,7 +511,7 @@ int32_t encodeUdfCallRequest(void **buf, const SUdfCallRequest *call) {
return
len
;
}
void
*
decodeUdfCallRequest
(
const
void
*
buf
,
SUdfCallRequest
*
call
)
{
void
*
decodeUdfCallRequest
(
const
void
*
buf
,
SUdfCallRequest
*
call
)
{
buf
=
taosDecodeFixedI64
(
buf
,
&
call
->
udfHandle
);
buf
=
taosDecodeFixedI8
(
buf
,
&
call
->
callType
);
switch
(
call
->
callType
)
{
...
...
@@ -547,7 +533,7 @@ void* decodeUdfCallRequest(const void* buf, SUdfCallRequest* call) {
buf
=
decodeUdfInterBuf
(
buf
,
&
call
->
interBuf
);
break
;
}
return
(
void
*
)
buf
;
return
(
void
*
)
buf
;
}
int32_t
encodeUdfTeardownRequest
(
void
**
buf
,
const
SUdfTeardownRequest
*
teardown
)
{
...
...
@@ -556,17 +542,17 @@ int32_t encodeUdfTeardownRequest(void **buf, const SUdfTeardownRequest *teardown
return
len
;
}
void
*
decodeUdfTeardownRequest
(
const
void
*
buf
,
SUdfTeardownRequest
*
teardown
)
{
void
*
decodeUdfTeardownRequest
(
const
void
*
buf
,
SUdfTeardownRequest
*
teardown
)
{
buf
=
taosDecodeFixedI64
(
buf
,
&
teardown
->
udfHandle
);
return
(
void
*
)
buf
;
return
(
void
*
)
buf
;
}
int32_t
encodeUdfRequest
(
void
**
buf
,
const
SUdfRequest
*
request
)
{
int32_t
encodeUdfRequest
(
void
**
buf
,
const
SUdfRequest
*
request
)
{
int32_t
len
=
0
;
if
(
buf
==
NULL
)
{
len
+=
sizeof
(
request
->
msgLen
);
}
else
{
*
(
int32_t
*
)(
*
buf
)
=
request
->
msgLen
;
*
(
int32_t
*
)(
*
buf
)
=
request
->
msgLen
;
*
buf
=
POINTER_SHIFT
(
*
buf
,
sizeof
(
request
->
msgLen
));
}
len
+=
taosEncodeFixedI64
(
buf
,
request
->
seqNum
);
...
...
@@ -581,8 +567,8 @@ int32_t encodeUdfRequest(void** buf, const SUdfRequest* request) {
return
len
;
}
void
*
decodeUdfRequest
(
const
void
*
buf
,
SUdfRequest
*
request
)
{
request
->
msgLen
=
*
(
int32_t
*
)(
buf
);
void
*
decodeUdfRequest
(
const
void
*
buf
,
SUdfRequest
*
request
)
{
request
->
msgLen
=
*
(
int32_t
*
)(
buf
);
buf
=
POINTER_SHIFT
(
buf
,
sizeof
(
request
->
msgLen
));
buf
=
taosDecodeFixedI64
(
buf
,
&
request
->
seqNum
);
...
...
@@ -595,7 +581,7 @@ void* decodeUdfRequest(const void* buf, SUdfRequest* request) {
}
else
if
(
request
->
type
==
UDF_TASK_TEARDOWN
)
{
buf
=
decodeUdfTeardownRequest
(
buf
,
&
request
->
teardown
);
}
return
(
void
*
)
buf
;
return
(
void
*
)
buf
;
}
int32_t
encodeUdfSetupResponse
(
void
**
buf
,
const
SUdfSetupResponse
*
setupRsp
)
{
...
...
@@ -607,12 +593,12 @@ int32_t encodeUdfSetupResponse(void **buf, const SUdfSetupResponse *setupRsp) {
return
len
;
}
void
*
decodeUdfSetupResponse
(
const
void
*
buf
,
SUdfSetupResponse
*
setupRsp
)
{
void
*
decodeUdfSetupResponse
(
const
void
*
buf
,
SUdfSetupResponse
*
setupRsp
)
{
buf
=
taosDecodeFixedI64
(
buf
,
&
setupRsp
->
udfHandle
);
buf
=
taosDecodeFixedI8
(
buf
,
&
setupRsp
->
outputType
);
buf
=
taosDecodeFixedI32
(
buf
,
&
setupRsp
->
outputLen
);
buf
=
taosDecodeFixedI32
(
buf
,
&
setupRsp
->
bufSize
);
return
(
void
*
)
buf
;
return
(
void
*
)
buf
;
}
int32_t
encodeUdfCallResponse
(
void
**
buf
,
const
SUdfCallResponse
*
callRsp
)
{
...
...
@@ -638,7 +624,7 @@ int32_t encodeUdfCallResponse(void **buf, const SUdfCallResponse *callRsp) {
return
len
;
}
void
*
decodeUdfCallResponse
(
const
void
*
buf
,
SUdfCallResponse
*
callRsp
)
{
void
*
decodeUdfCallResponse
(
const
void
*
buf
,
SUdfCallResponse
*
callRsp
)
{
buf
=
taosDecodeFixedI8
(
buf
,
&
callRsp
->
callType
);
switch
(
callRsp
->
callType
)
{
case
TSDB_UDF_CALL_SCALA_PROC
:
...
...
@@ -657,30 +643,26 @@ void* decodeUdfCallResponse(const void* buf, SUdfCallResponse* callRsp) {
buf
=
decodeUdfInterBuf
(
buf
,
&
callRsp
->
resultBuf
);
break
;
}
return
(
void
*
)
buf
;
return
(
void
*
)
buf
;
}
int32_t
encodeUdfTeardownResponse
(
void
**
buf
,
const
SUdfTeardownResponse
*
teardownRsp
)
{
return
0
;
}
int32_t
encodeUdfTeardownResponse
(
void
**
buf
,
const
SUdfTeardownResponse
*
teardownRsp
)
{
return
0
;
}
void
*
decodeUdfTeardownResponse
(
const
void
*
buf
,
SUdfTeardownResponse
*
teardownResponse
)
{
return
(
void
*
)
buf
;
}
void
*
decodeUdfTeardownResponse
(
const
void
*
buf
,
SUdfTeardownResponse
*
teardownResponse
)
{
return
(
void
*
)
buf
;
}
int32_t
encodeUdfResponse
(
void
**
buf
,
const
SUdfResponse
*
rsp
)
{
int32_t
encodeUdfResponse
(
void
**
buf
,
const
SUdfResponse
*
rsp
)
{
int32_t
len
=
0
;
if
(
buf
==
NULL
)
{
len
+=
sizeof
(
rsp
->
msgLen
);
}
else
{
*
(
int32_t
*
)(
*
buf
)
=
rsp
->
msgLen
;
*
(
int32_t
*
)(
*
buf
)
=
rsp
->
msgLen
;
*
buf
=
POINTER_SHIFT
(
*
buf
,
sizeof
(
rsp
->
msgLen
));
}
if
(
buf
==
NULL
)
{
len
+=
sizeof
(
rsp
->
seqNum
);
}
else
{
*
(
int64_t
*
)(
*
buf
)
=
rsp
->
seqNum
;
*
(
int64_t
*
)(
*
buf
)
=
rsp
->
seqNum
;
*
buf
=
POINTER_SHIFT
(
*
buf
,
sizeof
(
rsp
->
seqNum
));
}
...
...
@@ -705,10 +687,10 @@ int32_t encodeUdfResponse(void** buf, const SUdfResponse* rsp) {
return
len
;
}
void
*
decodeUdfResponse
(
const
void
*
buf
,
SUdfResponse
*
rsp
)
{
rsp
->
msgLen
=
*
(
int32_t
*
)(
buf
);
void
*
decodeUdfResponse
(
const
void
*
buf
,
SUdfResponse
*
rsp
)
{
rsp
->
msgLen
=
*
(
int32_t
*
)(
buf
);
buf
=
POINTER_SHIFT
(
buf
,
sizeof
(
rsp
->
msgLen
));
rsp
->
seqNum
=
*
(
int64_t
*
)(
buf
);
rsp
->
seqNum
=
*
(
int64_t
*
)(
buf
);
buf
=
POINTER_SHIFT
(
buf
,
sizeof
(
rsp
->
seqNum
));
buf
=
taosDecodeFixedI64
(
buf
,
&
rsp
->
seqNum
);
buf
=
taosDecodeFixedI8
(
buf
,
&
rsp
->
type
);
...
...
@@ -728,7 +710,7 @@ void* decodeUdfResponse(const void* buf, SUdfResponse* rsp) {
fnError
(
"decode udf response, invalid udf response type %d"
,
rsp
->
type
);
break
;
}
return
(
void
*
)
buf
;
return
(
void
*
)
buf
;
}
void
freeUdfColumnData
(
SUdfColumnData
*
data
,
SUdfColumnMeta
*
meta
)
{
...
...
@@ -745,9 +727,7 @@ void freeUdfColumnData(SUdfColumnData *data, SUdfColumnMeta *meta) {
}
}
void
freeUdfColumn
(
SUdfColumn
*
col
)
{
freeUdfColumnData
(
&
col
->
colData
,
&
col
->
colMeta
);
}
void
freeUdfColumn
(
SUdfColumn
*
col
)
{
freeUdfColumnData
(
&
col
->
colData
,
&
col
->
colMeta
);
}
void
freeUdfDataDataBlock
(
SUdfDataBlock
*
block
)
{
for
(
int32_t
i
=
0
;
i
<
block
->
numOfCols
;
++
i
)
{
...
...
@@ -764,14 +744,13 @@ void freeUdfInterBuf(SUdfInterBuf *buf) {
buf
->
buf
=
NULL
;
}
int32_t
convertDataBlockToUdfDataBlock
(
SSDataBlock
*
block
,
SUdfDataBlock
*
udfBlock
)
{
udfBlock
->
numOfRows
=
block
->
info
.
rows
;
udfBlock
->
numOfCols
=
taosArrayGetSize
(
block
->
pDataBlock
);
udfBlock
->
udfCols
=
taosMemoryCalloc
(
taosArrayGetSize
(
block
->
pDataBlock
),
sizeof
(
SUdfColumn
*
));
udfBlock
->
udfCols
=
taosMemoryCalloc
(
taosArrayGetSize
(
block
->
pDataBlock
),
sizeof
(
SUdfColumn
*
));
for
(
int32_t
i
=
0
;
i
<
udfBlock
->
numOfCols
;
++
i
)
{
udfBlock
->
udfCols
[
i
]
=
taosMemoryCalloc
(
1
,
sizeof
(
SUdfColumn
));
SColumnInfoData
*
col
=
(
SColumnInfoData
*
)
taosArrayGet
(
block
->
pDataBlock
,
i
);
SColumnInfoData
*
col
=
(
SColumnInfoData
*
)
taosArrayGet
(
block
->
pDataBlock
,
i
);
SUdfColumn
*
udfCol
=
udfBlock
->
udfCols
[
i
];
udfCol
->
colMeta
.
type
=
col
->
info
.
type
;
udfCol
->
colMeta
.
bytes
=
col
->
info
.
bytes
;
...
...
@@ -790,12 +769,12 @@ int32_t convertDataBlockToUdfDataBlock(SSDataBlock *block, SUdfDataBlock *udfBlo
udfCol
->
colData
.
fixLenCol
.
nullBitmapLen
=
BitmapLen
(
udfCol
->
colData
.
numOfRows
);
int32_t
bitmapLen
=
udfCol
->
colData
.
fixLenCol
.
nullBitmapLen
;
udfCol
->
colData
.
fixLenCol
.
nullBitmap
=
taosMemoryMalloc
(
udfCol
->
colData
.
fixLenCol
.
nullBitmapLen
);
char
*
bitmap
=
udfCol
->
colData
.
fixLenCol
.
nullBitmap
;
char
*
bitmap
=
udfCol
->
colData
.
fixLenCol
.
nullBitmap
;
memcpy
(
bitmap
,
col
->
nullbitmap
,
bitmapLen
);
udfCol
->
colData
.
fixLenCol
.
dataLen
=
colDataGetLength
(
col
,
udfBlock
->
numOfRows
);
int32_t
dataLen
=
udfCol
->
colData
.
fixLenCol
.
dataLen
;
udfCol
->
colData
.
fixLenCol
.
data
=
taosMemoryMalloc
(
udfCol
->
colData
.
fixLenCol
.
dataLen
);
char
*
data
=
udfCol
->
colData
.
fixLenCol
.
data
;
char
*
data
=
udfCol
->
colData
.
fixLenCol
.
data
;
memcpy
(
data
,
col
->
pData
,
dataLen
);
}
}
...
...
@@ -837,7 +816,7 @@ int32_t convertScalarParamToDataBlock(SScalarParam *input, int32_t numOfCols, SS
for
(
int32_t
i
=
0
;
i
<
numOfCols
;
++
i
)
{
taosArrayPush
(
output
->
pDataBlock
,
(
input
+
i
)
->
columnData
);
if
(
IS_VAR_DATA_TYPE
((
input
+
i
)
->
columnData
->
info
.
type
))
{
if
(
IS_VAR_DATA_TYPE
((
input
+
i
)
->
columnData
->
info
.
type
))
{
output
->
info
.
hasVarCol
=
true
;
}
}
...
...
@@ -852,21 +831,19 @@ int32_t convertDataBlockToScalarParm(SSDataBlock *input, SScalarParam *output) {
output
->
numOfRows
=
input
->
info
.
rows
;
output
->
columnData
=
taosMemoryMalloc
(
sizeof
(
SColumnInfoData
));
memcpy
(
output
->
columnData
,
taosArrayGet
(
input
->
pDataBlock
,
0
),
sizeof
(
SColumnInfoData
));
memcpy
(
output
->
columnData
,
taosArrayGet
(
input
->
pDataBlock
,
0
),
sizeof
(
SColumnInfoData
));
output
->
colAlloced
=
true
;
return
0
;
}
//////////////////////////////////////////////////////////////////////////////////////////////////////////////
//memory layout |---SUdfAggRes----|-----final result-----|---inter result----|
//
memory layout |---SUdfAggRes----|-----final result-----|---inter result----|
typedef
struct
SUdfAggRes
{
int8_t
finalResNum
;
int8_t
interResNum
;
char
*
finalResBuf
;
char
*
interResBuf
;
char
*
finalResBuf
;
char
*
interResBuf
;
}
SUdfAggRes
;
void
onUdfcPipeClose
(
uv_handle_t
*
handle
);
int32_t
udfcGetUdfTaskResultFromUvTask
(
SClientUdfTask
*
task
,
SClientUvTaskNode
*
uvTask
);
...
...
@@ -886,37 +863,38 @@ void udfStopAsyncCb(uv_async_t *async);
void
constructUdfService
(
void
*
argsThread
);
int32_t
udfcRunUdfUvTask
(
SClientUdfTask
*
task
,
int8_t
uvTaskType
);
int32_t
doSetupUdf
(
char
udfName
[],
UdfcFuncHandle
*
funcHandle
);
int
compareUdfcFuncSub
(
const
void
*
elem1
,
const
void
*
elem2
);
int
compareUdfcFuncSub
(
const
void
*
elem1
,
const
void
*
elem2
);
int32_t
doTeardownUdf
(
UdfcFuncHandle
handle
);
int32_t
callUdf
(
UdfcFuncHandle
handle
,
int8_t
callType
,
SSDataBlock
*
input
,
SUdfInterBuf
*
state
,
SUdfInterBuf
*
state2
,
SSDataBlock
*
output
,
SUdfInterBuf
*
newState
);
SSDataBlock
*
output
,
SUdfInterBuf
*
newState
);
int32_t
doCallUdfAggInit
(
UdfcFuncHandle
handle
,
SUdfInterBuf
*
interBuf
);
int32_t
doCallUdfAggProcess
(
UdfcFuncHandle
handle
,
SSDataBlock
*
block
,
SUdfInterBuf
*
state
,
SUdfInterBuf
*
newState
);
int32_t
doCallUdfAggMerge
(
UdfcFuncHandle
handle
,
SUdfInterBuf
*
interBuf1
,
SUdfInterBuf
*
interBuf2
,
SUdfInterBuf
*
resultBuf
);
int32_t
doCallUdfAggMerge
(
UdfcFuncHandle
handle
,
SUdfInterBuf
*
interBuf1
,
SUdfInterBuf
*
interBuf2
,
SUdfInterBuf
*
resultBuf
);
int32_t
doCallUdfAggFinalize
(
UdfcFuncHandle
handle
,
SUdfInterBuf
*
interBuf
,
SUdfInterBuf
*
resultData
);
int32_t
doCallUdfScalarFunc
(
UdfcFuncHandle
handle
,
SScalarParam
*
input
,
int32_t
numOfCols
,
SScalarParam
*
output
);
int32_t
doCallUdfScalarFunc
(
UdfcFuncHandle
handle
,
SScalarParam
*
input
,
int32_t
numOfCols
,
SScalarParam
*
output
);
int32_t
callUdfScalarFunc
(
char
*
udfName
,
SScalarParam
*
input
,
int32_t
numOfCols
,
SScalarParam
*
output
);
int32_t
udfcOpen
();
int32_t
udfcClose
();
int32_t
acquireUdfFuncHandle
(
char
*
udfName
,
UdfcFuncHandle
*
pHandle
);
void
releaseUdfFuncHandle
(
char
*
udfName
);
int32_t
acquireUdfFuncHandle
(
char
*
udfName
,
UdfcFuncHandle
*
pHandle
);
void
releaseUdfFuncHandle
(
char
*
udfName
);
int32_t
cleanUpUdfs
();
bool
udfAggGetEnv
(
struct
SFunctionNode
*
pFunc
,
SFuncExecEnv
*
pEnv
);
bool
udfAggInit
(
struct
SqlFunctionCtx
*
pCtx
,
struct
SResultRowEntryInfo
*
pResultCellInfo
);
bool
udfAggGetEnv
(
struct
SFunctionNode
*
pFunc
,
SFuncExecEnv
*
pEnv
);
bool
udfAggInit
(
struct
SqlFunctionCtx
*
pCtx
,
struct
SResultRowEntryInfo
*
pResultCellInfo
);
int32_t
udfAggProcess
(
struct
SqlFunctionCtx
*
pCtx
);
int32_t
udfAggFinalize
(
struct
SqlFunctionCtx
*
pCtx
,
SSDataBlock
*
pBlock
);
int32_t
udfAggFinalize
(
struct
SqlFunctionCtx
*
pCtx
,
SSDataBlock
*
pBlock
);
int
compareUdfcFuncSub
(
const
void
*
elem1
,
const
void
*
elem2
)
{
int
compareUdfcFuncSub
(
const
void
*
elem1
,
const
void
*
elem2
)
{
SUdfcFuncStub
*
stub1
=
(
SUdfcFuncStub
*
)
elem1
;
SUdfcFuncStub
*
stub2
=
(
SUdfcFuncStub
*
)
elem2
;
return
strcmp
(
stub1
->
udfName
,
stub2
->
udfName
);
}
int32_t
acquireUdfFuncHandle
(
char
*
udfName
,
UdfcFuncHandle
*
pHandle
)
{
int32_t
acquireUdfFuncHandle
(
char
*
udfName
,
UdfcFuncHandle
*
pHandle
)
{
int32_t
code
=
0
;
uv_mutex_lock
(
&
gUdfdProxy
.
udfStubsMutex
);
SUdfcFuncStub
key
=
{
0
};
...
...
@@ -925,15 +903,15 @@ int32_t acquireUdfFuncHandle(char* udfName, UdfcFuncHandle* pHandle) {
if
(
stubIndex
!=
-
1
)
{
SUdfcFuncStub
*
foundStub
=
taosArrayGet
(
gUdfdProxy
.
udfStubs
,
stubIndex
);
UdfcFuncHandle
handle
=
foundStub
->
handle
;
if
(
handle
!=
NULL
&&
((
SUdfcUvSession
*
)
handle
)
->
udfUvPipe
!=
NULL
)
{
if
(
handle
!=
NULL
&&
((
SUdfcUvSession
*
)
handle
)
->
udfUvPipe
!=
NULL
)
{
*
pHandle
=
foundStub
->
handle
;
++
foundStub
->
refCount
;
foundStub
->
lastRefTime
=
taosGetTimestampUs
();
uv_mutex_unlock
(
&
gUdfdProxy
.
udfStubsMutex
);
return
0
;
}
else
{
fnInfo
(
"invalid handle for %s, refCount: %d, last ref time: %"
PRId64
". remove it from cache"
,
udfName
,
foundStub
->
refCount
,
foundStub
->
lastRefTime
);
fnInfo
(
"invalid handle for %s, refCount: %d, last ref time: %"
PRId64
". remove it from cache"
,
udfName
,
foundStub
->
refCount
,
foundStub
->
lastRefTime
);
taosArrayRemove
(
gUdfdProxy
.
udfStubs
,
stubIndex
);
}
}
...
...
@@ -955,7 +933,7 @@ int32_t acquireUdfFuncHandle(char* udfName, UdfcFuncHandle* pHandle) {
return
code
;
}
void
releaseUdfFuncHandle
(
char
*
udfName
)
{
void
releaseUdfFuncHandle
(
char
*
udfName
)
{
uv_mutex_lock
(
&
gUdfdProxy
.
udfStubsMutex
);
SUdfcFuncStub
key
=
{
0
};
strcpy
(
key
.
udfName
,
udfName
);
...
...
@@ -981,7 +959,7 @@ int32_t cleanUpUdfs() {
uv_mutex_unlock
(
&
gUdfdProxy
.
udfStubsMutex
);
return
TSDB_CODE_SUCCESS
;
}
SArray
*
udfStubs
=
taosArrayInit
(
16
,
sizeof
(
SUdfcFuncStub
));
SArray
*
udfStubs
=
taosArrayInit
(
16
,
sizeof
(
SUdfcFuncStub
));
int32_t
i
=
0
;
while
(
i
<
taosArrayGetSize
(
gUdfdProxy
.
udfStubs
))
{
SUdfcFuncStub
*
stub
=
taosArrayGet
(
gUdfdProxy
.
udfStubs
,
i
);
...
...
@@ -989,13 +967,13 @@ int32_t cleanUpUdfs() {
fnInfo
(
"tear down udf. udf name: %s, handle: %p, ref count: %d"
,
stub
->
udfName
,
stub
->
handle
,
stub
->
refCount
);
doTeardownUdf
(
stub
->
handle
);
}
else
{
fnInfo
(
"udf still in use. udf name: %s, ref count: %d, last ref time: %"
PRId64
", handle: %p"
,
stub
->
udfName
,
stub
->
refCount
,
stub
->
lastRefTime
,
stub
->
handle
);
fnInfo
(
"udf still in use. udf name: %s, ref count: %d, last ref time: %"
PRId64
", handle: %p"
,
stub
->
udfName
,
stub
->
refCount
,
stub
->
lastRefTime
,
stub
->
handle
);
UdfcFuncHandle
handle
=
stub
->
handle
;
if
(
handle
!=
NULL
&&
((
SUdfcUvSession
*
)
handle
)
->
udfUvPipe
!=
NULL
)
{
if
(
handle
!=
NULL
&&
((
SUdfcUvSession
*
)
handle
)
->
udfUvPipe
!=
NULL
)
{
taosArrayPush
(
udfStubs
,
stub
);
}
else
{
fnInfo
(
"udf invalid handle for %s, refCount: %d, last ref time: %"
PRId64
". remove it from cache"
,
fnInfo
(
"udf invalid handle for %s, refCount: %d, last ref time: %"
PRId64
". remove it from cache"
,
stub
->
udfName
,
stub
->
refCount
,
stub
->
lastRefTime
);
}
}
...
...
@@ -1020,8 +998,8 @@ int32_t callUdfScalarFunc(char *udfName, SScalarParam *input, int32_t numOfCols,
code
=
TSDB_CODE_UDF_INVALID_OUTPUT_TYPE
;
}
else
{
if
(
session
->
outputType
!=
output
->
columnData
->
info
.
type
||
session
->
outputLen
!=
output
->
columnData
->
info
.
bytes
)
{
fnError
(
"udfc scalar function calculate error. type mismatch. session type: %d(%d), output type: %d(%d)"
,
session
->
outputType
,
session
->
outputLen
,
output
->
columnData
->
info
.
type
,
output
->
columnData
->
info
.
bytes
);
fnError
(
"udfc scalar function calculate error. type mismatch. session type: %d(%d), output type: %d(%d)"
,
session
->
output
Type
,
session
->
output
Len
,
output
->
columnData
->
info
.
type
,
output
->
columnData
->
info
.
bytes
);
code
=
TSDB_CODE_UDF_INVALID_OUTPUT_TYPE
;
}
}
...
...
@@ -1029,7 +1007,7 @@ int32_t callUdfScalarFunc(char *udfName, SScalarParam *input, int32_t numOfCols,
return
code
;
}
bool
udfAggGetEnv
(
struct
SFunctionNode
*
pFunc
,
SFuncExecEnv
*
pEnv
)
{
bool
udfAggGetEnv
(
struct
SFunctionNode
*
pFunc
,
SFuncExecEnv
*
pEnv
)
{
if
(
fmIsScalarFunc
(
pFunc
->
funcId
))
{
return
false
;
}
...
...
@@ -1037,7 +1015,7 @@ bool udfAggGetEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv) {
return
true
;
}
bool
udfAggInit
(
struct
SqlFunctionCtx
*
pCtx
,
struct
SResultRowEntryInfo
*
pResultCellInfo
)
{
bool
udfAggInit
(
struct
SqlFunctionCtx
*
pCtx
,
struct
SResultRowEntryInfo
*
pResultCellInfo
)
{
if
(
functionSetup
(
pCtx
,
pResultCellInfo
)
!=
true
)
{
return
false
;
}
...
...
@@ -1048,12 +1026,12 @@ bool udfAggInit(struct SqlFunctionCtx *pCtx, struct SResultRowEntryInfo* pResult
return
false
;
}
SUdfcUvSession
*
session
=
(
SUdfcUvSession
*
)
handle
;
SUdfAggRes
*
udfRes
=
(
SUdfAggRes
*
)
GET_ROWCELL_INTERBUF
(
pResultCellInfo
);
SUdfAggRes
*
udfRes
=
(
SUdfAggRes
*
)
GET_ROWCELL_INTERBUF
(
pResultCellInfo
);
int32_t
envSize
=
sizeof
(
SUdfAggRes
)
+
session
->
outputLen
+
session
->
bufSize
;
memset
(
udfRes
,
0
,
envSize
);
udfRes
->
finalResBuf
=
(
char
*
)
udfRes
+
sizeof
(
SUdfAggRes
);
udfRes
->
interResBuf
=
(
char
*
)
udfRes
+
sizeof
(
SUdfAggRes
)
+
session
->
outputLen
;
udfRes
->
finalResBuf
=
(
char
*
)
udfRes
+
sizeof
(
SUdfAggRes
);
udfRes
->
interResBuf
=
(
char
*
)
udfRes
+
sizeof
(
SUdfAggRes
)
+
session
->
outputLen
;
SUdfInterBuf
buf
=
{
0
};
if
((
udfCode
=
doCallUdfAggInit
(
handle
,
&
buf
))
!=
0
)
{
...
...
@@ -1083,16 +1061,16 @@ int32_t udfAggProcess(struct SqlFunctionCtx *pCtx) {
}
SUdfcUvSession
*
session
=
handle
;
SUdfAggRes
*
udfRes
=
(
SUdfAggRes
*
)
GET_ROWCELL_INTERBUF
(
GET_RES_INFO
(
pCtx
));
udfRes
->
finalResBuf
=
(
char
*
)
udfRes
+
sizeof
(
SUdfAggRes
);
udfRes
->
interResBuf
=
(
char
*
)
udfRes
+
sizeof
(
SUdfAggRes
)
+
session
->
outputLen
;
SUdfAggRes
*
udfRes
=
(
SUdfAggRes
*
)
GET_ROWCELL_INTERBUF
(
GET_RES_INFO
(
pCtx
));
udfRes
->
finalResBuf
=
(
char
*
)
udfRes
+
sizeof
(
SUdfAggRes
);
udfRes
->
interResBuf
=
(
char
*
)
udfRes
+
sizeof
(
SUdfAggRes
)
+
session
->
outputLen
;
SInputColumnInfoData
*
pInput
=
&
pCtx
->
input
;
SInputColumnInfoData
*
pInput
=
&
pCtx
->
input
;
int32_t
numOfCols
=
pInput
->
numOfInputCols
;
int32_t
start
=
pInput
->
startRowIndex
;
int32_t
numOfRows
=
pInput
->
numOfRows
;
SSDataBlock
*
pTempBlock
=
createDataBlock
();
SSDataBlock
*
pTempBlock
=
createDataBlock
();
pTempBlock
->
info
.
rows
=
pInput
->
totalRows
;
pTempBlock
->
info
.
uid
=
pInput
->
uid
;
for
(
int32_t
i
=
0
;
i
<
numOfCols
;
++
i
)
{
...
...
@@ -1101,9 +1079,7 @@ int32_t udfAggProcess(struct SqlFunctionCtx *pCtx) {
SSDataBlock
*
inputBlock
=
blockDataExtractBlock
(
pTempBlock
,
start
,
numOfRows
);
SUdfInterBuf
state
=
{.
buf
=
udfRes
->
interResBuf
,
.
bufLen
=
session
->
bufSize
,
.
numOfResult
=
udfRes
->
interResNum
};
SUdfInterBuf
state
=
{.
buf
=
udfRes
->
interResBuf
,
.
bufLen
=
session
->
bufSize
,
.
numOfResult
=
udfRes
->
interResNum
};
SUdfInterBuf
newState
=
{
0
};
udfCode
=
doCallUdfAggProcess
(
session
,
inputBlock
,
&
state
,
&
newState
);
...
...
@@ -1133,7 +1109,7 @@ int32_t udfAggProcess(struct SqlFunctionCtx *pCtx) {
return
udfCode
;
}
int32_t
udfAggFinalize
(
struct
SqlFunctionCtx
*
pCtx
,
SSDataBlock
*
pBlock
)
{
int32_t
udfAggFinalize
(
struct
SqlFunctionCtx
*
pCtx
,
SSDataBlock
*
pBlock
)
{
int32_t
udfCode
=
0
;
UdfcFuncHandle
handle
=
0
;
if
((
udfCode
=
acquireUdfFuncHandle
((
char
*
)
pCtx
->
udfName
,
&
handle
))
!=
0
)
{
...
...
@@ -1142,17 +1118,14 @@ int32_t udfAggFinalize(struct SqlFunctionCtx *pCtx, SSDataBlock* pBlock) {
}
SUdfcUvSession
*
session
=
handle
;
SUdfAggRes
*
udfRes
=
(
SUdfAggRes
*
)
GET_ROWCELL_INTERBUF
(
GET_RES_INFO
(
pCtx
));
udfRes
->
finalResBuf
=
(
char
*
)
udfRes
+
sizeof
(
SUdfAggRes
);
udfRes
->
interResBuf
=
(
char
*
)
udfRes
+
sizeof
(
SUdfAggRes
)
+
session
->
outputLen
;
SUdfAggRes
*
udfRes
=
(
SUdfAggRes
*
)
GET_ROWCELL_INTERBUF
(
GET_RES_INFO
(
pCtx
));
udfRes
->
finalResBuf
=
(
char
*
)
udfRes
+
sizeof
(
SUdfAggRes
);
udfRes
->
interResBuf
=
(
char
*
)
udfRes
+
sizeof
(
SUdfAggRes
)
+
session
->
outputLen
;
SUdfInterBuf
resultBuf
=
{
0
};
SUdfInterBuf
state
=
{.
buf
=
udfRes
->
interResBuf
,
.
bufLen
=
session
->
bufSize
,
.
numOfResult
=
udfRes
->
interResNum
};
int32_t
udfCallCode
=
0
;
udfCallCode
=
doCallUdfAggFinalize
(
session
,
&
state
,
&
resultBuf
);
SUdfInterBuf
state
=
{.
buf
=
udfRes
->
interResBuf
,
.
bufLen
=
session
->
bufSize
,
.
numOfResult
=
udfRes
->
interResNum
};
int32_t
udfCallCode
=
0
;
udfCallCode
=
doCallUdfAggFinalize
(
session
,
&
state
,
&
resultBuf
);
if
(
udfCallCode
!=
0
)
{
fnError
(
"udfAggFinalize error. doCallUdfAggFinalize step. udf code:%d"
,
udfCallCode
);
GET_RES_INFO
(
pCtx
)
->
numOfRes
=
0
;
...
...
@@ -1178,7 +1151,7 @@ int32_t udfAggFinalize(struct SqlFunctionCtx *pCtx, SSDataBlock* pBlock) {
void
onUdfcPipeClose
(
uv_handle_t
*
handle
)
{
SClientUvConn
*
conn
=
handle
->
data
;
if
(
!
QUEUE_EMPTY
(
&
conn
->
taskQueue
))
{
QUEUE
*
h
=
QUEUE_HEAD
(
&
conn
->
taskQueue
);
QUEUE
*
h
=
QUEUE_HEAD
(
&
conn
->
taskQueue
);
SClientUvTaskNode
*
task
=
QUEUE_DATA
(
h
,
SClientUvTaskNode
,
connTaskQueue
);
task
->
errCode
=
0
;
QUEUE_REMOVE
(
&
task
->
procTaskQueue
);
...
...
@@ -1189,7 +1162,7 @@ void onUdfcPipeClose(uv_handle_t *handle) {
}
taosMemoryFree
(
conn
->
readBuf
.
buf
);
taosMemoryFree
(
conn
);
taosMemoryFree
((
uv_pipe_t
*
)
handle
);
taosMemoryFree
((
uv_pipe_t
*
)
handle
);
}
int32_t
udfcGetUdfTaskResultFromUvTask
(
SClientUdfTask
*
task
,
SClientUvTaskNode
*
uvTask
)
{
...
...
@@ -1197,7 +1170,7 @@ int32_t udfcGetUdfTaskResultFromUvTask(SClientUdfTask *task, SClientUvTaskNode *
if
(
uvTask
->
type
==
UV_TASK_REQ_RSP
)
{
if
(
uvTask
->
rspBuf
.
base
!=
NULL
)
{
SUdfResponse
rsp
=
{
0
};
void
*
buf
=
decodeUdfResponse
(
uvTask
->
rspBuf
.
base
,
&
rsp
);
void
*
buf
=
decodeUdfResponse
(
uvTask
->
rspBuf
.
base
,
&
rsp
);
assert
(
uvTask
->
rspBuf
.
len
==
POINTER_DISTANCE
(
buf
,
uvTask
->
rspBuf
.
base
));
task
->
errCode
=
rsp
.
code
;
...
...
@@ -1273,7 +1246,7 @@ void udfcAllocateBuffer(uv_handle_t *handle, size_t suggestedSize, uv_buf_t *buf
bool
isUdfcUvMsgComplete
(
SClientConnBuf
*
connBuf
)
{
if
(
connBuf
->
total
==
-
1
&&
connBuf
->
len
>=
sizeof
(
int32_t
))
{
connBuf
->
total
=
*
(
int32_t
*
)
(
connBuf
->
buf
);
connBuf
->
total
=
*
(
int32_t
*
)(
connBuf
->
buf
);
}
if
(
connBuf
->
len
==
connBuf
->
cap
&&
connBuf
->
total
==
connBuf
->
cap
)
{
fnDebug
(
"udfc complete message is received, now handle it"
);
...
...
@@ -1284,15 +1257,15 @@ bool isUdfcUvMsgComplete(SClientConnBuf *connBuf) {
void
udfcUvHandleRsp
(
SClientUvConn
*
conn
)
{
SClientConnBuf
*
connBuf
=
&
conn
->
readBuf
;
int64_t
seqNum
=
*
(
int64_t
*
)
(
connBuf
->
buf
+
sizeof
(
int32_t
));
// msglen then seqnum
int64_t
seqNum
=
*
(
int64_t
*
)(
connBuf
->
buf
+
sizeof
(
int32_t
));
// msglen then seqnum
if
(
QUEUE_EMPTY
(
&
conn
->
taskQueue
))
{
fnError
(
"udfc no task waiting on connection. response seqnum:%"
PRId64
,
seqNum
);
fnError
(
"udfc no task waiting on connection. response seqnum:%"
PRId64
,
seqNum
);
return
;
}
bool
found
=
false
;
SClientUvTaskNode
*
taskFound
=
NULL
;
QUEUE
*
h
=
QUEUE_NEXT
(
&
conn
->
taskQueue
);
QUEUE
*
h
=
QUEUE_NEXT
(
&
conn
->
taskQueue
);
SClientUvTaskNode
*
task
=
QUEUE_DATA
(
h
,
SClientUvTaskNode
,
connTaskQueue
);
while
(
h
!=
&
conn
->
taskQueue
)
{
...
...
@@ -1327,7 +1300,7 @@ void udfcUvHandleRsp(SClientUvConn *conn) {
void
udfcUvHandleError
(
SClientUvConn
*
conn
)
{
fnDebug
(
"handle error on conn: %p, pipe: %p"
,
conn
,
conn
->
pipe
);
while
(
!
QUEUE_EMPTY
(
&
conn
->
taskQueue
))
{
QUEUE
*
h
=
QUEUE_HEAD
(
&
conn
->
taskQueue
);
QUEUE
*
h
=
QUEUE_HEAD
(
&
conn
->
taskQueue
);
SClientUvTaskNode
*
task
=
QUEUE_DATA
(
h
,
SClientUvTaskNode
,
connTaskQueue
);
task
->
errCode
=
TSDB_CODE_UDF_PIPE_READ_ERR
;
QUEUE_REMOVE
(
&
task
->
connTaskQueue
);
...
...
@@ -1335,7 +1308,7 @@ void udfcUvHandleError(SClientUvConn *conn) {
uv_sem_post
(
&
task
->
taskSem
);
}
uv_close
((
uv_handle_t
*
)
conn
->
pipe
,
onUdfcPipeClose
);
uv_close
((
uv_handle_t
*
)
conn
->
pipe
,
onUdfcPipeClose
);
}
void
onUdfcPipeRead
(
uv_stream_t
*
client
,
ssize_t
nread
,
const
uv_buf_t
*
buf
)
{
...
...
@@ -1373,7 +1346,7 @@ void onUdfcPipeWrite(uv_write_t *write, int status) {
void
onUdfcPipeConnect
(
uv_connect_t
*
connect
,
int
status
)
{
SClientUvTaskNode
*
uvTask
=
connect
->
data
;
if
(
status
!=
0
)
{
fnError
(
"client connect error, task seq: %"
PRId64
", code: %s"
,
uvTask
->
seqNum
,
uv_strerror
(
status
));
fnError
(
"client connect error, task seq: %"
PRId64
", code: %s"
,
uvTask
->
seqNum
,
uv_strerror
(
status
));
}
uvTask
->
errCode
=
status
;
...
...
@@ -1430,14 +1403,14 @@ int32_t udfcQueueUvTask(SClientUvTaskNode *uvTask) {
uv_async_send
(
&
udfc
->
loopTaskAync
);
uv_sem_wait
(
&
uvTask
->
taskSem
);
fnInfo
(
"udfc uvTask finished. uvTask:%"
PRId64
"-%d-%p"
,
uvTask
->
seqNum
,
uvTask
->
type
,
uvTask
);
fnInfo
(
"udfc uvTask finished. uvTask:%"
PRId64
"-%d-%p"
,
uvTask
->
seqNum
,
uvTask
->
type
,
uvTask
);
uv_sem_destroy
(
&
uvTask
->
taskSem
);
return
0
;
}
int32_t
udfcStartUvTask
(
SClientUvTaskNode
*
uvTask
)
{
fnDebug
(
"event loop start uv task. uvTask: %"
PRId64
"-%d-%p"
,
uvTask
->
seqNum
,
uvTask
->
type
,
uvTask
);
fnDebug
(
"event loop start uv task. uvTask: %"
PRId64
"-%d-%p"
,
uvTask
->
seqNum
,
uvTask
->
type
,
uvTask
);
int32_t
code
=
0
;
switch
(
uvTask
->
type
)
{
...
...
@@ -1469,7 +1442,7 @@ int32_t udfcStartUvTask(SClientUvTaskNode *uvTask) {
}
else
{
uv_write_t
*
write
=
taosMemoryMalloc
(
sizeof
(
uv_write_t
));
write
->
data
=
pipe
->
data
;
QUEUE
*
connTaskQueue
=
&
((
SClientUvConn
*
)
pipe
->
data
)
->
taskQueue
;
QUEUE
*
connTaskQueue
=
&
((
SClientUvConn
*
)
pipe
->
data
)
->
taskQueue
;
QUEUE_INSERT_TAIL
(
connTaskQueue
,
&
uvTask
->
connTaskQueue
);
int
err
=
uv_write
(
write
,
(
uv_stream_t
*
)
pipe
,
&
uvTask
->
reqBuf
,
1
,
onUdfcPipeWrite
);
if
(
err
!=
0
)
{
...
...
@@ -1492,8 +1465,7 @@ int32_t udfcStartUvTask(SClientUvTaskNode *uvTask) {
break
;
}
default:
{
fnError
(
"udfc event loop unknown task type."
)
break
;
fnError
(
"udfc event loop unknown task type."
)
break
;
}
}
...
...
@@ -1509,7 +1481,7 @@ void udfcAsyncTaskCb(uv_async_t *async) {
uv_mutex_unlock
(
&
udfc
->
taskQueueMutex
);
while
(
!
QUEUE_EMPTY
(
&
wq
))
{
QUEUE
*
h
=
QUEUE_HEAD
(
&
wq
);
QUEUE
*
h
=
QUEUE_HEAD
(
&
wq
);
QUEUE_REMOVE
(
h
);
SClientUvTaskNode
*
task
=
QUEUE_DATA
(
h
,
SClientUvTaskNode
,
recvTaskQueue
);
int32_t
code
=
udfcStartUvTask
(
task
);
...
...
@@ -1520,19 +1492,17 @@ void udfcAsyncTaskCb(uv_async_t *async) {
uv_sem_post
(
&
task
->
taskSem
);
}
}
}
void
cleanUpUvTasks
(
SUdfcProxy
*
udfc
)
{
fnDebug
(
"clean up uv tasks"
)
QUEUE
wq
;
fnDebug
(
"clean up uv tasks"
)
QUEUE
wq
;
uv_mutex_lock
(
&
udfc
->
taskQueueMutex
);
QUEUE_MOVE
(
&
udfc
->
taskQueue
,
&
wq
);
uv_mutex_unlock
(
&
udfc
->
taskQueueMutex
);
while
(
!
QUEUE_EMPTY
(
&
wq
))
{
QUEUE
*
h
=
QUEUE_HEAD
(
&
wq
);
QUEUE
*
h
=
QUEUE_HEAD
(
&
wq
);
QUEUE_REMOVE
(
h
);
SClientUvTaskNode
*
task
=
QUEUE_DATA
(
h
,
SClientUvTaskNode
,
recvTaskQueue
);
if
(
udfc
->
udfcState
==
UDFC_STATE_STOPPING
)
{
...
...
@@ -1542,7 +1512,7 @@ void cleanUpUvTasks(SUdfcProxy *udfc) {
}
while
(
!
QUEUE_EMPTY
(
&
udfc
->
uvProcTaskQueue
))
{
QUEUE
*
h
=
QUEUE_HEAD
(
&
udfc
->
uvProcTaskQueue
);
QUEUE
*
h
=
QUEUE_HEAD
(
&
udfc
->
uvProcTaskQueue
);
QUEUE_REMOVE
(
h
);
SClientUvTaskNode
*
task
=
QUEUE_DATA
(
h
,
SClientUvTaskNode
,
procTaskQueue
);
if
(
udfc
->
udfcState
==
UDFC_STATE_STOPPING
)
{
...
...
@@ -1572,7 +1542,7 @@ void constructUdfService(void *argsThread) {
QUEUE_INIT
(
&
udfc
->
taskQueue
);
QUEUE_INIT
(
&
udfc
->
uvProcTaskQueue
);
uv_barrier_wait
(
&
udfc
->
initBarrier
);
//TODO return value of uv_run
//
TODO return value of uv_run
uv_run
(
&
udfc
->
uvLoop
,
UV_RUN_DEFAULT
);
uv_loop_close
(
&
udfc
->
uvLoop
);
...
...
@@ -1596,8 +1566,7 @@ int32_t udfcOpen() {
uv_barrier_wait
(
&
proxy
->
initBarrier
);
uv_mutex_init
(
&
proxy
->
udfStubsMutex
);
proxy
->
udfStubs
=
taosArrayInit
(
8
,
sizeof
(
SUdfcFuncStub
));
fnInfo
(
"udfc initialized"
)
return
0
;
fnInfo
(
"udfc initialized"
)
return
0
;
}
int32_t
udfcClose
()
{
...
...
@@ -1644,7 +1613,7 @@ int32_t doSetupUdf(char udfName[], UdfcFuncHandle *funcHandle) {
if
(
gUdfdProxy
.
udfcState
!=
UDFC_STATE_READY
)
{
return
TSDB_CODE_UDF_INVALID_STATE
;
}
SClientUdfTask
*
task
=
taosMemoryCalloc
(
1
,
sizeof
(
SClientUdfTask
));
SClientUdfTask
*
task
=
taosMemoryCalloc
(
1
,
sizeof
(
SClientUdfTask
));
task
->
errCode
=
0
;
task
->
session
=
taosMemoryCalloc
(
1
,
sizeof
(
SUdfcUvSession
));
task
->
session
->
udfc
=
&
gUdfdProxy
;
...
...
@@ -1681,16 +1650,16 @@ int32_t doSetupUdf(char udfName[], UdfcFuncHandle *funcHandle) {
}
int32_t
callUdf
(
UdfcFuncHandle
handle
,
int8_t
callType
,
SSDataBlock
*
input
,
SUdfInterBuf
*
state
,
SUdfInterBuf
*
state2
,
SSDataBlock
*
output
,
SUdfInterBuf
*
newState
)
{
SSDataBlock
*
output
,
SUdfInterBuf
*
newState
)
{
fnDebug
(
"udfc call udf. callType: %d, funcHandle: %p"
,
callType
,
handle
);
SUdfcUvSession
*
session
=
(
SUdfcUvSession
*
)
handle
;
SUdfcUvSession
*
session
=
(
SUdfcUvSession
*
)
handle
;
if
(
session
->
udfUvPipe
==
NULL
)
{
fnError
(
"No pipe to udfd"
);
return
TSDB_CODE_UDF_PIPE_NO_PIPE
;
}
SClientUdfTask
*
task
=
taosMemoryCalloc
(
1
,
sizeof
(
SClientUdfTask
));
task
->
errCode
=
0
;
task
->
session
=
(
SUdfcUvSession
*
)
handle
;
task
->
session
=
(
SUdfcUvSession
*
)
handle
;
task
->
type
=
UDF_TASK_CALL
;
SUdfCallRequest
*
req
=
&
task
->
_call
.
req
;
...
...
@@ -1774,7 +1743,8 @@ int32_t doCallUdfAggProcess(UdfcFuncHandle handle, SSDataBlock *block, SUdfInter
// input: interbuf1, interbuf2
// output: resultBuf
int32_t
doCallUdfAggMerge
(
UdfcFuncHandle
handle
,
SUdfInterBuf
*
interBuf1
,
SUdfInterBuf
*
interBuf2
,
SUdfInterBuf
*
resultBuf
)
{
int32_t
doCallUdfAggMerge
(
UdfcFuncHandle
handle
,
SUdfInterBuf
*
interBuf1
,
SUdfInterBuf
*
interBuf2
,
SUdfInterBuf
*
resultBuf
)
{
int8_t
callType
=
TSDB_UDF_CALL_AGG_MERGE
;
int32_t
err
=
callUdf
(
handle
,
callType
,
NULL
,
interBuf1
,
interBuf2
,
NULL
,
resultBuf
);
return
err
;
...
...
@@ -1788,7 +1758,7 @@ int32_t doCallUdfAggFinalize(UdfcFuncHandle handle, SUdfInterBuf *interBuf, SUdf
return
err
;
}
int32_t
doCallUdfScalarFunc
(
UdfcFuncHandle
handle
,
SScalarParam
*
input
,
int32_t
numOfCols
,
SScalarParam
*
output
)
{
int32_t
doCallUdfScalarFunc
(
UdfcFuncHandle
handle
,
SScalarParam
*
input
,
int32_t
numOfCols
,
SScalarParam
*
output
)
{
int8_t
callType
=
TSDB_UDF_CALL_SCALA_PROC
;
SSDataBlock
inputBlock
=
{
0
};
convertScalarParamToDataBlock
(
input
,
numOfCols
,
&
inputBlock
);
...
...
@@ -1804,7 +1774,7 @@ int32_t doCallUdfScalarFunc(UdfcFuncHandle handle, SScalarParam *input, int32_t
}
int32_t
doTeardownUdf
(
UdfcFuncHandle
handle
)
{
SUdfcUvSession
*
session
=
(
SUdfcUvSession
*
)
handle
;
SUdfcUvSession
*
session
=
(
SUdfcUvSession
*
)
handle
;
if
(
session
->
udfUvPipe
==
NULL
)
{
fnError
(
"tear down udf. pipe to udfd does not exist. udf name: %s"
,
session
->
udfName
);
...
...
@@ -1827,7 +1797,7 @@ int32_t doTeardownUdf(UdfcFuncHandle handle) {
udfcRunUdfUvTask
(
task
,
UV_TASK_DISCONNECT
);
fnInfo
(
"tear down udf. udf name: %s, udf func handle: %p"
,
session
->
udfName
,
handle
);
//TODO: synchronization refactor between libuv event loop and request thread
//
TODO: synchronization refactor between libuv event loop and request thread
if
(
session
->
udfUvPipe
!=
NULL
&&
session
->
udfUvPipe
->
data
!=
NULL
)
{
SClientUvConn
*
conn
=
session
->
udfUvPipe
->
data
;
conn
->
session
=
NULL
;
...
...
source/libs/function/test/runUdf.c
浏览文件 @
c4b3da50
...
...
@@ -53,7 +53,7 @@ int scalarFuncTest() {
blockDataEnsureCapacity
(
pBlock
,
1024
);
pBlock
->
info
.
rows
=
1024
;
SColumnInfoData
*
pCol
=
taosArrayGet
(
pBlock
->
pDataBlock
,
0
);
SColumnInfoData
*
pCol
=
taosArrayGet
(
pBlock
->
pDataBlock
,
0
);
for
(
int32_t
j
=
0
;
j
<
pBlock
->
info
.
rows
;
++
j
)
{
colDataAppendInt32
(
pCol
,
j
,
&
j
);
}
...
...
@@ -68,14 +68,13 @@ int scalarFuncTest() {
SColumnInfoData
*
col
=
output
.
columnData
;
for
(
int32_t
i
=
0
;
i
<
output
.
numOfRows
;
++
i
)
{
if
(
i
%
100
==
0
)
fprintf
(
stderr
,
"%d
\t
%d
\n
"
,
i
,
*
(
int32_t
*
)(
col
->
pData
+
i
*
sizeof
(
int32_t
)));
if
(
i
%
100
==
0
)
fprintf
(
stderr
,
"%d
\t
%d
\n
"
,
i
,
*
(
int32_t
*
)(
col
->
pData
+
i
*
sizeof
(
int32_t
)));
}
colDataDestroy
(
output
.
columnData
);
taosMemoryFree
(
output
.
columnData
);
}
int64_t
end
=
taosGetTimestampUs
();
fprintf
(
stderr
,
"time: %f
\n
"
,
(
end
-
beg
)
/
1000
.
0
);
fprintf
(
stderr
,
"time: %f
\n
"
,
(
end
-
beg
)
/
1000
.
0
);
doTeardownUdf
(
handle
);
return
0
;
...
...
@@ -98,7 +97,7 @@ int aggregateFuncTest() {
blockDataEnsureCapacity
(
pBlock
,
1024
);
pBlock
->
info
.
rows
=
1024
;
SColumnInfoData
*
pColInfo
=
bdGetColumnInfoData
(
pBlock
,
0
);
SColumnInfoData
*
pColInfo
=
bdGetColumnInfoData
(
pBlock
,
0
);
for
(
int32_t
j
=
0
;
j
<
pBlock
->
info
.
rows
;
++
j
)
{
colDataAppendInt32
(
pColInfo
,
j
,
&
j
);
}
...
...
@@ -111,7 +110,7 @@ int aggregateFuncTest() {
taosArrayDestroy
(
pBlock
->
pDataBlock
);
doCallUdfAggFinalize
(
handle
,
&
newBuf
,
&
resultBuf
);
fprintf
(
stderr
,
"agg result: %f
\n
"
,
*
(
double
*
)
resultBuf
.
buf
);
fprintf
(
stderr
,
"agg result: %f
\n
"
,
*
(
double
*
)
resultBuf
.
buf
);
freeUdfInterBuf
(
&
buf
);
freeUdfInterBuf
(
&
newBuf
);
...
...
source/libs/function/test/udf1.c
浏览文件 @
c4b3da50
#include <string.h>
#include <stdlib.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#ifdef LINUX
#include <unistd.h>
#endif
...
...
@@ -9,16 +9,11 @@
#endif
#include "taosudf.h"
DLL_EXPORT
int32_t
udf1_init
()
{
return
0
;
}
DLL_EXPORT
int32_t
udf1_init
()
{
return
0
;
}
DLL_EXPORT
int32_t
udf1_destroy
()
{
return
0
;
}
DLL_EXPORT
int32_t
udf1_destroy
()
{
return
0
;
}
DLL_EXPORT
int32_t
udf1
(
SUdfDataBlock
*
block
,
SUdfColumn
*
resultCol
)
{
DLL_EXPORT
int32_t
udf1
(
SUdfDataBlock
*
block
,
SUdfColumn
*
resultCol
)
{
SUdfColumnMeta
*
meta
=
&
resultCol
->
colMeta
;
meta
->
bytes
=
4
;
meta
->
type
=
TSDB_DATA_TYPE_INT
;
...
...
@@ -35,12 +30,12 @@ DLL_EXPORT int32_t udf1(SUdfDataBlock* block, SUdfColumn *resultCol) {
break
;
}
}
if
(
j
==
block
->
numOfCols
)
{
if
(
j
==
block
->
numOfCols
)
{
int32_t
luckyNum
=
88
;
udfColDataSet
(
resultCol
,
i
,
(
char
*
)
&
luckyNum
,
false
);
}
}
//to simulate actual processing delay by udf
//
to simulate actual processing delay by udf
#ifdef LINUX
usleep
(
1
*
1000
);
// usleep takes sleep time in us (1 millionth of a second)
#endif
...
...
source/libs/function/test/udf2.c
浏览文件 @
c4b3da50
#include <string.h>
#include <stdlib.h>
#include <stdio.h>
#include <math.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include "taosudf.h"
DLL_EXPORT
int32_t
udf2_init
()
{
return
0
;
}
DLL_EXPORT
int32_t
udf2_init
()
{
return
0
;
}
DLL_EXPORT
int32_t
udf2_destroy
()
{
return
0
;
}
DLL_EXPORT
int32_t
udf2_destroy
()
{
return
0
;
}
DLL_EXPORT
int32_t
udf2_start
(
SUdfInterBuf
*
buf
)
{
DLL_EXPORT
int32_t
udf2_start
(
SUdfInterBuf
*
buf
)
{
*
(
int64_t
*
)(
buf
->
buf
)
=
0
;
buf
->
bufLen
=
sizeof
(
double
);
buf
->
numOfResult
=
0
;
return
0
;
}
DLL_EXPORT
int32_t
udf2
(
SUdfDataBlock
*
block
,
SUdfInterBuf
*
interBuf
,
SUdfInterBuf
*
newInterBuf
)
{
DLL_EXPORT
int32_t
udf2
(
SUdfDataBlock
*
block
,
SUdfInterBuf
*
interBuf
,
SUdfInterBuf
*
newInterBuf
)
{
double
sumSquares
=
*
(
double
*
)
interBuf
->
buf
;
int8_t
numNotNull
=
0
;
for
(
int32_t
i
=
0
;
i
<
block
->
numOfCols
;
++
i
)
{
SUdfColumn
*
col
=
block
->
udfCols
[
i
];
if
(
!
(
col
->
colMeta
.
type
==
TSDB_DATA_TYPE_INT
||
col
->
colMeta
.
type
==
TSDB_DATA_TYPE_DOUBLE
))
{
if
(
!
(
col
->
colMeta
.
type
==
TSDB_DATA_TYPE_INT
||
col
->
colMeta
.
type
==
TSDB_DATA_TYPE_DOUBLE
))
{
return
TSDB_CODE_UDF_INVALID_INPUT
;
}
}
...
...
@@ -67,7 +62,7 @@ DLL_EXPORT int32_t udf2(SUdfDataBlock* block, SUdfInterBuf *interBuf, SUdfInterB
return
0
;
}
DLL_EXPORT
int32_t
udf2_finish
(
SUdfInterBuf
*
buf
,
SUdfInterBuf
*
resultData
)
{
DLL_EXPORT
int32_t
udf2_finish
(
SUdfInterBuf
*
buf
,
SUdfInterBuf
*
resultData
)
{
if
(
buf
->
numOfResult
==
0
)
{
resultData
->
numOfResult
=
0
;
return
0
;
...
...
tools/scripts/codeFormat.sh
浏览文件 @
c4b3da50
...
...
@@ -16,7 +16,7 @@ FORMAT_DIR_LIST=(
"
${
PRJ_ROOT_DIR
}
/source/libs/catalog"
"
${
PRJ_ROOT_DIR
}
/source/libs/command"
"
${
PRJ_ROOT_DIR
}
/source/libs/executor"
#
"${PRJ_ROOT_DIR}/source/libs/function"
"
${
PRJ_ROOT_DIR
}
/source/libs/function"
"
${
PRJ_ROOT_DIR
}
/source/libs/index"
"
${
PRJ_ROOT_DIR
}
/source/libs/monitor"
"
${
PRJ_ROOT_DIR
}
/source/libs/nodes"
...
...
@@ -49,3 +49,5 @@ for d in ${FORMAT_DIR_LIST[@]}; do
done
cd
${
ORIGIN_DIR
}
# find source/libs/ -path ./source/libs/qworker -prune -o -regex '.*\.\(cpp\|hpp\|c\|h\)' -print
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录