Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
47e885da
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22017
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
47e885da
编写于
1月 03, 2023
作者:
D
dapan1121
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
fix: evac page failed issue cause of disk full
上级
d33bcc99
变更
7
隐藏空白更改
内联
并排
Showing
7 changed file
with
86 addition
and
6 deletion
+86
-6
source/libs/executor/inc/executil.h
source/libs/executor/inc/executil.h
+4
-0
source/libs/executor/src/executil.c
source/libs/executor/src/executil.c
+4
-2
source/libs/executor/src/executorimpl.c
source/libs/executor/src/executorimpl.c
+35
-1
source/libs/executor/src/groupoperator.c
source/libs/executor/src/groupoperator.c
+16
-3
source/libs/executor/src/scanoperator.c
source/libs/executor/src/scanoperator.c
+4
-0
source/libs/executor/src/timewindowoperator.c
source/libs/executor/src/timewindowoperator.c
+14
-0
source/libs/executor/src/tsort.c
source/libs/executor/src/tsort.c
+9
-0
未找到文件。
source/libs/executor/inc/executil.h
浏览文件 @
47e885da
...
...
@@ -115,6 +115,10 @@ struct SResultRowEntryInfo* getResultEntryInfo(const SResultRow* pRow, int32_t i
static
FORCE_INLINE
SResultRow
*
getResultRowByPos
(
SDiskbasedBuf
*
pBuf
,
SResultRowPosition
*
pos
,
bool
forUpdate
)
{
SFilePage
*
bufPage
=
(
SFilePage
*
)
getBufPage
(
pBuf
,
pos
->
pageId
);
if
(
NULL
==
bufPage
)
{
return
NULL
;
}
if
(
forUpdate
)
{
setBufPageDirty
(
bufPage
,
true
);
}
...
...
source/libs/executor/src/executil.c
浏览文件 @
47e885da
...
...
@@ -1726,8 +1726,10 @@ STimeWindow getActiveTimeWindow(SDiskbasedBuf* pBuf, SResultRowInfo* pResultRowI
return
w
;
}
w
=
getResultRowByPos
(
pBuf
,
&
pResultRowInfo
->
cur
,
false
)
->
win
;
SResultRow
*
pRow
=
getResultRowByPos
(
pBuf
,
&
pResultRowInfo
->
cur
,
false
);
if
(
pRow
)
{
w
=
pRow
->
win
;
}
// in case of typical time window, we can calculate time window directly.
if
(
w
.
skey
>
ts
||
w
.
ekey
<
ts
)
{
w
=
doCalculateTimeWindow
(
ts
,
pInterval
);
...
...
source/libs/executor/src/executorimpl.c
浏览文件 @
47e885da
...
...
@@ -150,6 +150,11 @@ SResultRow* getNewResultRow(SDiskbasedBuf* pResultBuf, int32_t* currentPageId, i
pData
->
num
=
sizeof
(
SFilePage
);
}
else
{
pData
=
getBufPage
(
pResultBuf
,
*
currentPageId
);
if
(
pData
==
NULL
)
{
qError
(
"failed to get buffer, code:%s"
,
tstrerror
(
terrno
));
return
NULL
;
}
pageId
=
*
currentPageId
;
if
(
pData
->
num
+
interBufSize
>
getBufPageSize
(
pResultBuf
))
{
...
...
@@ -200,6 +205,10 @@ SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pR
if
(
isIntervalQuery
)
{
if
(
p1
!=
NULL
)
{
// the *p1 may be NULL in case of sliding+offset exists.
pResult
=
getResultRowByPos
(
pResultBuf
,
p1
,
true
);
if
(
NULL
==
pResult
)
{
T_LONG_JMP
(
pTaskInfo
->
env
,
terrno
);
}
ASSERT
(
pResult
->
pageId
==
p1
->
pageId
&&
pResult
->
offset
==
p1
->
offset
);
}
}
else
{
...
...
@@ -208,6 +217,10 @@ SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pR
if
(
p1
!=
NULL
)
{
// todo
pResult
=
getResultRowByPos
(
pResultBuf
,
p1
,
true
);
if
(
NULL
==
pResult
)
{
T_LONG_JMP
(
pTaskInfo
->
env
,
terrno
);
}
ASSERT
(
pResult
->
pageId
==
p1
->
pageId
&&
pResult
->
offset
==
p1
->
offset
);
}
}
...
...
@@ -216,6 +229,10 @@ SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pR
if
(
pResultRowInfo
->
cur
.
pageId
!=
-
1
&&
((
pResult
==
NULL
)
||
(
pResult
->
pageId
!=
pResultRowInfo
->
cur
.
pageId
)))
{
SResultRowPosition
pos
=
pResultRowInfo
->
cur
;
SFilePage
*
pPage
=
getBufPage
(
pResultBuf
,
pos
.
pageId
);
if
(
pPage
==
NULL
)
{
qError
(
"failed to get buffer, code:%s, %s"
,
tstrerror
(
terrno
),
GET_TASKID
(
pTaskInfo
));
T_LONG_JMP
(
pTaskInfo
->
env
,
terrno
);
}
releaseBufPage
(
pResultBuf
,
pPage
);
}
...
...
@@ -223,6 +240,9 @@ SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pR
if
(
pResult
==
NULL
)
{
ASSERT
(
pSup
->
resultRowSize
>
0
);
pResult
=
getNewResultRow
(
pResultBuf
,
&
pSup
->
currentPageId
,
pSup
->
resultRowSize
);
if
(
pResult
==
NULL
)
{
T_LONG_JMP
(
pTaskInfo
->
env
,
terrno
);
}
// add a new result set for a new group
SResultRowPosition
pos
=
{.
pageId
=
pResult
->
pageId
,
.
offset
=
pResult
->
offset
};
...
...
@@ -260,6 +280,11 @@ static int32_t addNewWindowResultBuf(SResultRow* pWindowRes, SDiskbasedBuf* pRes
}
else
{
SPageInfo
*
pi
=
getLastPageInfo
(
list
);
pData
=
getBufPage
(
pResultBuf
,
getPageId
(
pi
));
if
(
pData
==
NULL
)
{
qError
(
"failed to get buffer, code:%s"
,
tstrerror
(
terrno
));
return
terrno
;
}
pageId
=
getPageId
(
pi
);
if
(
pData
->
num
+
size
>
getBufPageSize
(
pResultBuf
))
{
...
...
@@ -912,7 +937,7 @@ void doSetTableGroupOutputBuf(SOperatorInfo* pOperator, int32_t numOfOutput, uin
if
(
pResultRow
->
pageId
==
-
1
)
{
int32_t
ret
=
addNewWindowResultBuf
(
pResultRow
,
pAggInfo
->
aggSup
.
pResultBuf
,
pAggInfo
->
binfo
.
pRes
->
info
.
rowSize
);
if
(
ret
!=
TSDB_CODE_SUCCESS
)
{
return
;
T_LONG_JMP
(
pTaskInfo
->
env
,
terrno
)
;
}
}
...
...
@@ -993,6 +1018,11 @@ static void doCopyResultToDataBlock(SExprInfo* pExprInfo, int32_t numOfExprs, SR
int32_t
finalizeResultRows
(
SDiskbasedBuf
*
pBuf
,
SResultRowPosition
*
resultRowPosition
,
SExprSupp
*
pSup
,
SSDataBlock
*
pBlock
,
SExecTaskInfo
*
pTaskInfo
)
{
SFilePage
*
page
=
getBufPage
(
pBuf
,
resultRowPosition
->
pageId
);
if
(
page
==
NULL
)
{
qError
(
"failed to get buffer, code:%s, %s"
,
tstrerror
(
terrno
),
GET_TASKID
(
pTaskInfo
));
T_LONG_JMP
(
pTaskInfo
->
env
,
terrno
);
}
SResultRow
*
pRow
=
(
SResultRow
*
)((
char
*
)
page
+
resultRowPosition
->
offset
);
SqlFunctionCtx
*
pCtx
=
pSup
->
pCtx
;
...
...
@@ -1036,6 +1066,10 @@ int32_t doCopyToSDataBlock(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock, SExprS
for
(
int32_t
i
=
pGroupResInfo
->
index
;
i
<
numOfRows
;
i
+=
1
)
{
SResKeyPos
*
pPos
=
taosArrayGetP
(
pGroupResInfo
->
pRows
,
i
);
SFilePage
*
page
=
getBufPage
(
pBuf
,
pPos
->
pos
.
pageId
);
if
(
page
==
NULL
)
{
qError
(
"failed to get buffer, code:%s, %s"
,
tstrerror
(
terrno
),
GET_TASKID
(
pTaskInfo
));
T_LONG_JMP
(
pTaskInfo
->
env
,
terrno
);
}
SResultRow
*
pRow
=
(
SResultRow
*
)((
char
*
)
page
+
pPos
->
pos
.
offset
);
...
...
source/libs/executor/src/groupoperator.c
浏览文件 @
47e885da
...
...
@@ -492,13 +492,17 @@ _error:
static
void
doHashPartition
(
SOperatorInfo
*
pOperator
,
SSDataBlock
*
pBlock
)
{
SPartitionOperatorInfo
*
pInfo
=
pOperator
->
info
;
SExecTaskInfo
*
pTaskInfo
=
pOperator
->
pTaskInfo
;
for
(
int32_t
j
=
0
;
j
<
pBlock
->
info
.
rows
;
++
j
)
{
recordNewGroupKeys
(
pInfo
->
pGroupCols
,
pInfo
->
pGroupColVals
,
pBlock
,
j
);
int32_t
len
=
buildGroupKeys
(
pInfo
->
keyBuf
,
pInfo
->
pGroupColVals
);
SDataGroupInfo
*
pGroupInfo
=
NULL
;
void
*
pPage
=
getCurrentDataGroupInfo
(
pInfo
,
&
pGroupInfo
,
len
);
if
(
pPage
==
NULL
)
{
T_LONG_JMP
(
pTaskInfo
->
env
,
terrno
);
}
pGroupInfo
->
numOfRows
+=
1
;
...
...
@@ -595,6 +599,10 @@ void* getCurrentDataGroupInfo(const SPartitionOperatorInfo* pInfo, SDataGroupInf
}
else
{
int32_t
*
curId
=
taosArrayGetLast
(
p
->
pPageList
);
pPage
=
getBufPage
(
pInfo
->
pBuf
,
*
curId
);
if
(
pPage
==
NULL
)
{
qError
(
"failed to get buffer, code:%s"
,
tstrerror
(
terrno
));
return
pPage
;
}
int32_t
*
rows
=
(
int32_t
*
)
pPage
;
if
(
*
rows
>=
pInfo
->
rowCapacity
)
{
...
...
@@ -674,7 +682,8 @@ static int compareDataGroupInfo(const void* group1, const void* group2) {
static
SSDataBlock
*
buildPartitionResult
(
SOperatorInfo
*
pOperator
)
{
SPartitionOperatorInfo
*
pInfo
=
pOperator
->
info
;
SExecTaskInfo
*
pTaskInfo
=
pOperator
->
pTaskInfo
;
SDataGroupInfo
*
pGroupInfo
=
(
pInfo
->
groupIndex
!=
-
1
)
?
taosArrayGet
(
pInfo
->
sortedGroupArray
,
pInfo
->
groupIndex
)
:
NULL
;
if
(
pInfo
->
groupIndex
==
-
1
||
pInfo
->
pageIndex
>=
taosArrayGetSize
(
pGroupInfo
->
pPageList
))
{
...
...
@@ -692,7 +701,11 @@ static SSDataBlock* buildPartitionResult(SOperatorInfo* pOperator) {
int32_t
*
pageId
=
taosArrayGet
(
pGroupInfo
->
pPageList
,
pInfo
->
pageIndex
);
void
*
page
=
getBufPage
(
pInfo
->
pBuf
,
*
pageId
);
if
(
page
==
NULL
)
{
qError
(
"failed to get buffer, code:%s, %s"
,
tstrerror
(
terrno
),
GET_TASKID
(
pTaskInfo
));
T_LONG_JMP
(
pTaskInfo
->
env
,
terrno
);
}
blockDataEnsureCapacity
(
pInfo
->
binfo
.
pRes
,
pInfo
->
rowCapacity
);
blockDataFromBuf1
(
pInfo
->
binfo
.
pRes
,
page
,
pInfo
->
rowCapacity
);
...
...
source/libs/executor/src/scanoperator.c
浏览文件 @
47e885da
...
...
@@ -170,6 +170,10 @@ static SResultRow* getTableGroupOutputBuf(SOperatorInfo* pOperator, uint64_t gro
}
*
pPage
=
getBufPage
(
pTableScanInfo
->
base
.
pdInfo
.
pAggSup
->
pResultBuf
,
p1
->
pageId
);
if
(
NULL
==
*
pPage
)
{
return
NULL
;
}
return
(
SResultRow
*
)((
char
*
)(
*
pPage
)
+
p1
->
offset
);
}
...
...
source/libs/executor/src/timewindowoperator.c
浏览文件 @
47e885da
...
...
@@ -636,6 +636,10 @@ static void doInterpUnclosedTimeWindow(SOperatorInfo* pOperatorInfo, int32_t num
}
SResultRow
*
pr
=
getResultRowByPos
(
pInfo
->
aggSup
.
pResultBuf
,
p1
,
false
);
if
(
NULL
==
pr
)
{
T_LONG_JMP
(
pTaskInfo
->
env
,
terrno
);
}
ASSERT
(
pr
->
offset
==
p1
->
offset
&&
pr
->
pageId
==
p1
->
pageId
);
if
(
pr
->
closed
)
{
...
...
@@ -1315,6 +1319,10 @@ static void setInverFunction(SqlFunctionCtx* pCtx, int32_t num, EStreamType type
static
void
doClearWindowImpl
(
SResultRowPosition
*
p1
,
SDiskbasedBuf
*
pResultBuf
,
SExprSupp
*
pSup
,
int32_t
numOfOutput
)
{
SResultRow
*
pResult
=
getResultRowByPos
(
pResultBuf
,
p1
,
false
);
if
(
NULL
==
pResult
)
{
return
;
}
SqlFunctionCtx
*
pCtx
=
pSup
->
pCtx
;
for
(
int32_t
i
=
0
;
i
<
numOfOutput
;
++
i
)
{
pCtx
[
i
].
resultInfo
=
getResultEntryInfo
(
pResult
,
i
,
pSup
->
rowEntryInfoOffset
);
...
...
@@ -1328,6 +1336,9 @@ static void doClearWindowImpl(SResultRowPosition* p1, SDiskbasedBuf* pResultBuf,
}
}
SFilePage
*
bufPage
=
getBufPage
(
pResultBuf
,
p1
->
pageId
);
if
(
NULL
==
bufPage
)
{
return
;
}
setBufPageDirty
(
bufPage
,
true
);
releaseBufPage
(
pResultBuf
,
bufPage
);
}
...
...
@@ -4114,6 +4125,9 @@ void destroyMAIOperatorInfo(void* param) {
static
SResultRow
*
doSetSingleOutputTupleBuf
(
SResultRowInfo
*
pResultRowInfo
,
SAggSupporter
*
pSup
)
{
SResultRow
*
pResult
=
getNewResultRow
(
pSup
->
pResultBuf
,
&
pSup
->
currentPageId
,
pSup
->
resultRowSize
);
if
(
NULL
==
pResult
)
{
return
pResult
;
}
pResultRowInfo
->
cur
=
(
SResultRowPosition
){.
pageId
=
pResult
->
pageId
,
.
offset
=
pResult
->
offset
};
return
pResult
;
}
...
...
source/libs/executor/src/tsort.c
浏览文件 @
47e885da
...
...
@@ -270,6 +270,10 @@ static int32_t sortComparInit(SMsortComparParam* pParam, SArray* pSources, int32
int32_t
*
pPgId
=
taosArrayGet
(
pSource
->
pageIdList
,
pSource
->
pageIndex
);
void
*
pPage
=
getBufPage
(
pHandle
->
pBuf
,
*
pPgId
);
if
(
NULL
==
pPage
)
{
return
terrno
;
}
code
=
blockDataFromBuf
(
pSource
->
src
.
pBlock
,
pPage
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
return
code
;
...
...
@@ -337,6 +341,11 @@ static int32_t adjustMergeTreeForNextTuple(SSortSource* pSource, SMultiwayMergeT
int32_t
*
pPgId
=
taosArrayGet
(
pSource
->
pageIdList
,
pSource
->
pageIndex
);
void
*
pPage
=
getBufPage
(
pHandle
->
pBuf
,
*
pPgId
);
if
(
pPage
==
NULL
)
{
qError
(
"failed to get buffer, code:%s"
,
tstrerror
(
terrno
));
return
terrno
;
}
int32_t
code
=
blockDataFromBuf
(
pSource
->
src
.
pBlock
,
pPage
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
return
code
;
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录