Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
90284027
T
TDengine
项目概览
taosdata
/
TDengine
11 个月 前同步成功
通知
1179
Star
22014
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
体验新版 GitCode,发现更多精彩内容 >>
提交
90284027
编写于
5月 06, 2022
作者:
H
Hongze Cheng
浏览文件
操作
浏览文件
下载
差异文件
Merge branch '3.0' of
https://github.com/taosdata/TDengine
into feature/vnode_refact1
上级
68fc2acc
1a2d1705
变更
20
展开全部
隐藏空白更改
内联
并排
Showing
20 changed file
with
840 addition
and
164 deletion
+840
-164
include/libs/function/tudf.h
include/libs/function/tudf.h
+7
-4
include/libs/scalar/scalar.h
include/libs/scalar/scalar.h
+2
-0
source/client/test/clientTests.cpp
source/client/test/clientTests.cpp
+57
-18
source/common/src/tdatablock.c
source/common/src/tdatablock.c
+5
-5
source/libs/executor/inc/executorimpl.h
source/libs/executor/inc/executorimpl.h
+9
-12
source/libs/executor/inc/tsort.h
source/libs/executor/inc/tsort.h
+11
-4
source/libs/executor/src/executorimpl.c
source/libs/executor/src/executorimpl.c
+32
-31
source/libs/executor/src/scanoperator.c
source/libs/executor/src/scanoperator.c
+45
-37
source/libs/executor/src/sortoperator.c
source/libs/executor/src/sortoperator.c
+40
-19
source/libs/executor/src/tsort.c
source/libs/executor/src/tsort.c
+18
-22
source/libs/function/src/builtins.c
source/libs/function/src/builtins.c
+2
-3
source/libs/function/src/tudf.c
source/libs/function/src/tudf.c
+3
-1
source/libs/function/src/udfd.c
source/libs/function/src/udfd.c
+22
-3
source/libs/scalar/src/sclfunc.c
source/libs/scalar/src/sclfunc.c
+6
-0
source/libs/scalar/src/sclvector.c
source/libs/scalar/src/sclvector.c
+2
-4
tests/requirements.txt
tests/requirements.txt
+5
-0
tests/script/tsim/query/udf.sim
tests/script/tsim/query/udf.sim
+21
-0
tests/system-test/2-query/sqrt.py
tests/system-test/2-query/sqrt.py
+551
-0
tests/system-test/fulltest.sh
tests/system-test/fulltest.sh
+1
-0
tools/taos-tools
tools/taos-tools
+1
-1
未找到文件。
include/libs/function/tudf.h
浏览文件 @
90284027
...
...
@@ -29,7 +29,11 @@ extern "C" {
#endif
#define UDF_LISTEN_PIPE_NAME_LEN 32
#define UDF_LISTEN_PIPE_NAME_PREFIX "udfd.sock."
#ifdef _WIN32
#define UDF_LISTEN_PIPE_NAME_PREFIX "\\\\?\\pipe\\udfd.sock"
#else
#define UDF_LISTEN_PIPE_NAME_PREFIX ".udfd.sock."
#endif
#define UDF_DNODE_ID_ENV_NAME "DNODE_ID"
//======================================================================================
...
...
@@ -129,8 +133,8 @@ int32_t udfAggFinalize(struct SqlFunctionCtx *pCtx, SSDataBlock* pBlock);
// begin API to UDF writer.
// dynamic lib init and destroy
typedef
int32_t
(
*
TUdf
Setup
Func
)();
typedef
int32_t
(
*
TUdf
Teardown
Func
)();
typedef
int32_t
(
*
TUdf
Init
Func
)();
typedef
int32_t
(
*
TUdf
Destroy
Func
)();
//TODO: add API to check function arguments type, number etc.
...
...
@@ -242,7 +246,6 @@ static FORCE_INLINE int32_t udfColSetRow(SUdfColumn* pColumn, uint32_t currentRo
return
0
;
}
typedef
int32_t
(
*
TUdfFreeUdfColumnFunc
)(
SUdfColumn
*
column
);
typedef
int32_t
(
*
TUdfScalarProcFunc
)(
SUdfDataBlock
*
block
,
SUdfColumn
*
resultCol
);
typedef
int32_t
(
*
TUdfAggStartFunc
)(
SUdfInterBuf
*
buf
);
...
...
include/libs/scalar/scalar.h
浏览文件 @
90284027
...
...
@@ -91,6 +91,8 @@ int32_t winDurFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOu
int32_t
qStartTsFunction
(
SScalarParam
*
pInput
,
int32_t
inputNum
,
SScalarParam
*
pOutput
);
int32_t
qEndTsFunction
(
SScalarParam
*
pInput
,
int32_t
inputNum
,
SScalarParam
*
pOutput
);
int32_t
qTbnameFunction
(
SScalarParam
*
pInput
,
int32_t
inputNum
,
SScalarParam
*
pOutput
);
#ifdef __cplusplus
}
#endif
...
...
source/client/test/clientTests.cpp
浏览文件 @
90284027
...
...
@@ -587,15 +587,34 @@ TEST(testCase, projection_query_tables) {
}
taos_free_result(pRes);
pRes = taos_query(pConn, "create stable st2 (ts timestamp, k int) tags(a int)");
if (taos_errno(pRes) != 0) {
printf("failed to create table tu, reason:%s\n", taos_errstr(pRes));
}
taos_free_result(pRes);
pRes = taos_query(pConn, "create table tu using st1 tags(1)");
if (taos_errno(pRes) != 0) {
printf("failed to create table tu, reason:%s\n", taos_errstr(pRes));
}
taos_free_result(pRes);
for(int32_t i = 0; i < 10000; ++i) {
char sql[512] = {0};
sprintf(sql, "insert into tu values(now+%da, %d)", i, i);
pRes = taos_query(pConn, "create table tu2 using st2 tags(1)");
if (taos_errno(pRes) != 0) {
printf("failed to create table tu, reason:%s\n", taos_errstr(pRes));
}
taos_free_result(pRes);
for(int32_t i = 0; i < 10000000; i += 20) {
char sql[1024] = {0};
sprintf(sql,
"insert into tu values(now+%da, %d)(now+%da, %d)(now+%da, %d)(now+%da, %d)"
"(now+%da, %d)(now+%da, %d)(now+%da, %d)(now+%da, %d)(now+%da, %d)(now+%da, %d)"
"(now+%da, %d)(now+%da, %d)(now+%da, %d)(now+%da, %d)"
"(now+%da, %d)(now+%da, %d)(now+%da, %d)(now+%da, %d)(now+%da, %d)(now+%da, %d)",
i, i, i + 1, i + 1, i + 2, i + 2, i + 3, i + 3, i + 4, i + 4, i + 5, i + 5, i + 6, i + 6, i + 7, i + 7,
i + 8, i + 8, i + 9, i + 9, i + 10, i + 10, i + 11, i + 11, i + 12, i + 12, i + 13, i + 13, i + 14, i + 14,
i + 15, i + 15, i + 16, i + 16, i + 17, i + 17, i + 18, i + 18, i + 19, i + 19);
TAOS_RES* p = taos_query(pConn, sql);
if (taos_errno(p) != 0) {
printf("failed to insert data, reason:%s\n", taos_errstr(p));
...
...
@@ -604,24 +623,44 @@ TEST(testCase, projection_query_tables) {
taos_free_result(p);
}
pRes = taos_query(pConn, "select * from tu");
if (taos_errno(pRes) != 0) {
printf("failed to select from table, reason:%s\n", taos_errstr(pRes));
taos_free_result(pRes);
ASSERT_TRUE(false);
}
printf("start to insert next table\n");
TAOS_ROW pRow = NULL;
TAOS_FIELD* pFields = taos_fetch_fields(pRes);
int32_t numOfFields = taos_num_fields(pRes);
for(int32_t i = 0; i < 10000000; i += 20) {
char sql[1024] = {0};
sprintf(sql,
"insert into tu2 values(now+%da, %d)(now+%da, %d)(now+%da, %d)(now+%da, %d)"
"(now+%da, %d)(now+%da, %d)(now+%da, %d)(now+%da, %d)(now+%da, %d)(now+%da, %d)"
"(now+%da, %d)(now+%da, %d)(now+%da, %d)(now+%da, %d)"
"(now+%da, %d)(now+%da, %d)(now+%da, %d)(now+%da, %d)(now+%da, %d)(now+%da, %d)",
i, i, i + 1, i + 1, i + 2, i + 2, i + 3, i + 3, i + 4, i + 4, i + 5, i + 5, i + 6, i + 6, i + 7, i + 7,
i + 8, i + 8, i + 9, i + 9, i + 10, i + 10, i + 11, i + 11, i + 12, i + 12, i + 13, i + 13, i + 14, i + 14,
i + 15, i + 15, i + 16, i + 16, i + 17, i + 17, i + 18, i + 18, i + 19, i + 19);
TAOS_RES* p = taos_query(pConn, sql);
if (taos_errno(p) != 0) {
printf("failed to insert data, reason:%s\n", taos_errstr(p));
}
char str[512] = {0};
while ((pRow = taos_fetch_row(pRes)) != NULL) {
int32_t code = taos_print_row(str, pRow, pFields, numOfFields);
printf("%s\n", str);
taos_free_result(p);
}
taos_free_result(pRes);
// pRes = taos_query(pConn, "select * from tu");
// if (taos_errno(pRes) != 0) {
// printf("failed to select from table, reason:%s\n", taos_errstr(pRes));
// taos_free_result(pRes);
// ASSERT_TRUE(false);
// }
// TAOS_ROW pRow = NULL;
// TAOS_FIELD* pFields = taos_fetch_fields(pRes);
// int32_t numOfFields = taos_num_fields(pRes);
//
// char str[512] = {0};
// while ((pRow = taos_fetch_row(pRes)) != NULL) {
// int32_t code = taos_print_row(str, pRow, pFields, numOfFields);
// printf("%s\n", str);
// }
// taos_free_result(pRes);
taos_close(pConn);
}
...
...
@@ -659,7 +698,7 @@ TEST(testCase, agg_query_tables) {
TAOS
*
pConn
=
taos_connect
(
"localhost"
,
"root"
,
"taosdata"
,
NULL
,
0
);
ASSERT_NE
(
pConn
,
nullptr
);
TAOS_RES
*
pRes
=
taos_query
(
pConn
,
"use
db
"
);
TAOS_RES
*
pRes
=
taos_query
(
pConn
,
"use
abc1
"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"failed to use db, reason:%s
\n
"
,
taos_errstr
(
pRes
));
taos_free_result
(
pRes
);
...
...
source/common/src/tdatablock.c
浏览文件 @
90284027
...
...
@@ -363,9 +363,9 @@ int32_t blockDataMerge(SSDataBlock* pDest, const SSDataBlock* pSrc, SArray* pInd
for
(
int32_t
i
=
0
;
i
<
pDest
->
info
.
numOfCols
;
++
i
)
{
int32_t
mapIndex
=
i
;
if
(
pIndexMap
)
{
mapIndex
=
*
(
int32_t
*
)
taosArrayGet
(
pIndexMap
,
i
);
}
//
if (pIndexMap) {
//
mapIndex = *(int32_t*)taosArrayGet(pIndexMap, i);
//
}
SColumnInfoData
*
pCol2
=
taosArrayGet
(
pDest
->
pDataBlock
,
i
);
SColumnInfoData
*
pCol1
=
taosArrayGet
(
pSrc
->
pDataBlock
,
mapIndex
);
...
...
@@ -493,12 +493,12 @@ SSDataBlock* blockDataExtractBlock(SSDataBlock* pBlock, int32_t startIndex, int3
for
(
int32_t
j
=
startIndex
;
j
<
(
startIndex
+
rowCount
);
++
j
)
{
bool
isNull
=
false
;
if
(
pBlock
->
pBlockAgg
==
NULL
)
{
isNull
=
colDataIsNull
(
pColData
,
pBlock
->
info
.
rows
,
j
,
NULL
);
isNull
=
colDataIsNull
_s
(
pColData
,
pBlock
->
info
.
rows
);
}
else
{
isNull
=
colDataIsNull
(
pColData
,
pBlock
->
info
.
rows
,
j
,
pBlock
->
pBlockAgg
[
i
]);
}
char
*
p
=
colDataGetData
(
pColData
,
j
);
char
*
p
=
colDataGetData
(
pColData
,
j
);
colDataAppend
(
pDstCol
,
j
-
startIndex
,
p
,
isNull
);
}
}
...
...
source/libs/executor/inc/executorimpl.h
浏览文件 @
90284027
...
...
@@ -87,9 +87,7 @@ typedef struct SResultInfo { // TODO refactor
typedef
struct
STableQueryInfo
{
TSKEY
lastKey
;
// last check ts, todo remove it later
SResultRowPosition
pos
;
// current active time window
// int32_t groupIndex; // group id in table list
// SVariant tag;
// SResultRowInfo resInfo; // result info
}
STableQueryInfo
;
typedef
enum
{
...
...
@@ -363,11 +361,12 @@ typedef struct STableScanInfo {
}
STableScanInfo
;
typedef
struct
STagScanInfo
{
SColumnInfo
*
pCols
;
SSDataBlock
*
pRes
;
int32_t
totalTables
;
int32_t
curPos
;
void
*
pReader
;
SColumnInfo
*
pCols
;
SSDataBlock
*
pRes
;
SArray
*
pColMatchInfo
;
int32_t
curPos
;
SReadHandle
readHandle
;
STableGroupInfo
*
pTableGroups
;
}
STagScanInfo
;
typedef
struct
SStreamBlockScanInfo
{
...
...
@@ -579,9 +578,8 @@ typedef struct SSortOperatorInfo {
uint32_t
sortBufSize
;
// max buffer size for in-memory sort
SArray
*
pSortInfo
;
SSortHandle
*
pSortHandle
;
SArray
*
inputSlotMap
;
// for index map from table scan output
SArray
*
pColMatchInfo
;
// for index map from table scan output
int32_t
bufPageSize
;
// int32_t numOfRowsInRes;
// TODO extact struct
int64_t
startTs
;
// sort start time
...
...
@@ -646,7 +644,7 @@ void cleanupAggSup(SAggSupporter* pAggSup);
void
destroyBasicOperatorInfo
(
void
*
param
,
int32_t
numOfOutput
);
void
appendOneRowToDataBlock
(
SSDataBlock
*
pBlock
,
STupleHandle
*
pTupleHandle
);
SSDataBlock
*
getSortedBlockData
(
SSortHandle
*
pHandle
,
SSDataBlock
*
pDataBlock
,
int32_t
capacity
);
SSDataBlock
*
getSortedBlockData
(
SSortHandle
*
pHandle
,
SSDataBlock
*
pDataBlock
,
int32_t
capacity
,
SArray
*
pColMatchInfo
);
SSDataBlock
*
loadNextDataBlock
(
void
*
param
);
void
setResultRowInitCtx
(
SResultRow
*
pResult
,
SqlFunctionCtx
*
pCtx
,
int32_t
numOfOutput
,
int32_t
*
rowCellInfoOffset
);
...
...
@@ -704,7 +702,7 @@ SOperatorInfo* createTimeSliceOperatorInfo(SOperatorInfo* downstream, SExprInfo*
SSDataBlock
*
pResultBlock
,
SExecTaskInfo
*
pTaskInfo
);
SOperatorInfo
*
createJoinOperatorInfo
(
SOperatorInfo
**
pDownstream
,
int32_t
numOfDownstream
,
SExprInfo
*
pExprInfo
,
int32_t
numOfCols
,
SSDataBlock
*
pResBlock
,
SNode
*
pOnCondition
,
SExecTaskInfo
*
pTaskInfo
);
SOperatorInfo
*
createTagScanOperatorInfo
(
void
*
pReaderHandle
,
SExprInfo
*
pExpr
,
int32_t
numOfOutput
,
SExecTaskInfo
*
pTaskInfo
);
SOperatorInfo
*
createTagScanOperatorInfo
(
SReadHandle
*
pReadHandle
,
SExprInfo
*
pExpr
,
int32_t
numOfOutput
,
SSDataBlock
*
pResBlock
,
SArray
*
pColMatchInfo
,
STableGroupInfo
*
pTableGroupInfo
,
SExecTaskInfo
*
pTaskInfo
);
#if 0
SOperatorInfo* createTableSeqScanOperatorInfo(void* pTsdbReadHandle, STaskRuntimeEnv* pRuntimeEnv);
...
...
@@ -717,7 +715,6 @@ int32_t projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBloc
void
setInputDataBlock
(
SOperatorInfo
*
pOperator
,
SqlFunctionCtx
*
pCtx
,
SSDataBlock
*
pBlock
,
int32_t
order
,
bool
createDummyCol
);
void
finalizeQueryResult
(
SqlFunctionCtx
*
pCtx
,
int32_t
numOfOutput
);
void
copyTsColoum
(
SSDataBlock
*
pRes
,
SqlFunctionCtx
*
pCtx
,
int32_t
numOfOutput
);
STableQueryInfo
*
createTableQueryInfo
(
void
*
buf
,
STimeWindow
win
);
...
...
source/libs/executor/inc/tsort.h
浏览文件 @
90284027
...
...
@@ -117,18 +117,25 @@ STupleHandle* tsortNextTuple(SSortHandle* pHandle);
/**
*
* @param pHandle
* @param colI
ndex
* @param colI
d
* @return
*/
bool
tsortIsNullVal
(
STupleHandle
*
pVHandle
,
int32_t
colI
ndex
);
bool
tsortIsNullVal
(
STupleHandle
*
pVHandle
,
int32_t
colI
d
);
/**
*
* @param pHandle
* @param colI
ndex
* @param colI
d
* @return
*/
void
*
tsortGetValue
(
STupleHandle
*
pVHandle
,
int32_t
colIndex
);
void
*
tsortGetValue
(
STupleHandle
*
pVHandle
,
int32_t
colId
);
/**
*
* @param pSortHandle
* @return
*/
SSDataBlock
*
tsortGetSortedDataBlock
(
const
SSortHandle
*
pSortHandle
);
#ifdef __cplusplus
}
...
...
source/libs/executor/src/executorimpl.c
浏览文件 @
90284027
...
...
@@ -3520,7 +3520,7 @@ static SSDataBlock* doSortedMerge(SOperatorInfo* pOperator) {
SExecTaskInfo
*
pTaskInfo
=
pOperator
->
pTaskInfo
;
SSortedMergeOperatorInfo
*
pInfo
=
pOperator
->
info
;
if
(
pOperator
->
status
==
OP_RES_TO_RETURN
)
{
return
getSortedBlockData
(
pInfo
->
pSortHandle
,
pInfo
->
binfo
.
pRes
,
pOperator
->
resultInfo
.
capacity
);
return
getSortedBlockData
(
pInfo
->
pSortHandle
,
pInfo
->
binfo
.
pRes
,
pOperator
->
resultInfo
.
capacity
,
NULL
);
}
int32_t
numOfBufPage
=
pInfo
->
sortBufSize
/
pInfo
->
bufPageSize
;
...
...
@@ -4701,7 +4701,7 @@ static SArray* extractTableIdList(const STableGroupInfo* pTableGroupInfo);
static
SArray
*
extractColumnInfo
(
SNodeList
*
pNodeList
);
static
SArray
*
extractColMatchInfo
(
SNodeList
*
pNodeList
,
SDataBlockDescNode
*
pOutputNodeList
,
int32_t
*
numOfOutputCols
);
static
SArray
*
createSortInfo
(
SNodeList
*
pNodeList
,
SNodeList
*
pNodeListTarget
);
static
SArray
*
createSortInfo
(
SNodeList
*
pNodeList
);
static
SArray
*
createIndexMap
(
SNodeList
*
pNodeList
);
static
SArray
*
extractPartitionColInfo
(
SNodeList
*
pNodeList
);
static
int32_t
initQueryTableDataCond
(
SQueryTableDataCond
*
pCond
,
const
STableScanPhysiNode
*
pTableScanNode
);
...
...
@@ -4739,7 +4739,6 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
SSDataBlock
*
pResBlock
=
createResDataBlock
(
pScanPhyNode
->
node
.
pOutputDataBlockDesc
);
SQueryTableDataCond
cond
=
{
0
};
int32_t
code
=
initQueryTableDataCond
(
&
cond
,
pTableScanNode
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
return
NULL
;
...
...
@@ -4783,6 +4782,25 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
pHandle
,
pResBlock
,
&
pScanNode
->
tableName
,
pScanNode
->
node
.
pConditions
,
pSysScanPhyNode
->
mgmtEpSet
,
colList
,
pTaskInfo
,
pSysScanPhyNode
->
showRewrite
,
pSysScanPhyNode
->
accountId
);
return
pOperator
;
}
else
if
(
QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN
==
type
)
{
STagScanPhysiNode
*
pScanPhyNode
=
(
STagScanPhysiNode
*
)
pPhyNode
;
SSDataBlock
*
pResBlock
=
createResDataBlock
(
pScanPhyNode
->
node
.
pOutputDataBlockDesc
);
int32_t
code
=
doCreateTableGroup
(
pHandle
->
meta
,
pScanPhyNode
->
tableType
,
pScanPhyNode
->
uid
,
pTableGroupInfo
,
queryId
,
taskId
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
return
NULL
;
}
int32_t
num
=
0
;
SExprInfo
*
pExprInfo
=
createExprInfo
(
pScanPhyNode
->
pScanPseudoCols
,
NULL
,
&
num
);
int32_t
numOfOutputCols
=
0
;
SArray
*
colList
=
extractColMatchInfo
(
pScanPhyNode
->
pScanPseudoCols
,
pScanPhyNode
->
node
.
pOutputDataBlockDesc
,
&
numOfOutputCols
);
SOperatorInfo
*
pOperator
=
createTagScanOperatorInfo
(
pHandle
,
pExprInfo
,
num
,
pResBlock
,
colList
,
pTableGroupInfo
,
pTaskInfo
);
return
pOperator
;
}
else
{
ASSERT
(
0
);
}
...
...
@@ -4852,16 +4870,16 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
SSortPhysiNode
*
pSortPhyNode
=
(
SSortPhysiNode
*
)
pPhyNode
;
SSDataBlock
*
pResBlock
=
createResDataBlock
(
pPhyNode
->
pOutputDataBlockDesc
);
SArray
*
info
=
createSortInfo
(
pSortPhyNode
->
pSortKeys
,
pSortPhyNode
->
pTargets
);
SArray
*
slotMap
=
createIndexMap
(
pSortPhyNode
->
pTargets
);
SArray
*
info
=
createSortInfo
(
pSortPhyNode
->
pSortKeys
);
int32_t
numOfCols
=
0
;
SExprInfo
*
pExprInfo
=
NULL
;
if
(
pSortPhyNode
->
pExprs
!=
NULL
)
{
pExprInfo
=
createExprInfo
(
pSortPhyNode
->
pExprs
,
NULL
,
&
numOfCols
);
}
SExprInfo
*
pExprInfo
=
createExprInfo
(
pSortPhyNode
->
pExprs
,
NULL
,
&
numOfCols
);
pOptr
=
createSortOperatorInfo
(
ops
[
0
],
pResBlock
,
info
,
pExprInfo
,
numOfCols
,
slotMap
,
pTaskInfo
);
int32_t
numOfOutputCols
=
0
;
SArray
*
pColList
=
extractColMatchInfo
(
pSortPhyNode
->
pTargets
,
pSortPhyNode
->
node
.
pOutputDataBlockDesc
,
&
numOfOutputCols
);
pOptr
=
createSortOperatorInfo
(
ops
[
0
],
pResBlock
,
info
,
pExprInfo
,
numOfCols
,
pColList
,
pTaskInfo
);
}
else
if
(
QUERY_NODE_PHYSICAL_PLAN_SESSION_WINDOW
==
type
)
{
SSessionWinodwPhysiNode
*
pSessionNode
=
(
SSessionWinodwPhysiNode
*
)
pPhyNode
;
...
...
@@ -5019,7 +5037,7 @@ SArray* extractPartitionColInfo(SNodeList* pNodeList) {
return
pList
;
}
SArray
*
createSortInfo
(
SNodeList
*
pNodeList
,
SNodeList
*
pNodeListTarget
)
{
SArray
*
createSortInfo
(
SNodeList
*
pNodeList
)
{
size_t
numOfCols
=
LIST_LENGTH
(
pNodeList
);
SArray
*
pList
=
taosArrayInit
(
numOfCols
,
sizeof
(
SBlockOrderInfo
));
if
(
pList
==
NULL
)
{
...
...
@@ -5034,22 +5052,7 @@ SArray* createSortInfo(SNodeList* pNodeList, SNodeList* pNodeListTarget) {
bi
.
nullFirst
=
(
pSortKey
->
nullOrder
==
NULL_ORDER_FIRST
);
SColumnNode
*
pColNode
=
(
SColumnNode
*
)
pSortKey
->
pExpr
;
bool
found
=
false
;
for
(
int32_t
j
=
0
;
j
<
LIST_LENGTH
(
pNodeListTarget
);
++
j
)
{
STargetNode
*
pTarget
=
(
STargetNode
*
)
nodesListGetNode
(
pNodeListTarget
,
j
);
SColumnNode
*
pColNodeT
=
(
SColumnNode
*
)
pTarget
->
pExpr
;
if
(
pColNode
->
slotId
==
pColNodeT
->
slotId
)
{
// to find slotId in PhysiSort OutputDataBlockDesc
bi
.
slotId
=
pTarget
->
slotId
;
found
=
true
;
break
;
}
}
if
(
!
found
)
{
qError
(
"sort slot id does not found"
);
}
bi
.
slotId
=
pColNode
->
slotId
;
taosArrayPush
(
pList
,
&
bi
);
}
...
...
@@ -5088,7 +5091,7 @@ SArray* extractColMatchInfo(SNodeList* pNodeList, SDataBlockDescNode* pOutputNod
SColMatchInfo
c
=
{
0
};
c
.
output
=
true
;
c
.
colId
=
pColNode
->
colId
;
c
.
colId
=
pColNode
->
colId
;
c
.
targetSlotId
=
pNode
->
slotId
;
taosArrayPush
(
pList
,
&
c
);
}
...
...
@@ -5166,9 +5169,7 @@ tsdbReaderT doCreateDataReader(STableScanPhysiNode* pTableScanNode, SReadHandle*
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
goto
_error
;
}
#if 0
return tsdbQueryTables(pHandle->reader, &cond, pTableGroupInfo, queryId, taskId);
#endif
return
tsdbQueryTables
(
pHandle
->
vnode
,
&
cond
,
pTableGroupInfo
,
queryId
,
taskId
);
_error:
...
...
source/libs/executor/src/scanoperator.c
浏览文件 @
90284027
...
...
@@ -13,15 +13,16 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include
"ttime.h"
#include
<libs/function/function.h>
#include "filter.h"
#include "function.h"
#include "functionMgt.h"
#include "os.h"
#include "querynodes.h"
#include "systable.h"
#include "tglobal.h"
#include "tname.h"
#include "
systabl
e.h"
#include "
ttim
e.h"
#include "tdatablock.h"
#include "tmsg.h"
...
...
@@ -1159,16 +1160,17 @@ SOperatorInfo* createSysTableScanOperatorInfo(void* readHandle, SSDataBlock* pRe
}
static
SSDataBlock
*
doTagScan
(
SOperatorInfo
*
pOperator
)
{
#if 0
if
(
pOperator
->
status
==
OP_EXEC_DONE
)
{
return
NULL
;
}
SExecTaskInfo
*
pTaskInfo
=
pOperator
->
pTaskInfo
;
#if 0
int32_t maxNumOfTables = (int32_t)pResultInfo->capacity;
STagScanInfo *pInfo = pOperator->info;
SSDataBlock *pRes = pInfo->pRes;
*newgroup = false;
int32_t count = 0;
SArray* pa = GET_TABLEGROUP(pRuntimeEnv, 0);
...
...
@@ -1237,55 +1239,54 @@ static SSDataBlock* doTagScan(SOperatorInfo* pOperator) {
pOperator->status = OP_EXEC_DONE;
//qDebug("QInfo:0x%"PRIx64" create count(tbname) query, res:%d rows:1", GET_TASKID(pRuntimeEnv), count);
} else { // return only the tags|table name etc.
SExprInfo* pExprInfo = &pOperator->pExpr[0]; // todo use the column list instead of exprinfo
#endif
count = 0
;
while(pInfo->curPos < pInfo->totalTables && count < maxNumOfTables) {
int32_t i = pInfo->curPos++
;
STagScanInfo
*
pInfo
=
pOperator
->
info
;
SExprInfo
*
pExprInfo
=
&
pOperator
->
pExpr
[
0
];
SSDataBlock
*
pRes
=
pInfo
->
pRes
;
STableQueryInfo* item = taosArrayGetP(pa, i
);
SArray
*
pa
=
taosArrayGetP
(
pInfo
->
pTableGroups
->
pGroupList
,
0
);
char *data = NULL, *dst = NULL;
int16_t type = 0, bytes = 0;
for(int32_t j = 0; j < pOperator->numOfExprs; ++j) {
// not assign value in case of user defined constant output column
if (TSDB_COL_IS_UD_COL(pExprInfo[j].base.pColumns->flag)) {
continue;
}
char
str
[
512
]
=
{
0
};
int32_t
count
=
0
;
SMetaReader
mr
=
{
0
};
SColumnInfoData* pColInfo = taosArrayGet(pRes->pDataBlock, j);
type = pExprInfo[j].base.resSchema.type;
bytes = pExprInfo[j].base.resSchema.bytes;
while
(
pInfo
->
curPos
<
pInfo
->
pTableGroups
->
numOfTables
&&
count
<
pOperator
->
resultInfo
.
capacity
)
{
STableKeyInfo
*
item
=
taosArrayGet
(
pa
,
pInfo
->
curPos
);
if (pExprInfo[j].base.pColumns->info.colId == TSDB_TBNAME_COLUMN_INDEX) {
data = tsdbGetTableName(item->pTable);
} else {
data = tsdbGetTableTagVal(item->pTable, pExprInfo[j].base.pColumns->info.colId, type, bytes);
}
for
(
int32_t
j
=
0
;
j
<
pOperator
->
numOfExprs
;
++
j
)
{
SColumnInfoData
*
pDst
=
taosArrayGet
(
pRes
->
pDataBlock
,
pExprInfo
[
j
].
base
.
resSchema
.
slotId
);
// refactor later
if
(
fmIsScanPseudoColumnFunc
(
pExprInfo
[
j
].
pExpr
->
_function
.
functionId
))
{
metaReaderInit
(
&
mr
,
pInfo
->
readHandle
.
meta
,
0
);
metaGetTableEntryByUid
(
&
mr
,
item
->
uid
);
STR_TO_VARSTR
(
str
,
mr
.
me
.
name
);
metaReaderClear
(
&
mr
);
colDataAppend
(
pDst
,
count
,
str
,
false
);
dst = pColInfo->pData + count * pExprInfo[j].base.resSchema.bytes;
doSetTagValueToResultBuf(dst, data, type, bytes);
// data = tsdbGetTableTagVal(item->pTable, pExprInfo[j].base.pColumns->info.colId, type, bytes);
// dst = pColInfo->pData + count * pExprInfo[j].base.resSchema.bytes;
// doSetTagValueToResultBuf(dst, data, type, bytes);
}
count
+=
1
;
}
if (
pInfo->curPos >= pInfo->total
Tables) {
if
(
++
pInfo
->
curPos
>=
pInfo
->
pTableGroups
->
numOf
Tables
)
{
pOperator
->
status
=
OP_EXEC_DONE
;
}
//qDebug("QInfo:0x%"PRIx64" create tag values results completed, rows:%d", GET_TASKID(pRuntimeEnv), count);
}
// qDebug("QInfo:0x%"PRIx64" create tag values results completed, rows:%d", GET_TASKID(pRuntimeEnv), count);
if
(
pOperator
->
status
==
OP_EXEC_DONE
)
{
setTaskStatus(p
Operator->pRuntimeEnv
, TASK_COMPLETED);
setTaskStatus
(
p
TaskInfo
,
TASK_COMPLETED
);
}
pRes
->
info
.
rows
=
count
;
return (pRes->info.rows == 0)? NULL:pInfo->pRes;
#endif
return
TSDB_CODE_SUCCESS
;
return
(
pRes
->
info
.
rows
==
0
)
?
NULL
:
pInfo
->
pRes
;
}
static
void
destroyTagScanOperatorInfo
(
void
*
param
,
int32_t
numOfOutput
)
{
...
...
@@ -1293,14 +1294,18 @@ static void destroyTagScanOperatorInfo(void* param, int32_t numOfOutput) {
pInfo
->
pRes
=
blockDataDestroy
(
pInfo
->
pRes
);
}
SOperatorInfo
*
createTagScanOperatorInfo
(
void
*
readHandle
,
SExprInfo
*
pExpr
,
int32_t
numOfOutput
,
SExecTaskInfo
*
pTaskInfo
)
{
SOperatorInfo
*
createTagScanOperatorInfo
(
SReadHandle
*
pReadHandle
,
SExprInfo
*
pExpr
,
int32_t
numOfOutput
,
SSDataBlock
*
pResBlock
,
SArray
*
pColMatchInfo
,
STableGroupInfo
*
pTableGroupInfo
,
SExecTaskInfo
*
pTaskInfo
)
{
STagScanInfo
*
pInfo
=
taosMemoryCalloc
(
1
,
sizeof
(
STagScanInfo
));
SOperatorInfo
*
pOperator
=
taosMemoryCalloc
(
1
,
sizeof
(
SOperatorInfo
));
if
(
pInfo
==
NULL
||
pOperator
==
NULL
)
{
goto
_error
;
}
pInfo
->
pReader
=
readHandle
;
pInfo
->
pTableGroups
=
pTableGroupInfo
;
pInfo
->
pColMatchInfo
=
pColMatchInfo
;
pInfo
->
pRes
=
pResBlock
;
pInfo
->
readHandle
=
*
pReadHandle
;
pInfo
->
curPos
=
0
;
pOperator
->
name
=
"TagScanOperator"
;
pOperator
->
operatorType
=
QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN
;
...
...
@@ -1308,9 +1313,12 @@ SOperatorInfo* createTagScanOperatorInfo(void* readHandle, SExprInfo* pExpr, int
pOperator
->
status
=
OP_NOT_OPENED
;
pOperator
->
info
=
pInfo
;
pOperator
->
pExpr
=
pExpr
;
pOperator
->
numOfExprs
=
numOfOutput
;
pOperator
->
numOfExprs
=
numOfOutput
;
pOperator
->
pTaskInfo
=
pTaskInfo
;
initResultSizeInfo
(
pOperator
,
4096
);
blockDataEnsureCapacity
(
pInfo
->
pRes
,
pOperator
->
resultInfo
.
capacity
);
pOperator
->
fpSet
=
createOperatorFpSet
(
operatorDummyOpenFn
,
doTagScan
,
NULL
,
NULL
,
destroyTagScanOperatorInfo
,
NULL
,
NULL
,
NULL
);
...
...
source/libs/executor/src/sortoperator.c
浏览文件 @
90284027
...
...
@@ -5,7 +5,7 @@ static SSDataBlock* doSort(SOperatorInfo* pOperator);
static
void
destroyOrderOperatorInfo
(
void
*
param
,
int32_t
numOfOutput
);
SOperatorInfo
*
createSortOperatorInfo
(
SOperatorInfo
*
downstream
,
SSDataBlock
*
pResBlock
,
SArray
*
pSortInfo
,
SExprInfo
*
pExprInfo
,
int32_t
numOfCols
,
SArray
*
p
IndexMap
,
SExecTaskInfo
*
pTaskInfo
)
{
SArray
*
p
ColMatchColInfo
,
SExecTaskInfo
*
pTaskInfo
)
{
SSortOperatorInfo
*
pInfo
=
taosMemoryCalloc
(
1
,
sizeof
(
SSortOperatorInfo
));
SOperatorInfo
*
pOperator
=
taosMemoryCalloc
(
1
,
sizeof
(
SOperatorInfo
));
int32_t
rowSize
=
pResBlock
->
info
.
rowSize
;
...
...
@@ -20,17 +20,19 @@ SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSDataBlock* pR
pInfo
->
binfo
.
pRes
=
pResBlock
;
initResultSizeInfo
(
pOperator
,
1024
);
pInfo
->
bufPageSize
=
rowSize
<
1024
?
1024
*
2
:
rowSize
*
2
;
// there are headers, so pageSize = rowSize + header
pInfo
->
sortBufSize
=
pInfo
->
bufPageSize
*
16
;
// TODO dynamic set the available sort buffer
pInfo
->
pSortInfo
=
pSortInfo
;
pInfo
->
inputSlotMap
=
pIndexMap
;
pInfo
->
pColMatchInfo
=
pColMatchColInfo
;
pOperator
->
name
=
"SortOperator"
;
pOperator
->
operatorType
=
QUERY_NODE_PHYSICAL_PLAN_SORT
;
pOperator
->
blocking
=
true
;
pOperator
->
status
=
OP_NOT_OPENED
;
pOperator
->
info
=
pInfo
;
// lazy evaluation for the following parameter since the input datablock is not known till now.
// pInfo->bufPageSize = rowSize < 1024 ? 1024 * 2 : rowSize * 2; // there are headers, so pageSize = rowSize + header
// pInfo->sortBufSize = pInfo->bufPageSize * 16; // TODO dynamic set the available sort buffer
pOperator
->
pTaskInfo
=
pTaskInfo
;
pOperator
->
fpSet
=
createOperatorFpSet
(
operatorDummyOpenFn
,
doSort
,
NULL
,
NULL
,
destroyOrderOperatorInfo
,
NULL
,
NULL
,
NULL
);
...
...
@@ -45,14 +47,12 @@ SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSDataBlock* pR
return
NULL
;
}
// TODO merge aggregate super table
void
appendOneRowToDataBlock
(
SSDataBlock
*
pBlock
,
STupleHandle
*
pTupleHandle
)
{
for
(
int32_t
i
=
0
;
i
<
pBlock
->
info
.
numOfCols
;
++
i
)
{
SColumnInfoData
*
pColInfo
=
taosArrayGet
(
pBlock
->
pDataBlock
,
i
);
bool
isNull
=
tsortIsNullVal
(
pTupleHandle
,
i
);
if
(
isNull
)
{
colDataAppend
(
pColInfo
,
pBlock
->
info
.
rows
,
NULL
,
true
);
colDataAppend
NULL
(
pColInfo
,
pBlock
->
info
.
rows
);
}
else
{
char
*
pData
=
tsortGetValue
(
pTupleHandle
,
i
);
colDataAppend
(
pColInfo
,
pBlock
->
info
.
rows
,
pData
,
false
);
...
...
@@ -62,11 +62,12 @@ void appendOneRowToDataBlock(SSDataBlock* pBlock, STupleHandle* pTupleHandle) {
pBlock
->
info
.
rows
+=
1
;
}
SSDataBlock
*
getSortedBlockData
(
SSortHandle
*
pHandle
,
SSDataBlock
*
pDataBlock
,
int32_t
capacity
)
{
SSDataBlock
*
getSortedBlockData
(
SSortHandle
*
pHandle
,
SSDataBlock
*
pDataBlock
,
int32_t
capacity
,
SArray
*
pColMatchInfo
)
{
blockDataCleanup
(
pDataBlock
);
blockDataEnsureCapacity
(
pDataBlock
,
capacity
);
ASSERT
(
taosArrayGetSize
(
pColMatchInfo
)
==
pDataBlock
->
info
.
numOfCols
);
blockDataEnsureCapacity
(
pDataBlock
,
capacity
);
SSDataBlock
*
p
=
tsortGetSortedDataBlock
(
pHandle
);
blockDataEnsureCapacity
(
p
,
capacity
);
while
(
1
)
{
STupleHandle
*
pTupleHandle
=
tsortNextTuple
(
pHandle
);
...
...
@@ -74,12 +75,32 @@ SSDataBlock* getSortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock, i
break
;
}
appendOneRowToDataBlock
(
p
DataBlock
,
pTupleHandle
);
if
(
p
DataBlock
->
info
.
rows
>=
capacity
)
{
appendOneRowToDataBlock
(
p
,
pTupleHandle
);
if
(
p
->
info
.
rows
>=
capacity
)
{
return
pDataBlock
;
}
}
if
(
p
->
info
.
rows
>
0
)
{
int32_t
numOfCols
=
taosArrayGetSize
(
pColMatchInfo
);
for
(
int32_t
i
=
0
;
i
<
numOfCols
;
++
i
)
{
SColMatchInfo
*
pmInfo
=
taosArrayGet
(
pColMatchInfo
,
i
);
for
(
int32_t
j
=
0
;
j
<
p
->
info
.
numOfCols
;
++
j
)
{
SColumnInfoData
*
pSrc
=
taosArrayGet
(
p
->
pDataBlock
,
j
);
if
(
pSrc
->
info
.
colId
==
pmInfo
->
colId
)
{
SColumnInfoData
*
pDst
=
taosArrayGet
(
pDataBlock
->
pDataBlock
,
pmInfo
->
targetSlotId
);
colDataAssign
(
pDst
,
pSrc
,
p
->
info
.
rows
);
break
;
}
}
}
pDataBlock
->
info
.
rows
=
p
->
info
.
rows
;
pDataBlock
->
info
.
capacity
=
p
->
info
.
rows
;
}
blockDataDestroy
(
p
);
return
(
pDataBlock
->
info
.
rows
>
0
)
?
pDataBlock
:
NULL
;
}
...
...
@@ -106,16 +127,16 @@ SSDataBlock* doSort(SOperatorInfo* pOperator) {
SSortOperatorInfo
*
pInfo
=
pOperator
->
info
;
if
(
pOperator
->
status
==
OP_RES_TO_RETURN
)
{
return
getSortedBlockData
(
pInfo
->
pSortHandle
,
pInfo
->
binfo
.
pRes
,
pOperator
->
resultInfo
.
capacity
);
return
getSortedBlockData
(
pInfo
->
pSortHandle
,
pInfo
->
binfo
.
pRes
,
pOperator
->
resultInfo
.
capacity
,
pInfo
->
pColMatchInfo
);
}
int32_t
numOfBufPage
=
pInfo
->
sortBufSize
/
pInfo
->
bufPageSize
;
pInfo
->
pSortHandle
=
tsortCreateSortHandle
(
pInfo
->
pSortInfo
,
pInfo
->
inputSlotMap
,
SORT_SINGLESOURCE_SORT
,
pInfo
->
bufPageSize
,
numOfBufPage
,
pInfo
->
binfo
.
pRes
,
pTaskInfo
->
id
.
str
);
// pInfo->binfo.pRes is not equalled to the input datablock.
// int32_t numOfBufPage = pInfo->sortBufSize / pInfo->bufPageSize;
pInfo
->
pSortHandle
=
tsortCreateSortHandle
(
pInfo
->
pSortInfo
,
pInfo
->
pColMatchInfo
,
SORT_SINGLESOURCE_SORT
,
-
1
,
-
1
,
NULL
,
pTaskInfo
->
id
.
str
);
tsortSetFetchRawDataFp
(
pInfo
->
pSortHandle
,
loadNextDataBlock
,
applyScalarFunction
,
pOperator
);
SSortSource
*
ps
=
taosMemoryCalloc
(
1
,
sizeof
(
SSortSource
));
ps
->
param
=
pOperator
->
pDownstream
[
0
];
tsortAddSource
(
pInfo
->
pSortHandle
,
ps
);
...
...
@@ -127,7 +148,7 @@ SSDataBlock* doSort(SOperatorInfo* pOperator) {
}
pOperator
->
status
=
OP_RES_TO_RETURN
;
return
getSortedBlockData
(
pInfo
->
pSortHandle
,
pInfo
->
binfo
.
pRes
,
pOperator
->
resultInfo
.
capacity
);
return
getSortedBlockData
(
pInfo
->
pSortHandle
,
pInfo
->
binfo
.
pRes
,
pOperator
->
resultInfo
.
capacity
,
pInfo
->
pColMatchInfo
);
}
void
destroyOrderOperatorInfo
(
void
*
param
,
int32_t
numOfOutput
)
{
...
...
@@ -135,5 +156,5 @@ void destroyOrderOperatorInfo(void* param, int32_t numOfOutput) {
pInfo
->
binfo
.
pRes
=
blockDataDestroy
(
pInfo
->
binfo
.
pRes
);
taosArrayDestroy
(
pInfo
->
pSortInfo
);
taosArrayDestroy
(
pInfo
->
inputSlotMap
);
taosArrayDestroy
(
pInfo
->
pColMatchInfo
);
}
source/libs/executor/src/tsort.c
浏览文件 @
90284027
...
...
@@ -64,25 +64,8 @@ struct SSortHandle {
static
int32_t
msortComparFn
(
const
void
*
pLeft
,
const
void
*
pRight
,
void
*
param
);
static
SSDataBlock
*
createDataBlock_rv
(
SSchema
*
pSchema
,
int32_t
numOfCols
)
{
SSDataBlock
*
pBlock
=
taosMemoryCalloc
(
1
,
sizeof
(
SSDataBlock
));
pBlock
->
pDataBlock
=
taosArrayInit
(
numOfCols
,
sizeof
(
SColumnInfoData
));
pBlock
->
info
.
numOfCols
=
numOfCols
;
for
(
int32_t
i
=
0
;
i
<
numOfCols
;
++
i
)
{
SColumnInfoData
colInfo
=
{
0
};
colInfo
.
info
.
type
=
pSchema
[
i
].
type
;
colInfo
.
info
.
bytes
=
pSchema
[
i
].
bytes
;
colInfo
.
info
.
colId
=
pSchema
[
i
].
colId
;
taosArrayPush
(
pBlock
->
pDataBlock
,
&
colInfo
);
if
(
IS_VAR_DATA_TYPE
(
colInfo
.
info
.
type
))
{
pBlock
->
info
.
hasVarCol
=
true
;
}
}
return
pBlock
;
SSDataBlock
*
tsortGetSortedDataBlock
(
const
SSortHandle
*
pSortHandle
)
{
return
createOneDataBlock
(
pSortHandle
->
pDataBlock
,
false
);
}
/**
...
...
@@ -98,7 +81,10 @@ SSortHandle* tsortCreateSortHandle(SArray* pSortInfo, SArray* pIndexMap, int32_t
pSortHandle
->
numOfPages
=
numOfPages
;
pSortHandle
->
pSortInfo
=
pSortInfo
;
pSortHandle
->
pIndexMap
=
pIndexMap
;
pSortHandle
->
pDataBlock
=
createOneDataBlock
(
pBlock
,
false
);
if
(
pBlock
!=
NULL
)
{
pSortHandle
->
pDataBlock
=
createOneDataBlock
(
pBlock
,
false
);
}
pSortHandle
->
pOrderedSource
=
taosArrayInit
(
4
,
POINTER_BYTES
);
pSortHandle
->
cmpParam
.
orderInfo
=
pSortInfo
;
...
...
@@ -530,6 +516,17 @@ static int32_t createInitialSortedMultiSources(SSortHandle* pHandle) {
if
(
pHandle
->
pDataBlock
==
NULL
)
{
pHandle
->
pDataBlock
=
createOneDataBlock
(
pBlock
,
false
);
// calculate the buffer pages according to the total available buffers.
int32_t
rowSize
=
blockDataGetRowSize
(
pBlock
);
if
(
rowSize
*
4
>
4096
)
{
pHandle
->
pageSize
=
rowSize
*
4
;
}
else
{
pHandle
->
pageSize
=
4096
;
}
// todo!!
pHandle
->
numOfPages
=
1024
;
sortBufSize
=
pHandle
->
numOfPages
*
pHandle
->
pageSize
;
}
// perform the scalar function calculation before apply the sort
...
...
@@ -538,7 +535,6 @@ static int32_t createInitialSortedMultiSources(SSortHandle* pHandle) {
}
// todo relocate the columns
int32_t
code
=
blockDataMerge
(
pHandle
->
pDataBlock
,
pBlock
,
pHandle
->
pIndexMap
);
if
(
code
!=
0
)
{
return
code
;
...
...
@@ -689,7 +685,7 @@ STupleHandle* tsortNextTuple(SSortHandle* pHandle) {
bool
tsortIsNullVal
(
STupleHandle
*
pVHandle
,
int32_t
colIndex
)
{
SColumnInfoData
*
pColInfoSrc
=
taosArrayGet
(
pVHandle
->
pBlock
->
pDataBlock
,
colIndex
);
return
colDataIsNull
(
pColInfoSrc
,
0
,
pVHandle
->
rowIndex
,
NULL
);
return
colDataIsNull
_s
(
pColInfoSrc
,
pVHandle
->
rowIndex
);
}
void
*
tsortGetValue
(
STupleHandle
*
pVHandle
,
int32_t
colIndex
)
{
...
...
source/libs/function/src/builtins.c
浏览文件 @
90284027
...
...
@@ -242,8 +242,7 @@ static int32_t translateFirstLast(SFunctionNode* pFunc, char* pErrBuf, int32_t l
"The parameters of first/last can only be columns"
);
}
uint8_t
paraType
=
((
SExprNode
*
)
pPara
)
->
resType
.
type
;
pFunc
->
node
.
resType
=
(
SDataType
){.
bytes
=
tDataTypes
[
paraType
].
bytes
,
.
type
=
paraType
};
pFunc
->
node
.
resType
=
((
SExprNode
*
)
pPara
)
->
resType
;
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -917,7 +916,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
.
translateFunc
=
translateTbnameColumn
,
.
getEnvFunc
=
NULL
,
.
initFunc
=
NULL
,
.
sprocessFunc
=
NULL
,
.
sprocessFunc
=
qTbnameFunction
,
.
finalizeFunc
=
NULL
},
{
...
...
source/libs/function/src/tudf.c
浏览文件 @
90284027
...
...
@@ -594,7 +594,9 @@ int32_t convertScalarParamToDataBlock(SScalarParam *input, int32_t numOfCols, SS
//TODO: free the array output->pDataBlock
output
->
pDataBlock
=
taosArrayInit
(
numOfCols
,
sizeof
(
SColumnInfoData
));
taosArrayPush
(
output
->
pDataBlock
,
input
->
columnData
);
for
(
int32_t
i
=
0
;
i
<
numOfCols
;
++
i
)
{
taosArrayPush
(
output
->
pDataBlock
,
(
input
+
i
)
->
columnData
);
}
return
0
;
}
...
...
source/libs/function/src/udfd.c
浏览文件 @
90284027
...
...
@@ -81,6 +81,9 @@ typedef struct SUdf {
TUdfAggStartFunc
aggStartFunc
;
TUdfAggProcessFunc
aggProcFunc
;
TUdfAggFinishFunc
aggFinishFunc
;
TUdfInitFunc
initFunc
;
TUdfDestroyFunc
destroyFunc
;
}
SUdf
;
// TODO: low priority: change name onxxx to xxxCb, and udfc or udfd as prefix
...
...
@@ -101,7 +104,19 @@ int32_t udfdLoadUdf(char *udfName, SUdf *udf) {
fnError
(
"can not load library %s. error: %s"
,
udf
->
path
,
uv_strerror
(
err
));
return
UDFC_CODE_LOAD_UDF_FAILURE
;
}
//TODO: init and destroy function
char
initFuncName
[
TSDB_FUNC_NAME_LEN
+
5
]
=
{
0
};
char
*
initSuffix
=
"_init"
;
strcpy
(
initFuncName
,
udfName
);
strncat
(
initFuncName
,
initSuffix
,
strlen
(
initSuffix
));
uv_dlsym
(
&
udf
->
lib
,
initFuncName
,
(
void
**
)(
&
udf
->
initFunc
));
char
destroyFuncName
[
TSDB_FUNC_NAME_LEN
+
5
]
=
{
0
};
char
*
destroySuffix
=
"_destroy"
;
strcpy
(
destroyFuncName
,
udfName
);
strncat
(
destroyFuncName
,
destroySuffix
,
strlen
(
destroySuffix
));
uv_dlsym
(
&
udf
->
lib
,
destroyFuncName
,
(
void
**
)(
&
udf
->
destroyFunc
));
if
(
udf
->
funcType
==
TSDB_FUNC_TYPE_SCALAR
)
{
char
processFuncName
[
TSDB_FUNC_NAME_LEN
]
=
{
0
};
strcpy
(
processFuncName
,
udfName
);
...
...
@@ -159,6 +174,9 @@ void udfdProcessRequest(uv_work_t *req) {
if
(
udf
->
state
==
UDF_STATE_INIT
)
{
udf
->
state
=
UDF_STATE_LOADING
;
udfdLoadUdf
(
setup
->
udfName
,
udf
);
if
(
udf
->
initFunc
)
{
udf
->
initFunc
();
}
udf
->
state
=
UDF_STATE_READY
;
uv_cond_broadcast
(
&
udf
->
condReady
);
uv_mutex_unlock
(
&
udf
->
lock
);
...
...
@@ -170,7 +188,6 @@ void udfdProcessRequest(uv_work_t *req) {
}
SUdfcFuncHandle
*
handle
=
taosMemoryMalloc
(
sizeof
(
SUdfcFuncHandle
));
handle
->
udf
=
udf
;
// TODO: allocate private structure and call init function and set it to handle
SUdfResponse
rsp
;
rsp
.
seqNum
=
request
.
seqNum
;
rsp
.
type
=
request
.
type
;
...
...
@@ -275,10 +292,12 @@ void udfdProcessRequest(uv_work_t *req) {
if
(
unloadUdf
)
{
uv_cond_destroy
(
&
udf
->
condReady
);
uv_mutex_destroy
(
&
udf
->
lock
);
if
(
udf
->
destroyFunc
)
{
(
udf
->
destroyFunc
)();
}
uv_dlclose
(
&
udf
->
lib
);
taosMemoryFree
(
udf
);
}
// TODO: call destroy and free udf private
taosMemoryFree
(
handle
);
SUdfResponse
response
;
...
...
source/libs/scalar/src/sclfunc.c
浏览文件 @
90284027
...
...
@@ -1513,3 +1513,9 @@ int32_t winEndTsFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *p
colDataAppendInt64
(
pOutput
->
columnData
,
pOutput
->
numOfRows
,
(
int64_t
*
)
colDataGetData
(
pInput
->
columnData
,
4
));
return
TSDB_CODE_SUCCESS
;
}
int32_t
qTbnameFunction
(
SScalarParam
*
pInput
,
int32_t
inputNum
,
SScalarParam
*
pOutput
)
{
ASSERT
(
inputNum
==
1
);
colDataAppend
(
pOutput
->
columnData
,
pOutput
->
numOfRows
,
colDataGetData
(
pInput
->
columnData
,
0
),
false
);
return
TSDB_CODE_SUCCESS
;
}
source/libs/scalar/src/sclvector.c
浏览文件 @
90284027
...
...
@@ -1023,8 +1023,7 @@ static void vectorMathMultiplyHelper(SColumnInfoData* pLeftCol, SColumnInfoData*
colDataAppendNULL
(
pOutputCol
,
i
);
continue
;
// TODO set null or ignore
}
*
output
=
getVectorDoubleValueFnLeft
(
LEFT_COL
,
i
)
*
getVectorDoubleValueFnRight
(
RIGHT_COL
,
0
);
*
output
=
getVectorDoubleValueFnLeft
(
LEFT_COL
,
i
)
*
getVectorDoubleValueFnRight
(
RIGHT_COL
,
0
);
}
}
}
...
...
@@ -1050,8 +1049,7 @@ void vectorMathMultiply(SScalarParam* pLeft, SScalarParam* pRight, SScalarParam
colDataAppendNULL
(
pOutputCol
,
i
);
continue
;
// TODO set null or ignore
}
*
output
=
getVectorDoubleValueFnLeft
(
LEFT_COL
,
i
)
*
getVectorDoubleValueFnRight
(
RIGHT_COL
,
i
);
*
output
=
getVectorDoubleValueFnLeft
(
LEFT_COL
,
i
)
*
getVectorDoubleValueFnRight
(
RIGHT_COL
,
i
);
}
}
else
if
(
pLeft
->
numOfRows
==
1
)
{
vectorMathMultiplyHelper
(
pRightCol
,
pLeftCol
,
pOutputCol
,
pRight
->
numOfRows
,
step
,
i
);
...
...
tests/requirements.txt
0 → 100644
浏览文件 @
90284027
taospy
numpy
fabric2
psutil
pandas
tests/script/tsim/query/udf.sim
浏览文件 @
90284027
...
...
@@ -43,6 +43,27 @@ if $data00 != 2.236067977 then
return -1
endi
sql create table t2 (ts timestamp, f1 int, f2 int);
sql insert into t2 values(now, 0, 0)(now+1s, 1, 1);
sql select udf1(f1, f2) from t2;
if $rows != 2 then
return -1
endi
if $data00 != 88 then
return -1
endi
if $data10 != 88 then
return -1
endi
sql select udf2(f1, f2) from t2;
if $rows != 1 then
return -1
endi
if $data00 != 1.414213562 then
return -1
endi
#sql drop function udf1;
#sql drop function udf2;
system sh/exec.sh -n dnode1 -s stop -x SIGKILL
tests/system-test/2-query/sqrt.py
0 → 100644
浏览文件 @
90284027
此差异已折叠。
点击以展开。
tests/system-test/fulltest.sh
浏览文件 @
90284027
...
...
@@ -24,3 +24,4 @@ python3 ./test.py -f 2-query/floor.py
python3 ./test.py
-f
2-query/round.py
python3 ./test.py
-f
2-query/log.py
python3 ./test.py
-f
2-query/pow.py
python3 ./test.py
-f
2-query/sqrt.py
taos-tools
@
59e0ebaf
比较
2f3dfddd
...
59e0ebaf
Subproject commit
2f3dfddd4d9a869e706ba3cf98fb6d769404cd7c
Subproject commit
59e0ebaf4905e4cb6d95a01c58b3fa507abc5a20
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录