Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
f2de1af9
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1187
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
f2de1af9
编写于
6月 15, 2022
作者:
H
Haojun Liao
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
enh(query): partition by support arithmetic expr, and do some internal refactor.
上级
4f8c84cf
变更
10
隐藏空白更改
内联
并排
Showing
10 changed file
with
59 addition
and
256 deletion
+59
-256
include/libs/function/function.h
include/libs/function/function.h
+0
-4
source/libs/executor/inc/executil.h
source/libs/executor/inc/executil.h
+0
-1
source/libs/executor/inc/executorimpl.h
source/libs/executor/inc/executorimpl.h
+15
-11
source/libs/executor/src/executil.c
source/libs/executor/src/executil.c
+0
-11
source/libs/executor/src/executorimpl.c
source/libs/executor/src/executorimpl.c
+1
-132
source/libs/executor/src/groupoperator.c
source/libs/executor/src/groupoperator.c
+35
-14
source/libs/executor/src/scanoperator.c
source/libs/executor/src/scanoperator.c
+8
-3
source/libs/executor/src/timewindowoperator.c
source/libs/executor/src/timewindowoperator.c
+0
-5
source/libs/function/inc/texpr.h
source/libs/function/inc/texpr.h
+0
-67
source/libs/function/src/texpr.c
source/libs/function/src/texpr.c
+0
-8
未找到文件。
include/libs/function/function.h
浏览文件 @
f2de1af9
...
...
@@ -150,12 +150,8 @@ typedef struct SqlFunctionCtx {
}
SqlFunctionCtx
;
enum
{
TEXPR_NODE_DUMMY
=
0x0
,
TEXPR_BINARYEXPR_NODE
=
0x1
,
TEXPR_UNARYEXPR_NODE
=
0x2
,
TEXPR_FUNCTION_NODE
=
0x3
,
TEXPR_COL_NODE
=
0x4
,
TEXPR_VALUE_NODE
=
0x8
,
};
typedef
struct
tExprNode
{
...
...
source/libs/executor/inc/executil.h
浏览文件 @
f2de1af9
...
...
@@ -55,7 +55,6 @@ typedef struct SResultRow {
uint32_t
numOfRows
;
// number of rows of current time window
STimeWindow
win
;
struct
SResultRowEntryInfo
pEntryInfo
[];
// For each result column, there is a resultInfo
// char *key; // start key of current result row
}
SResultRow
;
typedef
struct
SResultRowPosition
{
...
...
source/libs/executor/inc/executorimpl.h
浏览文件 @
f2de1af9
...
...
@@ -540,6 +540,13 @@ typedef struct SFillOperatorInfo {
bool
multigroupResult
;
}
SFillOperatorInfo
;
typedef
struct
SScalarSupp
{
SExprInfo
*
pScalarExprInfo
;
int32_t
numOfScalarExpr
;
// the number of scalar expression in group operator
SqlFunctionCtx
*
pScalarFuncCtx
;
int32_t
*
rowCellInfoOffset
;
// offset value for each row result cell info
}
SScalarSupp
;
typedef
struct
SGroupbyOperatorInfo
{
// SOptrBasicInfo should be first, SAggSupporter should be second for stream encode
SOptrBasicInfo
binfo
;
...
...
@@ -552,10 +559,7 @@ typedef struct SGroupbyOperatorInfo {
char
*
keyBuf
;
// group by keys for hash
int32_t
groupKeyLen
;
// total group by column width
SGroupResInfo
groupResInfo
;
SExprInfo
*
pScalarExprInfo
;
int32_t
numOfScalarExpr
;
// the number of scalar expression in group operator
SqlFunctionCtx
*
pScalarFuncCtx
;
int32_t
*
rowCellInfoOffset
;
// offset value for each row result cell info
SScalarSupp
scalarSup
;
}
SGroupbyOperatorInfo
;
typedef
struct
SDataGroupInfo
{
...
...
@@ -578,6 +582,7 @@ typedef struct SPartitionOperatorInfo {
int32_t
*
columnOffset
;
// start position for each column data
void
*
pGroupIter
;
// group iterator
int32_t
pageIndex
;
// page index of current group
SScalarSupp
scalarSupp
;
}
SPartitionOperatorInfo
;
typedef
struct
SWindowRowsSup
{
...
...
@@ -755,6 +760,8 @@ void getAlignQueryTimeWindow(SInterval* pInterval, int32_t precision, int64_t
int32_t
getTableScanInfo
(
SOperatorInfo
*
pOperator
,
int32_t
*
order
,
int32_t
*
scanFlag
);
int32_t
getBufferPgSize
(
int32_t
rowSize
,
uint32_t
*
defaultPgsz
,
uint32_t
*
defaultBufsz
);
SArray
*
extractPartitionColInfo
(
SNodeList
*
pNodeList
);
void
doSetOperatorCompleted
(
SOperatorInfo
*
pOperator
);
void
doFilter
(
const
SNode
*
pFilterNode
,
SSDataBlock
*
pBlock
);
SqlFunctionCtx
*
createSqlFunctionCtx
(
SExprInfo
*
pExprInfo
,
int32_t
numOfOutput
,
int32_t
**
rowCellInfoOffset
);
...
...
@@ -834,20 +841,17 @@ SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExp
SOperatorInfo
*
createStatewindowOperatorInfo
(
SOperatorInfo
*
downstream
,
SExprInfo
*
pExpr
,
int32_t
numOfCols
,
SSDataBlock
*
pResBlock
,
STimeWindowAggSupp
*
pTwAggSupp
,
int32_t
tsSlotId
,
SColumn
*
pStateKeyCol
,
SExecTaskInfo
*
pTaskInfo
);
SOperatorInfo
*
createPartitionOperatorInfo
(
SOperatorInfo
*
downstream
,
SExprInfo
*
pExprInfo
,
int32_t
numOfCols
,
SSDataBlock
*
pResultBlock
,
SArray
*
pGroupColList
,
SExecTaskInfo
*
pTaskInfo
);
SOperatorInfo
*
createPartitionOperatorInfo
(
SOperatorInfo
*
downstream
,
SPartitionPhysiNode
*
pPartNode
,
SExecTaskInfo
*
pTaskInfo
);
SOperatorInfo
*
createTimeSliceOperatorInfo
(
SOperatorInfo
*
downstream
,
SExprInfo
*
pExprInfo
,
int32_t
numOfCols
,
SSDataBlock
*
pResultBlock
,
const
SNodeListNode
*
pValNode
,
SExecTaskInfo
*
pTaskInfo
);
SOperatorInfo
*
createMergeJoinOperatorInfo
(
SOperatorInfo
**
pDownstream
,
int32_t
numOfDownstream
,
SExprInfo
*
pExprInfo
,
int32_t
numOfCols
,
SSDataBlock
*
pResBlock
,
SNode
*
pOnCondition
,
SExecTaskInfo
*
pTaskInfo
);
SOperatorInfo
*
createStreamSessionAggOperatorInfo
(
SOperatorInfo
*
downstream
,
SExprInfo
*
pExprInfo
,
int32_t
numOfCols
,
SSDataBlock
*
pResBlock
,
int64_t
gap
,
int32_t
tsSlotId
,
STimeWindowAggSupp
*
pTwAggSupp
,
SExecTaskInfo
*
pTaskInfo
);
SOperatorInfo
*
createStreamSessionAggOperatorInfo
(
SOperatorInfo
*
downstream
,
SExprInfo
*
pExprInfo
,
int32_t
numOfCols
,
SSDataBlock
*
pResBlock
,
int64_t
gap
,
int32_t
tsSlotId
,
STimeWindowAggSupp
*
pTwAggSupp
,
SExecTaskInfo
*
pTaskInfo
);
SOperatorInfo
*
createStreamStateAggOperatorInfo
(
SOperatorInfo
*
downstream
,
SPhysiNode
*
pPhyNode
,
SExecTaskInfo
*
pTaskInfo
);
SOperatorInfo
*
createStreamStateAggOperatorInfo
(
SOperatorInfo
*
downstream
,
SPhysiNode
*
pPhyNode
,
SExecTaskInfo
*
pTaskInfo
);
#if 0
SOperatorInfo* createTableSeqScanOperatorInfo(void* pTsdbReadHandle, STaskRuntimeEnv* pRuntimeEnv);
...
...
source/libs/executor/src/executil.c
浏览文件 @
f2de1af9
...
...
@@ -28,17 +28,6 @@ typedef struct SCompSupporter {
int32_t
order
;
}
SCompSupporter
;
int32_t
getOutputInterResultBufSize
(
STaskAttr
*
pQueryAttr
)
{
int32_t
size
=
0
;
for
(
int32_t
i
=
0
;
i
<
pQueryAttr
->
numOfOutput
;
++
i
)
{
// size += pQueryAttr->pExpr1[i].base.interBytes;
}
assert
(
size
>=
0
);
return
size
;
}
int32_t
initResultRowInfo
(
SResultRowInfo
*
pResultRowInfo
,
int32_t
size
)
{
pResultRowInfo
->
size
=
0
;
pResultRowInfo
->
cur
.
pageId
=
-
1
;
...
...
source/libs/executor/src/executorimpl.c
浏览文件 @
f2de1af9
...
...
@@ -40,9 +40,6 @@
#define IS_MAIN_SCAN(runtime) ((runtime)->scanFlag == MAIN_SCAN)
#define SET_REVERSE_SCAN_FLAG(runtime) ((runtime)->scanFlag = REVERSE_SCAN)
#define SDATA_BLOCK_INITIALIZER \
(SDataBlockInfo) { {0}, 0 }
#define GET_FORWARD_DIRECTION_FACTOR(ord) (((ord) == TSDB_ORDER_ASC) ? QUERY_ASC_FORWARD_STEP : QUERY_DESC_FORWARD_STEP)
#if 0
...
...
@@ -95,8 +92,6 @@ static void setBlockStatisInfo(SqlFunctionCtx* pCtx, SExprInfo* pExpr, SSDataBlo
static
void
destroyTableQueryInfoImpl
(
STableQueryInfo
*
pTableQueryInfo
);
static
SColumnInfo
*
extractColumnFilterInfo
(
SExprInfo
*
pExpr
,
int32_t
numOfOutput
,
int32_t
*
numOfFilterCols
);
static
void
releaseQueryBuf
(
size_t
numOfTables
);
static
void
destroySFillOperatorInfo
(
void
*
param
,
int32_t
numOfOutput
);
...
...
@@ -454,75 +449,6 @@ static bool chkWindowOutputBufByKey(STaskRuntimeEnv* pRuntimeEnv, SResultRowInfo
return
chkResultRowFromKey
(
pRuntimeEnv
,
pResultRowInfo
,
(
char
*
)
&
win
->
skey
,
TSDB_KEYSIZE
,
masterscan
,
groupId
);
}
static
void
doUpdateResultRowIndex
(
SResultRowInfo
*
pResultRowInfo
,
TSKEY
lastKey
,
bool
ascQuery
,
bool
timeWindowInterpo
)
{
int64_t
skey
=
TSKEY_INITIAL_VAL
;
#if 0
int32_t i = 0;
for (i = pResultRowInfo->size - 1; i >= 0; --i) {
SResultRow* pResult = pResultRowInfo->pResult[i];
if (pResult->closed) {
break;
}
// new closed result rows
if (timeWindowInterpo) {
if (pResult->endInterp &&
((pResult->win.skey <= lastKey && ascQuery) || (pResult->win.skey >= lastKey && !ascQuery))) {
if (i > 0) { // the first time window, the startInterp is false.
assert(pResult->startInterp);
}
closeResultRow(pResultRowInfo, i);
} else {
skey = pResult->win.skey;
}
} else {
if ((pResult->win.ekey <= lastKey && ascQuery) || (pResult->win.skey >= lastKey && !ascQuery)) {
closeResultRow(pResultRowInfo, i);
} else {
skey = pResult->win.skey;
}
}
}
// all result rows are closed, set the last one to be the skey
if (skey == TSKEY_INITIAL_VAL) {
if (pResultRowInfo->size == 0) {
// assert(pResultRowInfo->current == NULL);
assert(pResultRowInfo->curPos == -1);
pResultRowInfo->curPos = -1;
} else {
pResultRowInfo->curPos = pResultRowInfo->size - 1;
}
} else {
for (i = pResultRowInfo->size - 1; i >= 0; --i) {
SResultRow* pResult = pResultRowInfo->pResult[i];
if (pResult->closed) {
break;
}
}
if (i == pResultRowInfo->size - 1) {
pResultRowInfo->curPos = i;
} else {
pResultRowInfo->curPos = i + 1; // current not closed result object
}
}
#endif
}
//
// static void updateResultRowInfoActiveIndex(SResultRowInfo* pResultRowInfo, const STimeWindow* pWin, TSKEY lastKey,
// bool ascQuery, bool interp) {
// if ((lastKey > pWin->ekey && ascQuery) || (lastKey < pWin->ekey && (!ascQuery))) {
// closeAllResultRows(pResultRowInfo);
// pResultRowInfo->curPos = pResultRowInfo->size - 1;
// } else {
// int32_t step = ascQuery ? 1 : -1;
// doUpdateResultRowIndex(pResultRowInfo, lastKey - step, ascQuery, interp);
// }
//}
// query_range_start, query_range_end, window_duration, window_start, window_end
void
initExecTimeWindowInfo
(
SColumnInfoData
*
pColData
,
STimeWindow
*
pQueryWindow
)
{
pColData
->
info
.
type
=
TSDB_DATA_TYPE_TIMESTAMP
;
...
...
@@ -810,20 +736,6 @@ int32_t projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBloc
// _rowts/_c0, not tbname column
if
(
fmIsPseudoColumnFunc
(
pfCtx
->
functionId
)
&&
(
!
fmIsScanPseudoColumnFunc
(
pfCtx
->
functionId
)))
{
// do nothing
}
else
if
(
fmIsIndefiniteRowsFunc
(
pfCtx
->
functionId
))
{
SResultRowEntryInfo
*
pResInfo
=
GET_RES_INFO
(
&
pCtx
[
k
]);
pfCtx
->
fpSet
.
init
(
&
pCtx
[
k
],
pResInfo
);
pfCtx
->
pOutput
=
taosArrayGet
(
pResult
->
pDataBlock
,
outputSlotId
);
pfCtx
->
offset
=
createNewColModel
?
0
:
pResult
->
info
.
rows
;
// set the start offset
// set the timestamp(_rowts) output buffer
if
(
taosArrayGetSize
(
pPseudoList
)
>
0
)
{
int32_t
*
outputColIndex
=
taosArrayGet
(
pPseudoList
,
0
);
pfCtx
->
pTsOutput
=
(
SColumnInfoData
*
)
pCtx
[
*
outputColIndex
].
pOutput
;
}
numOfRows
=
pfCtx
->
fpSet
.
process
(
pfCtx
);
}
else
{
SArray
*
pBlockList
=
taosArrayInit
(
4
,
POINTER_BYTES
);
taosArrayPush
(
pBlockList
,
&
pSrcBlock
);
...
...
@@ -886,7 +798,6 @@ int32_t setGroupResultOutputBuf(SOptrBasicInfo* binfo, int32_t numOfCols, char*
doSetResultOutBufByKey
(
pBuf
,
pResultRowInfo
,
(
char
*
)
pData
,
bytes
,
true
,
groupId
,
pTaskInfo
,
false
,
pAggSup
);
assert
(
pResultRow
!=
NULL
);
setResultRowKey
(
pResultRow
,
pData
,
type
);
setResultRowInitCtx
(
pResultRow
,
pCtx
,
numOfCols
,
binfo
->
rowCellInfoOffset
);
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -908,21 +819,6 @@ bool functionNeedToExecute(SqlFunctionCtx* pCtx) {
return
false
;
}
// if (functionId == FUNCTION_FIRST_DST || functionId == FUNCTION_FIRST) {
// // return QUERY_IS_ASC_QUERY(pQueryAttr);
// }
//
// // denote the order type
// if ((functionId == FUNCTION_LAST_DST || functionId == FUNCTION_LAST)) {
// // return pCtx->param[0].i == pQueryAttr->order.order;
// }
// in the reverse table scan, only the following functions need to be executed
// if (IS_REVERSE_SCAN(pRuntimeEnv) ||
// (pRuntimeEnv->scanFlag == REPEAT_SCAN && functionId != FUNCTION_STDDEV && functionId != FUNCTION_PERCT)) {
// return false;
// }
return
true
;
}
...
...
@@ -3934,27 +3830,6 @@ void initResultSizeInfo(SOperatorInfo* pOperator, int32_t numOfRows) {
}
}
// static STableQueryInfo* initTableQueryInfo(const STableListInfo* pTableListInfo) {
// int32_t size = taosArrayGetSize(pTableListInfo->pTableList);
// if (size == 0) {
// return NULL;
// }
//
// STableQueryInfo* pTableQueryInfo = taosMemoryCalloc(size, sizeof(STableQueryInfo));
// if (pTableQueryInfo == NULL) {
// return NULL;
// }
//
// for (int32_t j = 0; j < size; ++j) {
// STableKeyInfo* pk = taosArrayGet(pTableListInfo->pTableList, j);
// STableQueryInfo* pTQueryInfo = &pTableQueryInfo[j];
// pTQueryInfo->lastKey = pk->lastKey;
// }
//
// pTableQueryInfo->lastKey = 0;
// return pTableQueryInfo;
//}
SOperatorInfo
*
createAggregateOperatorInfo
(
SOperatorInfo
*
downstream
,
SExprInfo
*
pExprInfo
,
int32_t
numOfCols
,
SSDataBlock
*
pResultBlock
,
SExprInfo
*
pScalarExprInfo
,
int32_t
numOfScalarExpr
,
SExecTaskInfo
*
pTaskInfo
)
{
...
...
@@ -4526,7 +4401,6 @@ static int32_t getTableList(void* metaHandle, int32_t tableType, uint64_t tableU
static
SArray
*
extractColumnInfo
(
SNodeList
*
pNodeList
);
static
SArray
*
createSortInfo
(
SNodeList
*
pNodeList
);
static
SArray
*
extractPartitionColInfo
(
SNodeList
*
pNodeList
);
int32_t
extractTableSchemaVersion
(
SReadHandle
*
pHandle
,
uint64_t
uid
,
SExecTaskInfo
*
pTaskInfo
)
{
SMetaReader
mr
=
{
0
};
...
...
@@ -4861,12 +4735,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
pTaskInfo
);
}
else
if
(
QUERY_NODE_PHYSICAL_PLAN_PARTITION
==
type
)
{
SPartitionPhysiNode
*
pPartNode
=
(
SPartitionPhysiNode
*
)
pPhyNode
;
SArray
*
pColList
=
extractPartitionColInfo
(
pPartNode
->
pPartitionKeys
);
SSDataBlock
*
pResBlock
=
createResDataBlock
(
pPhyNode
->
pOutputDataBlockDesc
);
SExprInfo
*
pExprInfo
=
createExprInfo
(
pPartNode
->
pTargets
,
NULL
,
&
num
);
pOptr
=
createPartitionOperatorInfo
(
ops
[
0
],
pExprInfo
,
num
,
pResBlock
,
pColList
,
pTaskInfo
);
pOptr
=
createPartitionOperatorInfo
(
ops
[
0
],
(
SPartitionPhysiNode
*
)
pPhyNode
,
pTaskInfo
);
}
else
if
(
QUERY_NODE_PHYSICAL_PLAN_MERGE_STATE
==
type
)
{
SStateWinodwPhysiNode
*
pStateNode
=
(
SStateWinodwPhysiNode
*
)
pPhyNode
;
...
...
source/libs/executor/src/groupoperator.c
浏览文件 @
f2de1af9
...
...
@@ -320,8 +320,8 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator) {
setInputDataBlock
(
pOperator
,
pInfo
->
binfo
.
pCtx
,
pBlock
,
order
,
scanFlag
,
true
);
// there is an scalar expression that needs to be calculated right before apply the group aggregation.
if
(
pInfo
->
pScalarExprInfo
!=
NULL
)
{
pTaskInfo
->
code
=
projectApplyFunctions
(
pInfo
->
pScalarExprInfo
,
pBlock
,
pBlock
,
pInfo
->
pScalarFuncCtx
,
pInfo
->
numOfScalarExpr
,
NULL
);
if
(
pInfo
->
scalarSup
.
pScalarExprInfo
!=
NULL
)
{
pTaskInfo
->
code
=
projectApplyFunctions
(
pInfo
->
scalarSup
.
pScalarExprInfo
,
pBlock
,
pBlock
,
pInfo
->
scalarSup
.
pScalarFuncCtx
,
pInfo
->
scalarSup
.
numOfScalarExpr
,
NULL
);
if
(
pTaskInfo
->
code
!=
TSDB_CODE_SUCCESS
)
{
longjmp
(
pTaskInfo
->
env
,
pTaskInfo
->
code
);
}
...
...
@@ -385,9 +385,9 @@ SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pEx
pInfo
->
pGroupCols
=
pGroupColList
;
pInfo
->
pCondition
=
pCondition
;
pInfo
->
pScalarExprInfo
=
pScalarExprInfo
;
pInfo
->
numOfScalarExpr
=
numOfScalarExpr
;
pInfo
->
pScalarFuncCtx
=
createSqlFunctionCtx
(
pScalarExprInfo
,
numOfScalarExpr
,
&
pInfo
->
rowCellInfoOffset
);
pInfo
->
scalarSup
.
pScalarExprInfo
=
pScalarExprInfo
;
pInfo
->
scalarSup
.
numOfScalarExpr
=
numOfScalarExpr
;
pInfo
->
scalarSup
.
pScalarFuncCtx
=
createSqlFunctionCtx
(
pScalarExprInfo
,
numOfScalarExpr
,
&
pInfo
->
scalarSup
.
rowCellInfoOffset
);
int32_t
code
=
initGroupOptrInfo
(
&
pInfo
->
pGroupColVals
,
&
pInfo
->
groupKeyLen
,
&
pInfo
->
keyBuf
,
pGroupColList
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
...
...
@@ -624,7 +624,9 @@ static SSDataBlock* hashPartition(SOperatorInfo* pOperator) {
return
NULL
;
}
SGroupbyOperatorInfo
*
pInfo
=
pOperator
->
info
;
SExecTaskInfo
*
pTaskInfo
=
pOperator
->
pTaskInfo
;
SPartitionOperatorInfo
*
pInfo
=
pOperator
->
info
;
SSDataBlock
*
pRes
=
pInfo
->
binfo
.
pRes
;
if
(
pOperator
->
status
==
OP_RES_TO_RETURN
)
{
...
...
@@ -641,6 +643,14 @@ static SSDataBlock* hashPartition(SOperatorInfo* pOperator) {
break
;
}
// there is an scalar expression that needs to be calculated right before apply the group aggregation.
if
(
pInfo
->
scalarSupp
.
pScalarExprInfo
!=
NULL
)
{
pTaskInfo
->
code
=
projectApplyFunctions
(
pInfo
->
scalarSupp
.
pScalarExprInfo
,
pBlock
,
pBlock
,
pInfo
->
scalarSupp
.
pScalarFuncCtx
,
pInfo
->
scalarSupp
.
numOfScalarExpr
,
NULL
);
if
(
pTaskInfo
->
code
!=
TSDB_CODE_SUCCESS
)
{
longjmp
(
pTaskInfo
->
env
,
pTaskInfo
->
code
);
}
}
doHashPartition
(
pOperator
,
pBlock
);
}
...
...
@@ -665,15 +675,26 @@ static void destroyPartitionOperatorInfo(void* param, int32_t numOfOutput) {
taosMemoryFree
(
pInfo
->
columnOffset
);
}
SOperatorInfo
*
createPartitionOperatorInfo
(
SOperatorInfo
*
downstream
,
SExprInfo
*
pExprInfo
,
int32_t
numOfCols
,
SSDataBlock
*
pResultBlock
,
SArray
*
pGroupColList
,
SExecTaskInfo
*
pTaskInfo
)
{
SOperatorInfo
*
createPartitionOperatorInfo
(
SOperatorInfo
*
downstream
,
SPartitionPhysiNode
*
pPartNode
,
SExecTaskInfo
*
pTaskInfo
)
{
SPartitionOperatorInfo
*
pInfo
=
taosMemoryCalloc
(
1
,
sizeof
(
SPartitionOperatorInfo
));
SOperatorInfo
*
pOperator
=
taosMemoryCalloc
(
1
,
sizeof
(
SOperatorInfo
));
if
(
pInfo
==
NULL
||
pOperator
==
NULL
)
{
goto
_error
;
}
pInfo
->
pGroupCols
=
pGroupColList
;
SSDataBlock
*
pResBlock
=
createResDataBlock
(
pPartNode
->
node
.
pOutputDataBlockDesc
);
int32_t
numOfCols
=
0
;
SExprInfo
*
pExprInfo
=
createExprInfo
(
pPartNode
->
pTargets
,
NULL
,
&
numOfCols
);
pInfo
->
pGroupCols
=
extractPartitionColInfo
(
pPartNode
->
pPartitionKeys
);
if
(
pPartNode
->
pExprs
!=
NULL
)
{
pInfo
->
scalarSupp
.
numOfScalarExpr
=
0
;
pInfo
->
scalarSupp
.
pScalarExprInfo
=
createExprInfo
(
pPartNode
->
pExprs
,
NULL
,
&
pInfo
->
scalarSupp
.
numOfScalarExpr
);
pInfo
->
scalarSupp
.
pScalarFuncCtx
=
createSqlFunctionCtx
(
pInfo
->
scalarSupp
.
pScalarExprInfo
,
pInfo
->
scalarSupp
.
numOfScalarExpr
,
&
pInfo
->
scalarSupp
.
rowCellInfoOffset
);
}
_hash_fn_t
hashFn
=
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BINARY
);
pInfo
->
pGroupSet
=
taosHashInit
(
100
,
hashFn
,
false
,
HASH_NO_LOCK
);
...
...
@@ -683,16 +704,16 @@ SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SExprInfo*
uint32_t
defaultPgsz
=
0
;
uint32_t
defaultBufsz
=
0
;
getBufferPgSize
(
pRes
ult
Block
->
info
.
rowSize
,
&
defaultPgsz
,
&
defaultBufsz
);
getBufferPgSize
(
pResBlock
->
info
.
rowSize
,
&
defaultPgsz
,
&
defaultBufsz
);
int32_t
code
=
createDiskbasedBuf
(
&
pInfo
->
pBuf
,
defaultPgsz
,
defaultBufsz
,
pTaskInfo
->
id
.
str
,
TD_TMP_DIR_PATH
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
goto
_error
;
}
pInfo
->
rowCapacity
=
blockDataGetCapacityInRow
(
pRes
ult
Block
,
getBufPageSize
(
pInfo
->
pBuf
));
pInfo
->
columnOffset
=
setupColumnOffset
(
pRes
ult
Block
,
pInfo
->
rowCapacity
);
code
=
initGroupOptrInfo
(
&
pInfo
->
pGroupColVals
,
&
pInfo
->
groupKeyLen
,
&
pInfo
->
keyBuf
,
p
GroupColList
);
pInfo
->
rowCapacity
=
blockDataGetCapacityInRow
(
pResBlock
,
getBufPageSize
(
pInfo
->
pBuf
));
pInfo
->
columnOffset
=
setupColumnOffset
(
pResBlock
,
pInfo
->
rowCapacity
);
code
=
initGroupOptrInfo
(
&
pInfo
->
pGroupColVals
,
&
pInfo
->
groupKeyLen
,
&
pInfo
->
keyBuf
,
p
Info
->
pGroupCols
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
goto
_error
;
}
...
...
@@ -701,7 +722,7 @@ SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SExprInfo*
pOperator
->
blocking
=
true
;
pOperator
->
status
=
OP_NOT_OPENED
;
pOperator
->
operatorType
=
QUERY_NODE_PHYSICAL_PLAN_PARTITION
;
pInfo
->
binfo
.
pRes
=
pRes
ult
Block
;
pInfo
->
binfo
.
pRes
=
pResBlock
;
pOperator
->
numOfExprs
=
numOfCols
;
pOperator
->
pExpr
=
pExprInfo
;
pOperator
->
info
=
pInfo
;
...
...
source/libs/executor/src/scanoperator.c
浏览文件 @
f2de1af9
...
...
@@ -1157,6 +1157,8 @@ static SSDataBlock* doFilterResult(SSysTableScanInfo* pInfo) {
return
pInfo
->
pRes
->
info
.
rows
==
0
?
NULL
:
pInfo
->
pRes
;
}
doFilter
(
pInfo
->
pCondition
,
pInfo
->
pRes
);
#if 0
SFilterInfo* filter = NULL;
int32_t code = filterInitFromNode(pInfo->pCondition, &filter, 0);
...
...
@@ -1202,6 +1204,7 @@ static SSDataBlock* doFilterResult(SSysTableScanInfo* pInfo) {
px->info.rows = numOfRow;
pInfo->pRes = px;
#endif
return
pInfo
->
pRes
->
info
.
rows
==
0
?
NULL
:
pInfo
->
pRes
;
}
...
...
@@ -1380,6 +1383,8 @@ static SSDataBlock* doSysTableScan(SOperatorInfo* pOperator) {
relocateColumnData
(
pInfo
->
pRes
,
pInfo
->
scanCols
,
p
->
pDataBlock
);
doFilterResult
(
pInfo
);
blockDataDestroy
(
p
);
pInfo
->
loadInfo
.
totalRows
+=
pInfo
->
pRes
->
info
.
rows
;
return
(
pInfo
->
pRes
->
info
.
rows
==
0
)
?
NULL
:
pInfo
->
pRes
;
}
...
...
@@ -1469,10 +1474,10 @@ int32_t buildSysDbTableInfo(const SSysTableScanInfo* pInfo, int32_t capacity) {
p
->
info
.
rows
=
buildDbTableInfoBlock
(
p
,
pSysDbTableMeta
,
size
,
TSDB_PERFORMANCE_SCHEMA_DB
);
relocateColumnData
(
pInfo
->
pRes
,
pInfo
->
scanCols
,
p
->
pDataBlock
);
// blockDataDestroy(p); todo handle memory leak
pInfo
->
pRes
->
info
.
rows
=
p
->
info
.
rows
;
return
p
->
info
.
rows
;
blockDataDestroy
(
p
);
return
pInfo
->
pRes
->
info
.
rows
;
}
int32_t
buildDbTableInfoBlock
(
const
SSDataBlock
*
p
,
const
SSysTableMeta
*
pSysDbTableMeta
,
size_t
size
,
...
...
source/libs/executor/src/timewindowoperator.c
浏览文件 @
f2de1af9
...
...
@@ -333,11 +333,6 @@ void doTimeWindowInterpolation(SIntervalAggOperatorInfo* pInfo, int32_t numOfExp
continue
;
}
// if (functionId != FUNCTION_TWA && functionId != FUNCTION_INTERP) {
// pCtx[k].start.key = INT64_MIN;
// continue;
// }
SFunctParam
*
pParam
=
&
pCtx
[
k
].
param
[
0
];
SColumnInfoData
*
pColInfo
=
taosArrayGet
(
pDataBlock
,
pParam
->
pCol
->
slotId
);
...
...
source/libs/function/inc/texpr.h
已删除
100644 → 0
浏览文件 @
4f8c84cf
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_COMMON_EXPR_H_
#define _TD_COMMON_EXPR_H_
#ifdef __cplusplus
extern
"C"
{
#endif
#include "os.h"
#include "tmsg.h"
#include "taosdef.h"
#include "tskiplist.h"
#include "function.h"
struct
tExprNode
;
struct
SSchema
;
#define QUERY_COND_REL_PREFIX_IN "IN|"
#define QUERY_COND_REL_PREFIX_LIKE "LIKE|"
#define QUERY_COND_REL_PREFIX_MATCH "MATCH|"
#define QUERY_COND_REL_PREFIX_NMATCH "NMATCH|"
#define QUERY_COND_REL_PREFIX_IN_LEN 3
#define QUERY_COND_REL_PREFIX_LIKE_LEN 5
#define QUERY_COND_REL_PREFIX_MATCH_LEN 6
#define QUERY_COND_REL_PREFIX_NMATCH_LEN 7
typedef
bool
(
*
__result_filter_fn_t
)(
const
void
*
,
void
*
);
typedef
void
(
*
__do_filter_suppl_fn_t
)(
void
*
,
void
*
);
/**
* this structure is used to filter data in tags, so the offset of filtered tag column in tagdata string is required
*/
typedef
struct
tQueryInfo
{
uint8_t
optr
;
// expression operator
SSchema
sch
;
// schema of tags
char
*
q
;
__compar_fn_t
compare
;
// filter function
bool
indexed
;
// indexed columns
}
tQueryInfo
;
typedef
struct
SExprTraverseSupp
{
__result_filter_fn_t
nodeFilterFn
;
__do_filter_suppl_fn_t
setupInfoFn
;
void
*
pExtInfo
;
}
SExprTraverseSupp
;
#ifdef __cplusplus
}
#endif
#endif
/*_TD_COMMON_EXPR_H_*/
source/libs/function/src/texpr.c
浏览文件 @
f2de1af9
...
...
@@ -17,15 +17,7 @@
#include "os.h"
#include "texception.h"
#include "taosdef.h"
#include "tmsg.h"
#include "tarray.h"
#include "tbuffer.h"
#include "tcompare.h"
#include "thash.h"
#include "texpr.h"
#include "tvariant.h"
#include "tdef.h"
static
void
doExprTreeDestroy
(
tExprNode
**
pExpr
,
void
(
*
fp
)(
void
*
));
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录