Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
800aa605
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22017
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
未验证
提交
800aa605
编写于
5月 21, 2022
作者:
H
Haojun Liao
提交者:
GitHub
5月 21, 2022
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #12779 from taosdata/feature/3_liaohj
fix(query): set buffer page dirty when place data in it.
上级
e06a47bc
03c916b4
变更
8
显示空白变更内容
内联
并排
Showing
8 changed file
with
52 addition
and
183 deletion
+52
-183
source/libs/executor/inc/executil.h
source/libs/executor/inc/executil.h
+3
-40
source/libs/executor/inc/executorimpl.h
source/libs/executor/inc/executorimpl.h
+0
-1
source/libs/executor/src/executil.c
source/libs/executor/src/executil.c
+7
-29
source/libs/executor/src/executorimpl.c
source/libs/executor/src/executorimpl.c
+30
-88
source/libs/executor/src/groupoperator.c
source/libs/executor/src/groupoperator.c
+2
-5
source/libs/executor/src/timewindowoperator.c
source/libs/executor/src/timewindowoperator.c
+9
-16
source/libs/function/src/builtinsimpl.c
source/libs/function/src/builtinsimpl.c
+1
-1
source/util/src/tpagedbuf.c
source/util/src/tpagedbuf.c
+0
-3
未找到文件。
source/libs/executor/inc/executil.h
浏览文件 @
800aa605
...
...
@@ -42,11 +42,8 @@
#define GET_TASKID(_t) (((SExecTaskInfo*)(_t))->id.str)
typedef
struct
SGroupResInfo
{
int32_t
totalGroup
;
int32_t
currentGroup
;
int32_t
index
;
SArray
*
pRows
;
// SArray<SResultRowPosition*>
bool
ordered
;
SArray
*
pRows
;
// SArray<SResKeyPos>
int32_t
position
;
}
SGroupResInfo
;
...
...
@@ -80,18 +77,12 @@ typedef struct SResultRowInfo {
SResultRowPosition
cur
;
}
SResultRowInfo
;
struct
STaskAttr
;
struct
STaskRuntimeEnv
;
struct
SUdfInfo
;
struct
SqlFunctionCtx
;
int32_t
getOutputInterResultBufSize
(
struct
STaskAttr
*
pQueryAttr
);
size_t
getResultRowSize
(
struct
SqlFunctionCtx
*
pCtx
,
int32_t
numOfOutput
);
int32_t
initResultRowInfo
(
SResultRowInfo
*
pResultRowInfo
,
int32_t
size
);
void
cleanupResultRowInfo
(
SResultRowInfo
*
pResultRowInfo
);
void
resetResultRowInfo
(
struct
STaskRuntimeEnv
*
pRuntimeEnv
,
SResultRowInfo
*
pResultRowInfo
);
int32_t
numOfClosedResultRows
(
SResultRowInfo
*
pResultRowInfo
);
void
closeAllResultRows
(
SResultRowInfo
*
pResultRowInfo
);
...
...
@@ -99,9 +90,7 @@ void initResultRow(SResultRow *pResultRow);
void
closeResultRow
(
SResultRow
*
pResultRow
);
bool
isResultRowClosed
(
SResultRow
*
pResultRow
);
struct
SResultRowEntryInfo
*
getResultCell
(
const
SResultRow
*
pRow
,
int32_t
index
,
int32_t
*
offset
);
int32_t
getRowNumForMultioutput
(
struct
STaskAttr
*
pQueryAttr
,
bool
topBottomQuery
,
bool
stable
);
struct
SResultRowEntryInfo
*
getResultCell
(
const
SResultRow
*
pRow
,
int32_t
index
,
const
int32_t
*
offset
);
static
FORCE_INLINE
SResultRow
*
getResultRow
(
SDiskbasedBuf
*
pBuf
,
SResultRowInfo
*
pResultRowInfo
,
int32_t
slot
)
{
ASSERT
(
pResultRowInfo
!=
NULL
&&
slot
>=
0
&&
slot
<
pResultRowInfo
->
size
);
...
...
@@ -118,39 +107,13 @@ static FORCE_INLINE SResultRow *getResultRowByPos(SDiskbasedBuf* pBuf, SResultRo
return
pRow
;
}
static
FORCE_INLINE
char
*
getPosInResultPage
(
struct
STaskAttr
*
pQueryAttr
,
SFilePage
*
page
,
int32_t
rowOffset
,
int32_t
offset
)
{
assert
(
rowOffset
>=
0
&&
pQueryAttr
!=
NULL
);
// int32_t numOfRows = (int32_t)getRowNumForMultioutput(pQueryAttr, pQueryAttr->topBotQuery, pQueryAttr->stableQuery);
return
((
char
*
)
page
->
data
);
}
static
FORCE_INLINE
char
*
getPosInResultPage_rv
(
SFilePage
*
page
,
int32_t
rowOffset
,
int32_t
offset
)
{
assert
(
rowOffset
>=
0
);
int32_t
numOfRows
=
1
;
//(int32_t)getRowNumForMultioutput(pQueryAttr, pQueryAttr->topBotQuery, pQueryAttr->stableQuery);
return
(
char
*
)
page
+
rowOffset
+
offset
*
numOfRows
;
}
typedef
struct
{
SArray
*
pResult
;
// SArray<SResPair>
int32_t
colId
;
}
SStddevInterResult
;
void
initGroupedResultInfo
(
SGroupResInfo
*
pGroupResInfo
,
SHashObj
*
pHashmap
,
int32_t
order
);
void
initMultiResInfoFromArrayList
(
SGroupResInfo
*
pGroupResInfo
,
SArray
*
pArrayList
);
void
cleanupGroupResInfo
(
SGroupResInfo
*
pGroupResInfo
);
bool
hasRemainDataInCurrentGroup
(
SGroupResInfo
*
pGroupResInfo
);
bool
hasRemainData
(
SGroupResInfo
*
pGroupResInfo
);
bool
hashRemainDataInGroupInfo
(
SGroupResInfo
*
pGroupResInfo
);
bool
incNextGroup
(
SGroupResInfo
*
pGroupResInfo
);
int32_t
getNumOfTotalRes
(
SGroupResInfo
*
pGroupResInfo
);
int32_t
mergeIntoGroupResult
(
SGroupResInfo
*
pGroupResInfo
,
struct
STaskRuntimeEnv
*
pRuntimeEnv
,
int32_t
*
offset
);
//int32_t initUdfInfo(struct SUdfInfo* pUdfInfo);
#endif // TDENGINE_QUERYUTIL_H
source/libs/executor/inc/executorimpl.h
浏览文件 @
800aa605
...
...
@@ -666,7 +666,6 @@ int32_t initAggInfo(SOptrBasicInfo* pBasicInfo, SAggSupporter* pAggSup, SExprInf
void
initResultSizeInfo
(
SOperatorInfo
*
pOperator
,
int32_t
numOfRows
);
void
doBuildResultDatablock
(
SOperatorInfo
*
pOperator
,
SOptrBasicInfo
*
pbInfo
,
SGroupResInfo
*
pGroupResInfo
,
SDiskbasedBuf
*
pBuf
);
void
finalizeMultiTupleQueryResult
(
int32_t
numOfOutput
,
SDiskbasedBuf
*
pBuf
,
SResultRowInfo
*
pResultRowInfo
,
int32_t
*
rowCellInfoOffset
);
void
doApplyFunctions
(
SExecTaskInfo
*
taskInfo
,
SqlFunctionCtx
*
pCtx
,
STimeWindow
*
pWin
,
SColumnInfoData
*
pTimeWindowData
,
int32_t
offset
,
int32_t
forwardStep
,
TSKEY
*
tsCol
,
int32_t
numOfTotal
,
int32_t
numOfOutput
,
int32_t
order
);
int32_t
setGroupResultOutputBuf
(
SOptrBasicInfo
*
binfo
,
int32_t
numOfCols
,
char
*
pData
,
int16_t
type
,
int16_t
bytes
,
...
...
source/libs/executor/src/executil.c
浏览文件 @
800aa605
...
...
@@ -114,12 +114,6 @@ void closeAllResultRows(SResultRowInfo *pResultRowInfo) {
assert
(
pResultRowInfo
->
size
>=
0
&&
pResultRowInfo
->
capacity
>=
pResultRowInfo
->
size
);
for
(
int32_t
i
=
0
;
i
<
pResultRowInfo
->
size
;
++
i
)
{
// SResultRow* pRow = pResultRowInfo->pResult[i];
// if (pRow->closed) {
// continue;
// }
// pRow->closed = true;
}
}
...
...
@@ -144,11 +138,11 @@ void clearResultRow(STaskRuntimeEnv *pRuntimeEnv, SResultRow *pResultRow) {
for
(
int32_t
i
=
0
;
i
<
pRuntimeEnv
->
pQueryAttr
->
numOfOutput
;
++
i
)
{
struct
SResultRowEntryInfo
*
pEntryInfo
=
NULL
;
//pResultRow->pEntryInfo[i];
int16_t
size
=
pRuntimeEnv
->
pQueryAttr
->
pExpr1
[
i
].
base
.
resSchema
.
bytes
;
char
*
s
=
getPosInResultPage
(
pRuntimeEnv
->
pQueryAttr
,
page
,
pResultRow
->
offset
,
offset
);
memset
(
s
,
0
,
size
);
//
int16_t size = pRuntimeEnv->pQueryAttr->pExpr1[i].base.resSchema.bytes;
//
char * s = getPosInResultPage(pRuntimeEnv->pQueryAttr, page, pResultRow->offset, offset);
//
memset(s, 0, size);
offset
+=
size
;
//
offset += size;
cleanupResultRowEntry
(
pEntryInfo
);
}
}
...
...
@@ -161,7 +155,7 @@ void clearResultRow(STaskRuntimeEnv *pRuntimeEnv, SResultRow *pResultRow) {
}
// TODO refactor: use macro
SResultRowEntryInfo
*
getResultCell
(
const
SResultRow
*
pRow
,
int32_t
index
,
int32_t
*
offset
)
{
SResultRowEntryInfo
*
getResultCell
(
const
SResultRow
*
pRow
,
int32_t
index
,
const
int32_t
*
offset
)
{
assert
(
index
>=
0
&&
offset
!=
NULL
);
return
(
SResultRowEntryInfo
*
)((
char
*
)
pRow
->
pEntryInfo
+
offset
[
index
]);
}
...
...
@@ -247,7 +241,7 @@ void initMultiResInfoFromArrayList(SGroupResInfo* pGroupResInfo, SArray* pArrayL
ASSERT
(
pGroupResInfo
->
index
<=
getNumOfTotalRes
(
pGroupResInfo
));
}
bool
has
RemainDataInCurrentGroup
(
SGroupResInfo
*
pGroupResInfo
)
{
bool
has
hRemainDataInGroupInfo
(
SGroupResInfo
*
pGroupResInfo
)
{
if
(
pGroupResInfo
->
pRows
==
NULL
)
{
return
false
;
}
...
...
@@ -255,18 +249,6 @@ bool hasRemainDataInCurrentGroup(SGroupResInfo* pGroupResInfo) {
return
pGroupResInfo
->
index
<
taosArrayGetSize
(
pGroupResInfo
->
pRows
);
}
bool
hasRemainData
(
SGroupResInfo
*
pGroupResInfo
)
{
if
(
hasRemainDataInCurrentGroup
(
pGroupResInfo
))
{
return
true
;
}
return
pGroupResInfo
->
currentGroup
<
pGroupResInfo
->
totalGroup
;
}
bool
incNextGroup
(
SGroupResInfo
*
pGroupResInfo
)
{
return
(
++
pGroupResInfo
->
currentGroup
)
<
pGroupResInfo
->
totalGroup
;
}
int32_t
getNumOfTotalRes
(
SGroupResInfo
*
pGroupResInfo
)
{
assert
(
pGroupResInfo
!=
NULL
);
if
(
pGroupResInfo
->
pRows
==
0
)
{
...
...
@@ -387,11 +369,6 @@ void orderTheResultRows(STaskRuntimeEnv* pRuntimeEnv) {
}
static
int32_t
mergeIntoGroupResultImplRv
(
STaskRuntimeEnv
*
pRuntimeEnv
,
SGroupResInfo
*
pGroupResInfo
,
uint64_t
groupId
,
int32_t
*
rowCellInfoOffset
)
{
if
(
!
pGroupResInfo
->
ordered
)
{
orderTheResultRows
(
pRuntimeEnv
);
pGroupResInfo
->
ordered
=
true
;
}
if
(
pGroupResInfo
->
pRows
==
NULL
)
{
pGroupResInfo
->
pRows
=
taosArrayInit
(
100
,
POINTER_BYTES
);
}
...
...
@@ -403,6 +380,7 @@ static int32_t mergeIntoGroupResultImplRv(STaskRuntimeEnv *pRuntimeEnv, SGroupRe
break
;
}
int64_t
num
=
getNumOfResultWindowRes
(
pRuntimeEnv
,
&
pResultRowCell
->
pos
,
rowCellInfoOffset
);
if
(
num
<=
0
)
{
continue
;
...
...
source/libs/executor/src/executorimpl.c
浏览文件 @
800aa605
...
...
@@ -156,7 +156,7 @@ SOperatorFpSet createOperatorFpSet(__optr_open_fn_t openFn, __optr_fn_t nextFn,
void
operatorDummyCloseFn
(
void
*
param
,
int32_t
numOfCols
)
{}
static
int32_t
doCopyToSDataBlock
(
SExecTaskInfo
*
taskInfo
,
SSDataBlock
*
pBlock
,
SExprInfo
*
pExprInfo
,
SDiskbasedBuf
*
pBuf
,
SGroupResInfo
*
pGroupResInfo
,
int32_t
*
rowCellOffset
,
SqlFunctionCtx
*
pCtx
,
int32_t
numOfExprs
);
const
int32_t
*
rowCellOffset
,
SqlFunctionCtx
*
pCtx
,
int32_t
numOfExprs
);
static
void
initCtxOutputBuffer
(
SqlFunctionCtx
*
pCtx
,
int32_t
size
);
static
void
setResultBufSize
(
STaskAttr
*
pQueryAttr
,
SResultInfo
*
pResultInfo
);
...
...
@@ -182,10 +182,11 @@ static int compareRowData(const void* a, const void* b, const void* userData) {
SFilePage
*
page2
=
getBufPage
(
pRuntimeEnv
->
pResultBuf
,
pRow2
->
pageId
);
int16_t
offset
=
supporter
->
dataOffset
;
char
*
in1
=
getPosInResultPage
(
pRuntimeEnv
->
pQueryAttr
,
page1
,
pRow1
->
offset
,
offset
);
char
*
in2
=
getPosInResultPage
(
pRuntimeEnv
->
pQueryAttr
,
page2
,
pRow2
->
offset
,
offset
);
return
0
;
// char* in1 = getPosInResultPage(pRuntimeEnv->pQueryAttr, page1, pRow1->offset, offset);
// char* in2 = getPosInResultPage(pRuntimeEnv->pQueryAttr, page2, pRow2->offset, offset);
return
(
in1
!=
NULL
&&
in2
!=
NULL
)
?
supporter
->
comFunc
(
in1
,
in2
)
:
0
;
//
return (in1 != NULL && in2 != NULL) ? supporter->comFunc(in1, in2) : 0;
}
// setup the output buffer for each operator
...
...
@@ -333,6 +334,8 @@ SResultRow* getNewResultRow_rv(SDiskbasedBuf* pResultBuf, int64_t tableGroupId,
return
NULL
;
}
setBufPageDirty
(
pData
,
true
);
// set the number of rows in current disk page
SResultRow
*
pResultRow
=
(
SResultRow
*
)((
char
*
)
pData
+
pData
->
num
);
pResultRow
->
pageId
=
pageId
;
...
...
@@ -364,12 +367,14 @@ SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pR
if
(
isIntervalQuery
)
{
if
(
masterscan
&&
p1
!=
NULL
)
{
// the *p1 may be NULL in case of sliding+offset exists.
pResult
=
getResultRowByPos
(
pResultBuf
,
p1
);
ASSERT
(
pResult
->
pageId
==
p1
->
pageId
&&
pResult
->
offset
==
p1
->
offset
);
}
}
else
{
// In case of group by column query, the required SResultRow object must be existInCurrentResusltRowInfo in the
// pResultRowInfo object.
if
(
p1
!=
NULL
)
{
pResult
=
getResultRowByPos
(
pResultBuf
,
p1
);
ASSERT
(
pResult
->
pageId
==
p1
->
pageId
&&
pResult
->
offset
==
p1
->
offset
);
}
}
...
...
@@ -1417,21 +1422,6 @@ static int32_t updateBlockLoadStatus(STaskAttr* pQuery, int32_t status) {
// }
//}
static
void
getIntermediateBufInfo
(
STaskRuntimeEnv
*
pRuntimeEnv
,
int32_t
*
ps
,
int32_t
*
rowsize
)
{
STaskAttr
*
pQueryAttr
=
pRuntimeEnv
->
pQueryAttr
;
int32_t
MIN_ROWS_PER_PAGE
=
4
;
*
rowsize
=
(
int32_t
)(
pQueryAttr
->
resultRowSize
*
getRowNumForMultioutput
(
pQueryAttr
,
pQueryAttr
->
topBotQuery
,
pQueryAttr
->
stableQuery
));
int32_t
overhead
=
sizeof
(
SFilePage
);
// one page contains at least two rows
*
ps
=
DEFAULT_INTERN_BUF_PAGE_SIZE
;
while
(((
*
rowsize
)
*
MIN_ROWS_PER_PAGE
)
>
(
*
ps
)
-
overhead
)
{
*
ps
=
((
*
ps
)
<<
1u
);
}
}
// static FORCE_INLINE bool doFilterByBlockStatistics(STaskRuntimeEnv* pRuntimeEnv, SDataStatis *pDataStatis,
// SqlFunctionCtx *pCtx, int32_t numOfRows) {
// STaskAttr* pQueryAttr = pRuntimeEnv->pQueryAttr;
...
...
@@ -1708,36 +1698,6 @@ int32_t loadDataBlockOnDemand(SExecTaskInfo* pTaskInfo, STableScanInfo* pTableSc
return
TSDB_CODE_SUCCESS
;
}
void
copyToSDataBlock
(
SSDataBlock
*
pBlock
,
int32_t
*
offset
,
SGroupResInfo
*
pGroupResInfo
,
SDiskbasedBuf
*
pResBuf
)
{
pBlock
->
info
.
rows
=
0
;
int32_t
code
=
TSDB_CODE_SUCCESS
;
while
(
pGroupResInfo
->
currentGroup
<
pGroupResInfo
->
totalGroup
)
{
// all results in current group have been returned to client, try next group
if
((
pGroupResInfo
->
pRows
==
NULL
)
||
taosArrayGetSize
(
pGroupResInfo
->
pRows
)
==
0
)
{
assert
(
pGroupResInfo
->
index
==
0
);
// if ((code = mergeIntoGroupResult(&pGroupResInfo, pRuntimeEnv, offset)) != TSDB_CODE_SUCCESS) {
return
;
// }
}
// doCopyToSDataBlock(pResBuf, pGroupResInfo, TSDB_ORDER_ASC, pBlock, );
// current data are all dumped to result buffer, clear it
if
(
!
hasRemainDataInCurrentGroup
(
pGroupResInfo
))
{
cleanupGroupResInfo
(
pGroupResInfo
);
if
(
!
incNextGroup
(
pGroupResInfo
))
{
break
;
}
}
// enough results in data buffer, return
// if (pBlock->info.rows >= threshold) {
// break;
// }
}
}
static
void
updateTableQueryInfoForReverseScan
(
STableQueryInfo
*
pTableQueryInfo
)
{
if
(
pTableQueryInfo
==
NULL
)
{
return
;
...
...
@@ -1884,35 +1844,6 @@ void setTaskStatus(SExecTaskInfo* pTaskInfo, int8_t status) {
}
}
// todo merged with the build group result.
void
finalizeMultiTupleQueryResult
(
int32_t
numOfOutput
,
SDiskbasedBuf
*
pBuf
,
SResultRowInfo
*
pResultRowInfo
,
int32_t
*
rowCellInfoOffset
)
{
for
(
int32_t
i
=
0
;
i
<
pResultRowInfo
->
size
;
++
i
)
{
SResultRowPosition
*
pPos
=
&
pResultRowInfo
->
pPosition
[
i
];
SFilePage
*
bufPage
=
getBufPage
(
pBuf
,
pPos
->
pageId
);
SResultRow
*
pRow
=
(
SResultRow
*
)((
char
*
)
bufPage
+
pPos
->
offset
);
// TODO ignore the close status anyway.
// if (!isResultRowClosed(pRow)) {
// continue;
// }
for
(
int32_t
j
=
0
;
j
<
numOfOutput
;
++
j
)
{
struct
SResultRowEntryInfo
*
pResInfo
=
getResultCell
(
pRow
,
j
,
rowCellInfoOffset
);
if
(
!
isRowEntryInitialized
(
pResInfo
))
{
continue
;
}
if
(
pRow
->
numOfRows
<
pResInfo
->
numOfRes
)
{
pRow
->
numOfRows
=
pResInfo
->
numOfRes
;
}
}
releaseBufPage
(
pBuf
,
bufPage
);
}
}
STableQueryInfo
*
createTableQueryInfo
(
void
*
buf
,
STimeWindow
win
)
{
STableQueryInfo
*
pTableQueryInfo
=
buf
;
pTableQueryInfo
->
lastKey
=
win
.
skey
;
...
...
@@ -2062,20 +1993,34 @@ void setExecutionContext(int32_t numOfOutput, uint64_t groupId, SExecTaskInfo* p
pAggInfo
->
groupId
=
groupId
;
}
static
void
doUpdateNumOfRows
(
SResultRow
*
pRow
,
int32_t
numOfExprs
,
const
int32_t
*
rowCellOffset
)
{
for
(
int32_t
j
=
0
;
j
<
numOfExprs
;
++
j
)
{
struct
SResultRowEntryInfo
*
pResInfo
=
getResultCell
(
pRow
,
j
,
rowCellOffset
);
if
(
!
isRowEntryInitialized
(
pResInfo
))
{
continue
;
}
if
(
pRow
->
numOfRows
<
pResInfo
->
numOfRes
)
{
pRow
->
numOfRows
=
pResInfo
->
numOfRes
;
}
}
}
int32_t
doCopyToSDataBlock
(
SExecTaskInfo
*
pTaskInfo
,
SSDataBlock
*
pBlock
,
SExprInfo
*
pExprInfo
,
SDiskbasedBuf
*
pBuf
,
SGroupResInfo
*
pGroupResInfo
,
int32_t
*
rowCellOffset
,
SqlFunctionCtx
*
pCtx
,
int32_t
numOfExprs
)
{
const
int32_t
*
rowCellOffset
,
SqlFunctionCtx
*
pCtx
,
int32_t
numOfExprs
)
{
int32_t
numOfRows
=
getNumOfTotalRes
(
pGroupResInfo
);
int32_t
start
=
pGroupResInfo
->
index
;
// qDebug("QInfo:0x%"PRIx64" start to copy data from windowResInfo to output buf", GET_TASKID(pRuntimeEnv));
for
(
int32_t
i
=
start
;
i
<
numOfRows
;
i
+=
1
)
{
SResKeyPos
*
pPos
=
taosArrayGetP
(
pGroupResInfo
->
pRows
,
i
);
SFilePage
*
page
=
getBufPage
(
pBuf
,
pPos
->
pos
.
pageId
);
SResultRow
*
pRow
=
(
SResultRow
*
)((
char
*
)
page
+
pPos
->
pos
.
offset
);
doUpdateNumOfRows
(
pRow
,
numOfExprs
,
rowCellOffset
);
if
(
pRow
->
numOfRows
==
0
)
{
pGroupResInfo
->
index
+=
1
;
releaseBufPage
(
pBuf
,
page
);
continue
;
}
...
...
@@ -2084,11 +2029,13 @@ int32_t doCopyToSDataBlock(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock, SExprI
}
else
{
// current value belongs to different group, it can't be packed into one datablock
if
(
pBlock
->
info
.
groupId
!=
pPos
->
groupId
)
{
releaseBufPage
(
pBuf
,
page
);
break
;
}
}
if
(
pBlock
->
info
.
rows
+
pRow
->
numOfRows
>
pBlock
->
info
.
capacity
)
{
releaseBufPage
(
pBuf
,
page
);
break
;
}
...
...
@@ -2130,8 +2077,6 @@ int32_t doCopyToSDataBlock(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock, SExprI
}
void
doBuildResultDatablock
(
SOperatorInfo
*
pOperator
,
SOptrBasicInfo
*
pbInfo
,
SGroupResInfo
*
pGroupResInfo
,
SDiskbasedBuf
*
pBuf
)
{
assert
(
pGroupResInfo
->
currentGroup
<=
pGroupResInfo
->
totalGroup
);
SExprInfo
*
pExprInfo
=
pOperator
->
pExpr
;
int32_t
numOfExprs
=
pOperator
->
numOfExprs
;
SExecTaskInfo
*
pTaskInfo
=
pOperator
->
pTaskInfo
;
...
...
@@ -2141,7 +2086,7 @@ void doBuildResultDatablock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SG
SqlFunctionCtx
*
pCtx
=
pbInfo
->
pCtx
;
blockDataCleanup
(
pBlock
);
if
(
!
has
RemainDataInCurrentGroup
(
pGroupResInfo
))
{
if
(
!
has
hRemainDataInGroupInfo
(
pGroupResInfo
))
{
return
;
}
...
...
@@ -3627,9 +3572,6 @@ static int32_t doOpenAggregateOptr(SOperatorInfo* pOperator) {
}
closeAllResultRows
(
&
pAggInfo
->
binfo
.
resultRowInfo
);
finalizeMultiTupleQueryResult
(
pOperator
->
numOfExprs
,
pAggInfo
->
aggSup
.
pResultBuf
,
&
pAggInfo
->
binfo
.
resultRowInfo
,
pAggInfo
->
binfo
.
rowCellInfoOffset
);
initGroupedResultInfo
(
&
pAggInfo
->
groupResInfo
,
pAggInfo
->
aggSup
.
pResultRowHashTable
,
0
);
OPTR_SET_OPENED
(
pOperator
);
return
TSDB_CODE_SUCCESS
;
...
...
@@ -3651,7 +3593,7 @@ static SSDataBlock* getAggregateResult(SOperatorInfo* pOperator) {
blockDataEnsureCapacity
(
pInfo
->
pRes
,
pOperator
->
resultInfo
.
capacity
);
doBuildResultDatablock
(
pOperator
,
pInfo
,
&
pAggInfo
->
groupResInfo
,
pAggInfo
->
aggSup
.
pResultBuf
);
if
(
pInfo
->
pRes
->
info
.
rows
==
0
||
!
has
RemainDataInCurrentGroup
(
&
pAggInfo
->
groupResInfo
))
{
if
(
pInfo
->
pRes
->
info
.
rows
==
0
||
!
has
hRemainDataInGroupInfo
(
&
pAggInfo
->
groupResInfo
))
{
doSetOperatorCompleted
(
pOperator
);
}
...
...
source/libs/executor/src/groupoperator.c
浏览文件 @
800aa605
...
...
@@ -269,7 +269,7 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator) {
if
(
pOperator
->
status
==
OP_RES_TO_RETURN
)
{
doBuildResultDatablock
(
pOperator
,
&
pInfo
->
binfo
,
&
pInfo
->
groupResInfo
,
pInfo
->
aggSup
.
pResultBuf
);
if
(
pRes
->
info
.
rows
==
0
||
!
has
RemainDataInCurrentGroup
(
&
pInfo
->
groupResInfo
))
{
if
(
pRes
->
info
.
rows
==
0
||
!
has
hRemainDataInGroupInfo
(
&
pInfo
->
groupResInfo
))
{
pOperator
->
status
=
OP_EXEC_DONE
;
}
return
(
pRes
->
info
.
rows
==
0
)
?
NULL
:
pRes
;
...
...
@@ -303,9 +303,6 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator) {
pOperator
->
status
=
OP_RES_TO_RETURN
;
closeAllResultRows
(
&
pInfo
->
binfo
.
resultRowInfo
);
finalizeMultiTupleQueryResult
(
pOperator
->
numOfExprs
,
pInfo
->
aggSup
.
pResultBuf
,
&
pInfo
->
binfo
.
resultRowInfo
,
pInfo
->
binfo
.
rowCellInfoOffset
);
// if (!stableQuery) { // finalize include the update of result rows
// finalizeQueryResult(pInfo->binfo.pCtx, pOperator->numOfExprs);
// } else {
...
...
@@ -320,7 +317,7 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator) {
doBuildResultDatablock
(
pOperator
,
&
pInfo
->
binfo
,
&
pInfo
->
groupResInfo
,
pInfo
->
aggSup
.
pResultBuf
);
doFilter
(
pInfo
->
pCondition
,
pRes
,
NULL
);
bool
hasRemain
=
has
RemainDataInCurrentGroup
(
&
pInfo
->
groupResInfo
);
bool
hasRemain
=
has
hRemainDataInGroupInfo
(
&
pInfo
->
groupResInfo
);
if
(
!
hasRemain
)
{
pOperator
->
status
=
OP_EXEC_DONE
;
break
;
...
...
source/libs/executor/src/timewindowoperator.c
浏览文件 @
800aa605
...
...
@@ -815,9 +815,6 @@ static int32_t doOpenIntervalAgg(SOperatorInfo* pOperator) {
}
closeAllResultRows
(
&
pInfo
->
binfo
.
resultRowInfo
);
finalizeMultiTupleQueryResult
(
pOperator
->
numOfExprs
,
pInfo
->
aggSup
.
pResultBuf
,
&
pInfo
->
binfo
.
resultRowInfo
,
pInfo
->
binfo
.
rowCellInfoOffset
);
initGroupedResultInfo
(
&
pInfo
->
groupResInfo
,
pInfo
->
aggSup
.
pResultRowHashTable
,
pInfo
->
order
);
OPTR_SET_OPENED
(
pOperator
);
return
TSDB_CODE_SUCCESS
;
...
...
@@ -933,7 +930,7 @@ static SSDataBlock* doStateWindowAgg(SOperatorInfo* pOperator) {
if
(
pOperator
->
status
==
OP_RES_TO_RETURN
)
{
doBuildResultDatablock
(
pOperator
,
pBInfo
,
&
pInfo
->
groupResInfo
,
pInfo
->
aggSup
.
pResultBuf
);
if
(
pBInfo
->
pRes
->
info
.
rows
==
0
||
!
has
RemainDataInCurrentGroup
(
&
pInfo
->
groupResInfo
))
{
if
(
pBInfo
->
pRes
->
info
.
rows
==
0
||
!
has
hRemainDataInGroupInfo
(
&
pInfo
->
groupResInfo
))
{
doSetOperatorCompleted
(
pOperator
);
return
NULL
;
}
...
...
@@ -960,13 +957,11 @@ static SSDataBlock* doStateWindowAgg(SOperatorInfo* pOperator) {
pOperator
->
status
=
OP_RES_TO_RETURN
;
closeAllResultRows
(
&
pBInfo
->
resultRowInfo
);
finalizeMultiTupleQueryResult
(
pOperator
->
numOfExprs
,
pInfo
->
aggSup
.
pResultBuf
,
&
pBInfo
->
resultRowInfo
,
pBInfo
->
rowCellInfoOffset
);
initGroupedResultInfo
(
&
pInfo
->
groupResInfo
,
pInfo
->
aggSup
.
pResultRowHashTable
,
TSDB_ORDER_ASC
);
blockDataEnsureCapacity
(
pBInfo
->
pRes
,
pOperator
->
resultInfo
.
capacity
);
doBuildResultDatablock
(
pOperator
,
pBInfo
,
&
pInfo
->
groupResInfo
,
pInfo
->
aggSup
.
pResultBuf
);
if
(
pBInfo
->
pRes
->
info
.
rows
==
0
||
!
has
RemainDataInCurrentGroup
(
&
pInfo
->
groupResInfo
))
{
if
(
pBInfo
->
pRes
->
info
.
rows
==
0
||
!
has
hRemainDataInGroupInfo
(
&
pInfo
->
groupResInfo
))
{
doSetOperatorCompleted
(
pOperator
);
}
...
...
@@ -994,7 +989,7 @@ static SSDataBlock* doBuildIntervalResult(SOperatorInfo* pOperator) {
blockDataEnsureCapacity
(
pBlock
,
pOperator
->
resultInfo
.
capacity
);
doBuildResultDatablock
(
pOperator
,
&
pInfo
->
binfo
,
&
pInfo
->
groupResInfo
,
pInfo
->
aggSup
.
pResultBuf
);
if
(
pBlock
->
info
.
rows
==
0
||
!
has
RemainDataInCurrentGroup
(
&
pInfo
->
groupResInfo
))
{
if
(
pBlock
->
info
.
rows
==
0
||
!
has
hRemainDataInGroupInfo
(
&
pInfo
->
groupResInfo
))
{
doSetOperatorCompleted
(
pOperator
);
}
...
...
@@ -1082,7 +1077,7 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
if
(
pOperator
->
status
==
OP_RES_TO_RETURN
)
{
doBuildResultDatablock
(
pOperator
,
&
pInfo
->
binfo
,
&
pInfo
->
groupResInfo
,
pInfo
->
aggSup
.
pResultBuf
);
if
(
pInfo
->
binfo
.
pRes
->
info
.
rows
==
0
||
!
has
RemainDataInCurrentGroup
(
&
pInfo
->
groupResInfo
))
{
if
(
pInfo
->
binfo
.
pRes
->
info
.
rows
==
0
||
!
has
hRemainDataInGroupInfo
(
&
pInfo
->
groupResInfo
))
{
pOperator
->
status
=
OP_EXEC_DONE
;
}
return
pInfo
->
binfo
.
pRes
->
info
.
rows
==
0
?
NULL
:
pInfo
->
binfo
.
pRes
;
...
...
@@ -1413,7 +1408,7 @@ static SSDataBlock* doSessionWindowAgg(SOperatorInfo* pOperator) {
if
(
pOperator
->
status
==
OP_RES_TO_RETURN
)
{
doBuildResultDatablock
(
pOperator
,
pBInfo
,
&
pInfo
->
groupResInfo
,
pInfo
->
aggSup
.
pResultBuf
);
if
(
pBInfo
->
pRes
->
info
.
rows
==
0
||
!
has
RemainDataInCurrentGroup
(
&
pInfo
->
groupResInfo
))
{
if
(
pBInfo
->
pRes
->
info
.
rows
==
0
||
!
has
hRemainDataInGroupInfo
(
&
pInfo
->
groupResInfo
))
{
doSetOperatorCompleted
(
pOperator
);
return
NULL
;
}
...
...
@@ -1440,13 +1435,11 @@ static SSDataBlock* doSessionWindowAgg(SOperatorInfo* pOperator) {
// restore the value
pOperator
->
status
=
OP_RES_TO_RETURN
;
closeAllResultRows
(
&
pBInfo
->
resultRowInfo
);
finalizeMultiTupleQueryResult
(
pOperator
->
numOfExprs
,
pInfo
->
aggSup
.
pResultBuf
,
&
pBInfo
->
resultRowInfo
,
pBInfo
->
rowCellInfoOffset
);
initGroupedResultInfo
(
&
pInfo
->
groupResInfo
,
pInfo
->
aggSup
.
pResultRowHashTable
,
TSDB_ORDER_ASC
);
blockDataEnsureCapacity
(
pBInfo
->
pRes
,
pOperator
->
resultInfo
.
capacity
);
doBuildResultDatablock
(
pOperator
,
pBInfo
,
&
pInfo
->
groupResInfo
,
pInfo
->
aggSup
.
pResultBuf
);
if
(
pBInfo
->
pRes
->
info
.
rows
==
0
||
!
has
RemainDataInCurrentGroup
(
&
pInfo
->
groupResInfo
))
{
if
(
pBInfo
->
pRes
->
info
.
rows
==
0
||
!
has
hRemainDataInGroupInfo
(
&
pInfo
->
groupResInfo
))
{
doSetOperatorCompleted
(
pOperator
);
}
...
...
@@ -1461,7 +1454,7 @@ static SSDataBlock* doAllIntervalAgg(SOperatorInfo* pOperator) {
STimeSliceOperatorInfo
*
pSliceInfo
=
pOperator
->
info
;
if
(
pOperator
->
status
==
OP_RES_TO_RETURN
)
{
// doBuildResultDatablock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pIntervalInfo->pRes);
if
(
pSliceInfo
->
binfo
.
pRes
->
info
.
rows
==
0
||
!
has
RemainDataInCurrentGroup
(
&
pSliceInfo
->
groupResInfo
))
{
if
(
pSliceInfo
->
binfo
.
pRes
->
info
.
rows
==
0
||
!
has
hRemainDataInGroupInfo
(
&
pSliceInfo
->
groupResInfo
))
{
doSetOperatorCompleted
(
pOperator
);
}
...
...
@@ -1493,7 +1486,7 @@ static SSDataBlock* doAllIntervalAgg(SOperatorInfo* pOperator) {
// initGroupedResultInfo(&pSliceInfo->groupResInfo, &pSliceInfo->binfo.resultRowInfo);
// doBuildResultDatablock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pSliceInfo->pRes);
if
(
pSliceInfo
->
binfo
.
pRes
->
info
.
rows
==
0
||
!
has
RemainDataInCurrentGroup
(
&
pSliceInfo
->
groupResInfo
))
{
if
(
pSliceInfo
->
binfo
.
pRes
->
info
.
rows
==
0
||
!
has
hRemainDataInGroupInfo
(
&
pSliceInfo
->
groupResInfo
))
{
pOperator
->
status
=
OP_EXEC_DONE
;
}
...
...
@@ -1695,7 +1688,7 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
return
NULL
;
}
else
if
(
pOperator
->
status
==
OP_RES_TO_RETURN
)
{
doBuildResultDatablock
(
pOperator
,
&
pInfo
->
binfo
,
&
pInfo
->
groupResInfo
,
pInfo
->
aggSup
.
pResultBuf
);
if
(
pInfo
->
binfo
.
pRes
->
info
.
rows
==
0
||
!
has
RemainDataInCurrentGroup
(
&
pInfo
->
groupResInfo
))
{
if
(
pInfo
->
binfo
.
pRes
->
info
.
rows
==
0
||
!
has
hRemainDataInGroupInfo
(
&
pInfo
->
groupResInfo
))
{
pOperator
->
status
=
OP_EXEC_DONE
;
}
return
pInfo
->
binfo
.
pRes
->
info
.
rows
==
0
?
NULL
:
pInfo
->
binfo
.
pRes
;
...
...
source/libs/function/src/builtinsimpl.c
浏览文件 @
800aa605
...
...
@@ -543,7 +543,7 @@ bool getSumFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) {
}
bool
getAvgFuncEnv
(
SFunctionNode
*
UNUSED_PARAM
(
pFunc
),
SFuncExecEnv
*
pEnv
)
{
pEnv
->
calcMemSize
=
sizeof
(
double
);
pEnv
->
calcMemSize
=
sizeof
(
SAvgRes
);
return
true
;
}
...
...
source/util/src/tpagedbuf.c
浏览文件 @
800aa605
...
...
@@ -187,9 +187,6 @@ static char* doFlushPageToDisk(SDiskbasedBuf* pBuf, SPageInfo* pg) {
}
}
else
{
// NOTE: the size may be -1, the this recycle page has not been flushed to disk yet.
size
=
pg
->
length
;
if
(
size
==
-
1
)
{
printf
(
"----
\n
"
);
}
}
ASSERT
(
size
>
0
||
(
pg
->
offset
==
-
1
&&
pg
->
length
==
-
1
));
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录