Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
b8578e24
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
b8578e24
编写于
1月 26, 2023
作者:
H
Hongze Cheng
浏览文件
操作
浏览文件
下载
差异文件
Merge branch 'main' of
https://github.com/taosdata/TDengine
into fix/TD-21761
上级
4f0a605c
96155040
变更
19
隐藏空白更改
内联
并排
Showing
19 changed file
with
310 addition
and
168 deletion
+310
-168
cmake/taostools_CMakeLists.txt.in
cmake/taostools_CMakeLists.txt.in
+1
-1
packaging/release.sh
packaging/release.sh
+1
-1
packaging/tools/install.sh
packaging/tools/install.sh
+35
-0
source/dnode/vnode/src/tsdb/tsdbRead.c
source/dnode/vnode/src/tsdb/tsdbRead.c
+5
-2
source/dnode/vnode/src/tsdb/tsdbUtil.c
source/dnode/vnode/src/tsdb/tsdbUtil.c
+23
-0
source/libs/executor/inc/executorimpl.h
source/libs/executor/inc/executorimpl.h
+2
-1
source/libs/executor/src/exchangeoperator.c
source/libs/executor/src/exchangeoperator.c
+7
-4
source/libs/executor/src/executil.c
source/libs/executor/src/executil.c
+5
-0
source/libs/executor/src/executor.c
source/libs/executor/src/executor.c
+5
-2
source/libs/executor/src/groupoperator.c
source/libs/executor/src/groupoperator.c
+9
-1
source/libs/executor/src/projectoperator.c
source/libs/executor/src/projectoperator.c
+9
-3
source/libs/executor/src/scanoperator.c
source/libs/executor/src/scanoperator.c
+19
-8
source/libs/executor/src/sortoperator.c
source/libs/executor/src/sortoperator.c
+6
-5
source/libs/function/src/builtinsimpl.c
source/libs/function/src/builtinsimpl.c
+0
-4
source/libs/function/src/tpercentile.c
source/libs/function/src/tpercentile.c
+7
-6
source/libs/planner/src/planOptimizer.c
source/libs/planner/src/planOptimizer.c
+12
-12
source/util/src/tarray.c
source/util/src/tarray.c
+21
-21
source/util/src/tcache.c
source/util/src/tcache.c
+1
-1
source/util/src/tpagedbuf.c
source/util/src/tpagedbuf.c
+142
-96
未找到文件。
cmake/taostools_CMakeLists.txt.in
浏览文件 @
b8578e24
...
...
@@ -2,7 +2,7 @@
# taos-tools
ExternalProject_Add(taos-tools
GIT_REPOSITORY https://github.com/taosdata/taos-tools.git
GIT_TAG
5c53cc8
GIT_TAG
7d24ed5
SOURCE_DIR "${TD_SOURCE_DIR}/tools/taos-tools"
BINARY_DIR ""
#BUILD_IN_SOURCE TRUE
...
...
packaging/release.sh
浏览文件 @
b8578e24
...
...
@@ -2,7 +2,7 @@
#
# Generate the deb package for ubuntu, or rpm package for centos, or tar.gz package for other linux os
#
set -e
set
-e
# set -x
# release.sh -v [cluster | edge]
...
...
packaging/tools/install.sh
浏览文件 @
b8578e24
...
...
@@ -743,6 +743,34 @@ function is_version_compatible() {
esac
}
deb_erase
()
{
confirm
=
""
while
[
""
==
"
${
confirm
}
"
]
;
do
echo
-e
-n
"
${
RED
}
Exist tdengine deb detected, do you want to remove it? [yes|no]
${
NC
}
:"
read
confirm
if
[
"yes"
==
"
$confirm
"
]
;
then
${
csudo
}
dpkg
--remove
tdengine
||
:
break
elif
[
"no"
==
"
$confirm
"
]
;
then
break
fi
done
}
rpm_erase
()
{
confirm
=
""
while
[
""
==
"
${
confirm
}
"
]
;
do
echo
-e
-n
"
${
RED
}
Exist tdengine rpm detected, do you want to remove it? [yes|no]
${
NC
}
:"
read
confirm
if
[
"yes"
==
"
$confirm
"
]
;
then
${
csudo
}
rpm
-e
tdengine
||
:
break
elif
[
"no"
==
"
$confirm
"
]
;
then
break
fi
done
}
function
updateProduct
()
{
# Check if version compatible
if
!
is_version_compatible
;
then
...
...
@@ -755,6 +783,13 @@ function updateProduct() {
echo
"File
${
tarName
}
does not exist"
exit
1
fi
if
echo
$osinfo
|
grep
-qwi
"centos"
;
then
rpm
-q
tdengine 2>&1
>
/dev/null
&&
rpm_erase tdengine
||
:
elif
echo
$osinfo
|
grep
-qwi
"ubuntu"
;
then
dpkg
-l
tdengine 2>&1
>
/dev/null
&&
deb_erase tdengine
||
:
fi
tar
-zxf
${
tarName
}
install_jemalloc
...
...
source/dnode/vnode/src/tsdb/tsdbRead.c
浏览文件 @
b8578e24
...
...
@@ -1758,11 +1758,14 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo*
}
if
(
minKey
==
k
.
ts
)
{
STSchema
*
pSchema
=
doGetSchemaForTSRow
(
TSDBROW_SVERSION
(
pRow
),
pReader
,
pBlockScanInfo
->
uid
);
if
(
pSchema
==
NULL
)
{
return
terrno
;
}
if
(
init
)
{
tRowMerge
(
&
merge
,
pRow
);
tRowMerge
rAdd
(
&
merge
,
pRow
,
pSchema
);
}
else
{
init
=
true
;
STSchema
*
pSchema
=
doGetSchemaForTSRow
(
TSDBROW_SVERSION
(
pRow
),
pReader
,
pBlockScanInfo
->
uid
);
int32_t
code
=
tRowMergerInit
(
&
merge
,
pRow
,
pSchema
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
return
code
;
...
...
source/dnode/vnode/src/tsdb/tsdbUtil.c
浏览文件 @
b8578e24
...
...
@@ -731,6 +731,7 @@ int32_t tRowMergerAdd(SRowMerger *pMerger, TSDBROW *pRow, STSchema *pTSchema) {
tsdbRowGetColVal
(
pRow
,
pTSchema
,
jCol
++
,
pColVal
);
if
(
key
.
version
>
pMerger
->
version
)
{
#if 0
if (!COL_VAL_IS_NONE(pColVal)) {
if ((!COL_VAL_IS_NULL(pColVal)) && IS_VAR_DATA_TYPE(pColVal->type)) {
SColVal *tColVal = taosArrayGet(pMerger->pArray, iCol);
...
...
@@ -746,6 +747,28 @@ int32_t tRowMergerAdd(SRowMerger *pMerger, TSDBROW *pRow, STSchema *pTSchema) {
taosArraySet(pMerger->pArray, iCol, pColVal);
}
}
#endif
if
(
!
COL_VAL_IS_NONE
(
pColVal
))
{
if
(
IS_VAR_DATA_TYPE
(
pColVal
->
type
))
{
SColVal
*
pTColVal
=
taosArrayGet
(
pMerger
->
pArray
,
iCol
);
if
(
!
COL_VAL_IS_NULL
(
pColVal
))
{
code
=
tRealloc
(
&
pTColVal
->
value
.
pData
,
pColVal
->
value
.
nData
);
if
(
code
)
return
code
;
pTColVal
->
value
.
nData
=
pColVal
->
value
.
nData
;
if
(
pTColVal
->
value
.
nData
)
{
memcpy
(
pTColVal
->
value
.
pData
,
pColVal
->
value
.
pData
,
pTColVal
->
value
.
nData
);
}
pTColVal
->
flag
=
0
;
}
else
{
tFree
(
pTColVal
->
value
.
pData
);
pTColVal
->
value
.
pData
=
NULL
;
taosArraySet
(
pMerger
->
pArray
,
iCol
,
pColVal
);
}
}
else
{
taosArraySet
(
pMerger
->
pArray
,
iCol
,
pColVal
);
}
}
}
else
if
(
key
.
version
<
pMerger
->
version
)
{
SColVal
*
tColVal
=
(
SColVal
*
)
taosArrayGet
(
pMerger
->
pArray
,
iCol
);
if
(
COL_VAL_IS_NONE
(
tColVal
)
&&
!
COL_VAL_IS_NONE
(
pColVal
))
{
...
...
source/libs/executor/inc/executorimpl.h
浏览文件 @
b8578e24
...
...
@@ -705,7 +705,8 @@ void doBuildResultDatablock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SG
bool
hasLimitOffsetInfo
(
SLimitInfo
*
pLimitInfo
);
void
initLimitInfo
(
const
SNode
*
pLimit
,
const
SNode
*
pSLimit
,
SLimitInfo
*
pLimitInfo
);
void
applyLimitOffset
(
SLimitInfo
*
pLimitInfo
,
SSDataBlock
*
pBlock
,
SExecTaskInfo
*
pTaskInfo
,
SOperatorInfo
*
pOperator
);
void
resetLimitInfoForNextGroup
(
SLimitInfo
*
pLimitInfo
);
bool
applyLimitOffset
(
SLimitInfo
*
pLimitInfo
,
SSDataBlock
*
pBlock
,
SExecTaskInfo
*
pTaskInfo
,
SOperatorInfo
*
pOperator
);
void
applyAggFunctionOnPartialTuples
(
SExecTaskInfo
*
taskInfo
,
SqlFunctionCtx
*
pCtx
,
SColumnInfoData
*
pTimeWindowData
,
int32_t
offset
,
int32_t
forwardStep
,
int32_t
numOfTotal
,
int32_t
numOfOutput
);
...
...
source/libs/executor/src/exchangeoperator.c
浏览文件 @
b8578e24
...
...
@@ -738,9 +738,7 @@ int32_t handleLimitOffset(SOperatorInfo* pOperator, SLimitInfo* pLimitInfo, SSDa
}
// reset the value for a new group data
pLimitInfo
->
numOfOutputRows
=
0
;
pLimitInfo
->
remainOffset
=
pLimitInfo
->
limit
.
offset
;
resetLimitInfoForNextGroup
(
pLimitInfo
);
// existing rows that belongs to previous group.
if
(
pBlock
->
info
.
rows
>
0
)
{
return
PROJECT_RETRIEVE_DONE
;
...
...
@@ -766,7 +764,12 @@ int32_t handleLimitOffset(SOperatorInfo* pOperator, SLimitInfo* pLimitInfo, SSDa
int32_t
keepRows
=
(
int32_t
)(
pLimitInfo
->
limit
.
limit
-
pLimitInfo
->
numOfOutputRows
);
blockDataKeepFirstNRows
(
pBlock
,
keepRows
);
if
(
pLimitInfo
->
slimit
.
limit
>
0
&&
pLimitInfo
->
slimit
.
limit
<=
pLimitInfo
->
numOfOutputGroups
)
{
pOperator
->
status
=
OP_EXEC_DONE
;
setOperatorCompleted
(
pOperator
);
}
else
{
// current group limitation is reached, and future blocks of this group need to be discarded.
if
(
pBlock
->
info
.
rows
==
0
)
{
return
PROJECT_RETRIEVE_CONTINUE
;
}
}
return
PROJECT_RETRIEVE_DONE
;
...
...
source/libs/executor/src/executil.c
浏览文件 @
b8578e24
...
...
@@ -1759,6 +1759,11 @@ void initLimitInfo(const SNode* pLimit, const SNode* pSLimit, SLimitInfo* pLimit
pLimitInfo
->
remainGroupOffset
=
slimit
.
offset
;
}
void
resetLimitInfoForNextGroup
(
SLimitInfo
*
pLimitInfo
)
{
pLimitInfo
->
numOfOutputRows
=
0
;
pLimitInfo
->
remainOffset
=
pLimitInfo
->
limit
.
offset
;
}
uint64_t
tableListGetSize
(
const
STableListInfo
*
pTableList
)
{
ASSERT
(
taosArrayGetSize
(
pTableList
->
pTableList
)
==
taosHashGetSize
(
pTableList
->
map
));
return
taosArrayGetSize
(
pTableList
->
pTableList
);
...
...
source/libs/executor/src/executor.c
浏览文件 @
b8578e24
...
...
@@ -24,12 +24,16 @@
static
TdThreadOnce
initPoolOnce
=
PTHREAD_ONCE_INIT
;
int32_t
exchangeObjRefPool
=
-
1
;
static
void
initRefPool
()
{
exchangeObjRefPool
=
taosOpenRef
(
1024
,
doDestroyExchangeOperatorInfo
);
}
static
void
cleanupRefPool
()
{
int32_t
ref
=
atomic_val_compare_exchange_32
(
&
exchangeObjRefPool
,
exchangeObjRefPool
,
0
);
taosCloseRef
(
ref
);
}
static
void
initRefPool
()
{
exchangeObjRefPool
=
taosOpenRef
(
1024
,
doDestroyExchangeOperatorInfo
);
atexit
(
cleanupRefPool
);
}
static
int32_t
doSetSMABlock
(
SOperatorInfo
*
pOperator
,
void
*
input
,
size_t
numOfBlocks
,
int32_t
type
,
char
*
id
)
{
ASSERT
(
pOperator
!=
NULL
);
if
(
pOperator
->
operatorType
!=
QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN
)
{
...
...
@@ -442,7 +446,6 @@ int32_t qCreateExecTask(SReadHandle* readHandle, int32_t vgId, uint64_t taskId,
SExecTaskInfo
**
pTask
=
(
SExecTaskInfo
**
)
pTaskInfo
;
taosThreadOnce
(
&
initPoolOnce
,
initRefPool
);
atexit
(
cleanupRefPool
);
qDebug
(
"start to create subplan task, TID:0x%"
PRIx64
" QID:0x%"
PRIx64
,
taskId
,
pSubplan
->
id
.
queryId
);
...
...
source/libs/executor/src/groupoperator.c
浏览文件 @
b8578e24
...
...
@@ -593,8 +593,11 @@ void* getCurrentDataGroupInfo(const SPartitionOperatorInfo* pInfo, SDataGroupInf
int32_t
pageId
=
0
;
pPage
=
getNewBufPage
(
pInfo
->
pBuf
,
&
pageId
);
taosArrayPush
(
p
->
pPageList
,
&
pageId
);
if
(
pPage
==
NULL
)
{
return
pPage
;
}
taosArrayPush
(
p
->
pPageList
,
&
pageId
);
*
(
int32_t
*
)
pPage
=
0
;
}
else
{
int32_t
*
curId
=
taosArrayGetLast
(
p
->
pPageList
);
...
...
@@ -612,6 +615,11 @@ void* getCurrentDataGroupInfo(const SPartitionOperatorInfo* pInfo, SDataGroupInf
// add a new page for current group
int32_t
pageId
=
0
;
pPage
=
getNewBufPage
(
pInfo
->
pBuf
,
&
pageId
);
if
(
pPage
==
NULL
)
{
qError
(
"failed to get new buffer, code:%s"
,
tstrerror
(
terrno
));
return
NULL
;
}
taosArrayPush
(
p
->
pPageList
,
&
pageId
);
memset
(
pPage
,
0
,
getBufPageSize
(
pInfo
->
pBuf
));
}
...
...
source/libs/executor/src/projectoperator.c
浏览文件 @
b8578e24
...
...
@@ -175,8 +175,7 @@ static int32_t setInfoForNewGroup(SSDataBlock* pBlock, SLimitInfo* pLimitInfo, S
// reset the value for a new group data
// existing rows that belongs to previous group.
pLimitInfo
->
numOfOutputRows
=
0
;
pLimitInfo
->
remainOffset
=
pLimitInfo
->
limit
.
offset
;
resetLimitInfoForNextGroup
(
pLimitInfo
);
}
return
PROJECT_RETRIEVE_DONE
;
...
...
@@ -200,10 +199,18 @@ static int32_t doIngroupLimitOffset(SLimitInfo* pLimitInfo, uint64_t groupId, SS
if
(
pLimitInfo
->
limit
.
limit
>=
0
&&
pLimitInfo
->
numOfOutputRows
+
pBlock
->
info
.
rows
>=
pLimitInfo
->
limit
.
limit
)
{
int32_t
keepRows
=
(
int32_t
)(
pLimitInfo
->
limit
.
limit
-
pLimitInfo
->
numOfOutputRows
);
blockDataKeepFirstNRows
(
pBlock
,
keepRows
);
// TODO: optimize it later when partition by + limit
// all retrieved requirement has been fulfilled, let's finish this
if
((
pLimitInfo
->
slimit
.
limit
==
-
1
&&
pLimitInfo
->
currentGroupId
==
0
)
||
(
pLimitInfo
->
slimit
.
limit
>
0
&&
pLimitInfo
->
slimit
.
limit
<=
pLimitInfo
->
numOfOutputGroups
))
{
setOperatorCompleted
(
pOperator
);
}
else
{
// Even current group is done, there may be many vgroups remain existed, and we need to continue to retrieve data
// from next group. So let's continue this retrieve process
if
(
keepRows
==
0
)
{
return
PROJECT_RETRIEVE_CONTINUE
;
}
}
}
...
...
@@ -357,7 +364,6 @@ SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) {
pOperator
->
cost
.
openCost
=
(
taosGetTimestampUs
()
-
st
)
/
1000
.
0
;
}
// printDataBlock1(p, "project");
return
(
p
->
info
.
rows
>
0
)
?
p
:
NULL
;
}
...
...
source/libs/executor/src/scanoperator.c
浏览文件 @
b8578e24
...
...
@@ -257,7 +257,7 @@ static void doSetTagColumnData(STableScanBase* pTableScanInfo, SSDataBlock* pBlo
}
// todo handle the slimit info
void
applyLimitOffset
(
SLimitInfo
*
pLimitInfo
,
SSDataBlock
*
pBlock
,
SExecTaskInfo
*
pTaskInfo
,
SOperatorInfo
*
pOperator
)
{
bool
applyLimitOffset
(
SLimitInfo
*
pLimitInfo
,
SSDataBlock
*
pBlock
,
SExecTaskInfo
*
pTaskInfo
,
SOperatorInfo
*
pOperator
)
{
SLimit
*
pLimit
=
&
pLimitInfo
->
limit
;
const
char
*
id
=
GET_TASKID
(
pTaskInfo
);
...
...
@@ -266,6 +266,7 @@ void applyLimitOffset(SLimitInfo* pLimitInfo, SSDataBlock* pBlock, SExecTaskInfo
pLimitInfo
->
remainOffset
-=
pBlock
->
info
.
rows
;
blockDataEmpty
(
pBlock
);
qDebug
(
"current block ignore due to offset, current:%"
PRId64
", %s"
,
pLimitInfo
->
remainOffset
,
id
);
return
false
;
}
else
{
blockDataTrimFirstNRows
(
pBlock
,
pLimitInfo
->
remainOffset
);
pLimitInfo
->
remainOffset
=
0
;
...
...
@@ -274,13 +275,14 @@ void applyLimitOffset(SLimitInfo* pLimitInfo, SSDataBlock* pBlock, SExecTaskInfo
if
(
pLimit
->
limit
!=
-
1
&&
pLimit
->
limit
<=
(
pLimitInfo
->
numOfOutputRows
+
pBlock
->
info
.
rows
))
{
// limit the output rows
int32_t
overflowRows
=
pLimitInfo
->
numOfOutputRows
+
pBlock
->
info
.
rows
-
pLimit
->
limit
;
int32_t
keep
=
pBlock
->
info
.
rows
-
overflowRows
;
int32_t
keep
=
(
int32_t
)(
pLimit
->
limit
-
pLimitInfo
->
numOfOutputRows
);
blockDataKeepFirstNRows
(
pBlock
,
keep
);
qDebug
(
"output limit %"
PRId64
" has reached, %s"
,
pLimit
->
limit
,
id
);
pOperator
->
status
=
OP_EXEC_DONE
;
return
true
;
}
return
false
;
}
static
int32_t
loadDataBlock
(
SOperatorInfo
*
pOperator
,
STableScanBase
*
pTableScanInfo
,
SSDataBlock
*
pBlock
,
...
...
@@ -391,7 +393,10 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanBase* pTableSca
}
}
applyLimitOffset
(
&
pTableScanInfo
->
limitInfo
,
pBlock
,
pTaskInfo
,
pOperator
);
bool
limitReached
=
applyLimitOffset
(
&
pTableScanInfo
->
limitInfo
,
pBlock
,
pTaskInfo
,
pOperator
);
if
(
limitReached
)
{
// set operator flag is done
setOperatorCompleted
(
pOperator
);
}
pCost
->
totalRows
+=
pBlock
->
info
.
rows
;
pTableScanInfo
->
limitInfo
.
numOfOutputRows
=
pCost
->
totalRows
;
...
...
@@ -768,8 +773,7 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) {
// reset value for the next group data output
pOperator
->
status
=
OP_OPENED
;
pInfo
->
base
.
limitInfo
.
numOfOutputRows
=
0
;
pInfo
->
base
.
limitInfo
.
remainOffset
=
pInfo
->
base
.
limitInfo
.
limit
.
offset
;
resetLimitInfoForNextGroup
(
&
pInfo
->
base
.
limitInfo
);
int32_t
num
=
0
;
STableKeyInfo
*
pList
=
NULL
;
...
...
@@ -2685,9 +2689,12 @@ int32_t stopGroupTableMergeScan(SOperatorInfo* pOperator) {
taosArrayDestroy
(
pInfo
->
queryConds
);
pInfo
->
queryConds
=
NULL
;
resetLimitInfoForNextGroup
(
&
pInfo
->
limitInfo
);
return
TSDB_CODE_SUCCESS
;
}
// all data produced by this function only belongs to one group
// slimit/soffset does not need to be concerned here, since this function only deal with data within one group.
SSDataBlock
*
getSortedTableMergeScanBlockData
(
SSortHandle
*
pHandle
,
SSDataBlock
*
pResBlock
,
int32_t
capacity
,
SOperatorInfo
*
pOperator
)
{
STableMergeScanInfo
*
pInfo
=
pOperator
->
info
;
...
...
@@ -2707,10 +2714,12 @@ SSDataBlock* getSortedTableMergeScanBlockData(SSortHandle* pHandle, SSDataBlock*
}
}
qDebug
(
"%s get sorted row blocks, rows:%d"
,
GET_TASKID
(
pTaskInfo
),
pResBlock
->
info
.
rows
);
applyLimitOffset
(
&
pInfo
->
limitInfo
,
pResBlock
,
pTaskInfo
,
pOperator
);
pInfo
->
limitInfo
.
numOfOutputRows
+=
pResBlock
->
info
.
rows
;
qDebug
(
"%s get sorted row block, rows:%d, limit:%"
PRId64
,
GET_TASKID
(
pTaskInfo
),
pResBlock
->
info
.
rows
,
pInfo
->
limitInfo
.
numOfOutputRows
);
return
(
pResBlock
->
info
.
rows
>
0
)
?
pResBlock
:
NULL
;
}
...
...
@@ -2749,11 +2758,13 @@ SSDataBlock* doTableMergeScan(SOperatorInfo* pOperator) {
pOperator
->
resultInfo
.
totalRows
+=
pBlock
->
info
.
rows
;
return
pBlock
;
}
else
{
// Data of this group are all dumped, let's try the next group
stopGroupTableMergeScan
(
pOperator
);
if
(
pInfo
->
tableEndIndex
>=
tableListSize
-
1
)
{
setOperatorCompleted
(
pOperator
);
break
;
}
pInfo
->
tableStartIndex
=
pInfo
->
tableEndIndex
+
1
;
pInfo
->
groupId
=
tableListGetInfo
(
pTaskInfo
->
pTableInfoList
,
pInfo
->
tableStartIndex
)
->
groupId
;
startGroupTableMergeScan
(
pOperator
);
...
...
source/libs/executor/src/sortoperator.c
浏览文件 @
b8578e24
...
...
@@ -680,11 +680,13 @@ SSDataBlock* getMultiwaySortedBlockData(SSortHandle* pHandle, SSDataBlock* pData
break
;
}
bool
limitReached
=
applyLimitOffset
(
&
pInfo
->
limitInfo
,
p
,
pTaskInfo
,
pOperator
);
if
(
limitReached
)
{
resetLimitInfoForNextGroup
(
&
pInfo
->
limitInfo
);
}
if
(
p
->
info
.
rows
>
0
)
{
applyLimitOffset
(
&
pInfo
->
limitInfo
,
p
,
pTaskInfo
,
pOperator
);
if
(
p
->
info
.
rows
>
0
)
{
break
;
}
break
;
}
}
...
...
@@ -698,7 +700,6 @@ SSDataBlock* getMultiwaySortedBlockData(SSortHandle* pHandle, SSDataBlock* pData
colDataAssign
(
pDst
,
pSrc
,
p
->
info
.
rows
,
&
pDataBlock
->
info
);
}
pInfo
->
limitInfo
.
numOfOutputRows
+=
p
->
info
.
rows
;
pDataBlock
->
info
.
rows
=
p
->
info
.
rows
;
pDataBlock
->
info
.
id
.
groupId
=
pInfo
->
groupId
;
pDataBlock
->
info
.
dataLoad
=
1
;
...
...
source/libs/function/src/builtinsimpl.c
浏览文件 @
b8578e24
...
...
@@ -3061,14 +3061,12 @@ static int32_t doSaveTupleData(SSerializeDataHandle* pHandle, const void* pBuf,
if
(
pHandle
->
currentPage
==
-
1
)
{
pPage
=
getNewBufPage
(
pHandle
->
pBuf
,
&
pHandle
->
currentPage
);
if
(
pPage
==
NULL
)
{
terrno
=
TSDB_CODE_NO_AVAIL_DISK
;
return
terrno
;
}
pPage
->
num
=
sizeof
(
SFilePage
);
}
else
{
pPage
=
getBufPage
(
pHandle
->
pBuf
,
pHandle
->
currentPage
);
if
(
pPage
==
NULL
)
{
terrno
=
TSDB_CODE_NO_AVAIL_DISK
;
return
terrno
;
}
if
(
pPage
->
num
+
length
>
getBufPageSize
(
pHandle
->
pBuf
))
{
...
...
@@ -3076,7 +3074,6 @@ static int32_t doSaveTupleData(SSerializeDataHandle* pHandle, const void* pBuf,
releaseBufPage
(
pHandle
->
pBuf
,
pPage
);
pPage
=
getNewBufPage
(
pHandle
->
pBuf
,
&
pHandle
->
currentPage
);
if
(
pPage
==
NULL
)
{
terrno
=
TSDB_CODE_NO_AVAIL_DISK
;
return
terrno
;
}
pPage
->
num
=
sizeof
(
SFilePage
);
...
...
@@ -3123,7 +3120,6 @@ static int32_t doUpdateTupleData(SSerializeDataHandle* pHandle, const void* pBuf
if
(
pHandle
->
pBuf
!=
NULL
)
{
SFilePage
*
pPage
=
getBufPage
(
pHandle
->
pBuf
,
pPos
->
pageId
);
if
(
pPage
==
NULL
)
{
terrno
=
TSDB_CODE_NO_AVAIL_DISK
;
return
terrno
;
}
memcpy
(
pPage
->
data
+
pPos
->
offset
,
pBuf
,
length
);
...
...
source/libs/function/src/tpercentile.c
浏览文件 @
b8578e24
...
...
@@ -43,8 +43,8 @@ static SFilePage *loadDataFromFilePage(tMemBucket *pMemBucket, int32_t slotIdx)
if
(
pg
==
NULL
)
{
return
NULL
;
}
memcpy
(
buffer
->
data
+
offset
,
pg
->
data
,
(
size_t
)(
pg
->
num
*
pMemBucket
->
bytes
));
memcpy
(
buffer
->
data
+
offset
,
pg
->
data
,
(
size_t
)(
pg
->
num
*
pMemBucket
->
bytes
));
offset
+=
(
int32_t
)(
pg
->
num
*
pMemBucket
->
bytes
);
}
...
...
@@ -109,7 +109,7 @@ int32_t findOnlyResult(tMemBucket *pMemBucket, double *result) {
int32_t
*
pageId
=
taosArrayGet
(
list
,
0
);
SFilePage
*
pPage
=
getBufPage
(
pMemBucket
->
pBuffer
,
*
pageId
);
if
(
pPage
==
NULL
)
{
return
TSDB_CODE_NO_AVAIL_DISK
;
return
terrno
;
}
ASSERT
(
pPage
->
num
==
1
);
...
...
@@ -276,7 +276,7 @@ tMemBucket *tMemBucketCreate(int16_t nElemSize, int16_t dataType, double minval,
return
NULL
;
}
int32_t
ret
=
createDiskbasedBuf
(
&
pBucket
->
pBuffer
,
pBucket
->
bufPageSize
,
pBucket
->
bufPageSize
*
512
,
"1"
,
tsTempDir
);
int32_t
ret
=
createDiskbasedBuf
(
&
pBucket
->
pBuffer
,
pBucket
->
bufPageSize
,
pBucket
->
bufPageSize
*
1024
,
"1"
,
tsTempDir
);
if
(
ret
!=
0
)
{
tMemBucketDestroy
(
pBucket
);
return
NULL
;
...
...
@@ -388,7 +388,7 @@ int32_t tMemBucketPut(tMemBucket *pBucket, const void *data, size_t size) {
pSlot
->
info
.
data
=
getNewBufPage
(
pBucket
->
pBuffer
,
&
pageId
);
if
(
pSlot
->
info
.
data
==
NULL
)
{
return
TSDB_CODE_NO_AVAIL_DISK
;
return
terrno
;
}
pSlot
->
info
.
pageId
=
pageId
;
taosArrayPush
(
pPageIdList
,
&
pageId
);
...
...
@@ -482,8 +482,9 @@ int32_t getPercentileImpl(tMemBucket *pMemBucket, int32_t count, double fraction
// data in buffer and file are merged together to be processed.
SFilePage
*
buffer
=
loadDataFromFilePage
(
pMemBucket
,
i
);
if
(
buffer
==
NULL
)
{
return
TSDB_CODE_NO_AVAIL_DISK
;
return
terrno
;
}
int32_t
currentIdx
=
count
-
num
;
char
*
thisVal
=
buffer
->
data
+
pMemBucket
->
bytes
*
currentIdx
;
...
...
@@ -520,7 +521,7 @@ int32_t getPercentileImpl(tMemBucket *pMemBucket, int32_t count, double fraction
int32_t
*
pageId
=
taosArrayGet
(
list
,
f
);
SFilePage
*
pg
=
getBufPage
(
pMemBucket
->
pBuffer
,
*
pageId
);
if
(
pg
==
NULL
)
{
return
TSDB_CODE_NO_AVAIL_DISK
;
return
terrno
;
}
int32_t
code
=
tMemBucketPut
(
pMemBucket
,
pg
->
data
,
(
int32_t
)
pg
->
num
);
...
...
source/libs/planner/src/planOptimizer.c
浏览文件 @
b8578e24
...
...
@@ -1080,29 +1080,29 @@ static bool sortPriKeyOptMayBeOptimized(SLogicNode* pNode) {
return
false
;
}
SSortLogicNode
*
pSort
=
(
SSortLogicNode
*
)
pNode
;
if
(
pSort
->
groupSort
||
!
sortPriKeyOptIsPriKeyOrderBy
(
pSort
->
pSortKeys
)
||
1
!=
LIST_LENGTH
(
pSort
->
node
.
pChildren
))
{
if
(
!
sortPriKeyOptIsPriKeyOrderBy
(
pSort
->
pSortKeys
)
||
1
!=
LIST_LENGTH
(
pSort
->
node
.
pChildren
))
{
return
false
;
}
return
true
;
}
static
int32_t
sortPriKeyOptGetSequencingNodesImpl
(
SLogicNode
*
pNode
,
bool
*
pNotOptimize
,
static
int32_t
sortPriKeyOptGetSequencingNodesImpl
(
SLogicNode
*
pNode
,
bool
groupSort
,
bool
*
pNotOptimize
,
SNodeList
**
pSequencingNodes
)
{
switch
(
nodeType
(
pNode
))
{
case
QUERY_NODE_LOGIC_PLAN_SCAN
:
{
SScanLogicNode
*
pScan
=
(
SScanLogicNode
*
)
pNode
;
if
(
NULL
!=
pScan
->
pGroupTags
||
TSDB_SYSTEM_TABLE
==
pScan
->
tableType
)
{
if
(
(
!
groupSort
&&
NULL
!=
pScan
->
pGroupTags
)
||
TSDB_SYSTEM_TABLE
==
pScan
->
tableType
)
{
*
pNotOptimize
=
true
;
return
TSDB_CODE_SUCCESS
;
}
return
nodesListMakeAppend
(
pSequencingNodes
,
(
SNode
*
)
pNode
);
}
case
QUERY_NODE_LOGIC_PLAN_JOIN
:
{
int32_t
code
=
sortPriKeyOptGetSequencingNodesImpl
((
SLogicNode
*
)
nodesListGetNode
(
pNode
->
pChildren
,
0
),
int32_t
code
=
sortPriKeyOptGetSequencingNodesImpl
((
SLogicNode
*
)
nodesListGetNode
(
pNode
->
pChildren
,
0
),
groupSort
,
pNotOptimize
,
pSequencingNodes
);
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
sortPriKeyOptGetSequencingNodesImpl
((
SLogicNode
*
)
nodesListGetNode
(
pNode
->
pChildren
,
1
),
pNotOptimize
,
pSequencingNodes
);
code
=
sortPriKeyOptGetSequencingNodesImpl
((
SLogicNode
*
)
nodesListGetNode
(
pNode
->
pChildren
,
1
),
groupSort
,
p
NotOptimize
,
p
SequencingNodes
);
}
return
code
;
}
...
...
@@ -1121,13 +1121,13 @@ static int32_t sortPriKeyOptGetSequencingNodesImpl(SLogicNode* pNode, bool* pNot
return
TSDB_CODE_SUCCESS
;
}
return
sortPriKeyOptGetSequencingNodesImpl
((
SLogicNode
*
)
nodesListGetNode
(
pNode
->
pChildren
,
0
),
pNotOptimize
,
pSequencingNodes
);
return
sortPriKeyOptGetSequencingNodesImpl
((
SLogicNode
*
)
nodesListGetNode
(
pNode
->
pChildren
,
0
),
groupSort
,
p
NotOptimize
,
p
SequencingNodes
);
}
static
int32_t
sortPriKeyOptGetSequencingNodes
(
SLogicNode
*
pNode
,
SNodeList
**
pSequencingNodes
)
{
static
int32_t
sortPriKeyOptGetSequencingNodes
(
SLogicNode
*
pNode
,
bool
groupSort
,
SNodeList
**
pSequencingNodes
)
{
bool
notOptimize
=
false
;
int32_t
code
=
sortPriKeyOptGetSequencingNodesImpl
(
pNode
,
&
notOptimize
,
pSequencingNodes
);
int32_t
code
=
sortPriKeyOptGetSequencingNodesImpl
(
pNode
,
groupSort
,
&
notOptimize
,
pSequencingNodes
);
if
(
TSDB_CODE_SUCCESS
!=
code
||
notOptimize
)
{
NODES_CLEAR_LIST
(
*
pSequencingNodes
);
}
...
...
@@ -1175,8 +1175,8 @@ static int32_t sortPriKeyOptApply(SOptimizeContext* pCxt, SLogicSubplan* pLogicS
static
int32_t
sortPrimaryKeyOptimizeImpl
(
SOptimizeContext
*
pCxt
,
SLogicSubplan
*
pLogicSubplan
,
SSortLogicNode
*
pSort
)
{
SNodeList
*
pSequencingNodes
=
NULL
;
int32_t
code
=
sortPriKeyOptGetSequencingNodes
((
SLogicNode
*
)
nodesListGetNode
(
pSort
->
node
.
pChildren
,
0
)
,
&
pSequencingNodes
);
int32_t
code
=
sortPriKeyOptGetSequencingNodes
((
SLogicNode
*
)
nodesListGetNode
(
pSort
->
node
.
pChildren
,
0
),
pSort
->
groupSort
,
&
pSequencingNodes
);
if
(
TSDB_CODE_SUCCESS
==
code
&&
NULL
!=
pSequencingNodes
)
{
code
=
sortPriKeyOptApply
(
pCxt
,
pLogicSubplan
,
pSort
,
pSequencingNodes
);
}
...
...
source/util/src/tarray.c
浏览文件 @
b8578e24
...
...
@@ -20,7 +20,10 @@
// todo refactor API
SArray
*
taosArrayInit
(
size_t
size
,
size_t
elemSize
)
{
assert
(
elemSize
>
0
);
if
(
elemSize
==
0
)
{
terrno
=
TSDB_CODE_INVALID_PARA
;
return
NULL
;
}
if
(
size
<
TARRAY_MIN_SIZE
)
{
size
=
TARRAY_MIN_SIZE
;
...
...
@@ -96,8 +99,6 @@ void* taosArrayAddBatch(SArray* pArray, const void* pData, int32_t nEles) {
}
void
taosArrayRemoveDuplicate
(
SArray
*
pArray
,
__compar_fn_t
comparFn
,
void
(
*
fp
)(
void
*
))
{
assert
(
pArray
);
size_t
size
=
pArray
->
size
;
if
(
size
<=
1
)
{
return
;
...
...
@@ -136,8 +137,6 @@ void taosArrayRemoveDuplicate(SArray* pArray, __compar_fn_t comparFn, void (*fp)
}
void
taosArrayRemoveDuplicateP
(
SArray
*
pArray
,
__compar_fn_t
comparFn
,
void
(
*
fp
)(
void
*
))
{
assert
(
pArray
);
size_t
size
=
pArray
->
size
;
if
(
size
<=
1
)
{
return
;
...
...
@@ -195,11 +194,10 @@ void* taosArrayReserve(SArray* pArray, int32_t num) {
}
void
*
taosArrayPop
(
SArray
*
pArray
)
{
assert
(
pArray
!=
NULL
);
if
(
pArray
->
size
==
0
)
{
return
NULL
;
}
pArray
->
size
-=
1
;
return
TARRAY_GET_ELEM
(
pArray
,
pArray
->
size
);
}
...
...
@@ -208,16 +206,21 @@ void* taosArrayGet(const SArray* pArray, size_t index) {
if
(
NULL
==
pArray
)
{
return
NULL
;
}
assert
(
index
<
pArray
->
size
);
if
(
index
>=
pArray
->
size
)
{
uError
(
"index is out of range, current:%"
PRIzu
" max:%d"
,
index
,
pArray
->
capacity
);
return
NULL
;
}
return
TARRAY_GET_ELEM
(
pArray
,
index
);
}
void
*
taosArrayGetP
(
const
SArray
*
pArray
,
size_t
index
)
{
assert
(
index
<
pArray
->
size
);
void
*
d
=
TARRAY_GET_ELEM
(
pArray
,
index
)
;
return
*
(
void
**
)
d
;
void
**
p
=
taosArrayGet
(
pArray
,
index
);
if
(
p
==
NULL
)
{
return
NULL
;
}
return
*
p
;
}
void
*
taosArrayGetLast
(
const
SArray
*
pArray
)
{
return
TARRAY_GET_ELEM
(
pArray
,
pArray
->
size
-
1
);
}
...
...
@@ -296,9 +299,12 @@ void taosArrayRemove(SArray* pArray, size_t index) {
}
SArray
*
taosArrayFromList
(
const
void
*
src
,
size_t
size
,
size_t
elemSize
)
{
assert
(
src
!=
NULL
&&
elemSize
>
0
);
SArray
*
pDst
=
taosArrayInit
(
size
,
elemSize
);
if
(
elemSize
<=
0
)
{
terrno
=
TSDB_CODE_INVALID_PARA
;
return
NULL
;
}
SArray
*
pDst
=
taosArrayInit
(
size
,
elemSize
);
memcpy
(
pDst
->
pData
,
src
,
elemSize
*
size
);
pDst
->
size
=
size
;
...
...
@@ -306,8 +312,6 @@ SArray* taosArrayFromList(const void* src, size_t size, size_t elemSize) {
}
SArray
*
taosArrayDup
(
const
SArray
*
pSrc
,
__array_item_dup_fn_t
fn
)
{
assert
(
pSrc
!=
NULL
);
if
(
pSrc
->
size
==
0
)
{
// empty array list
return
taosArrayInit
(
8
,
pSrc
->
elemSize
);
}
...
...
@@ -399,14 +403,10 @@ void taosArrayDestroyEx(SArray* pArray, FDelete fp) {
}
void
taosArraySort
(
SArray
*
pArray
,
__compar_fn_t
compar
)
{
ASSERT
(
pArray
!=
NULL
&&
compar
!=
NULL
);
taosSort
(
pArray
->
pData
,
pArray
->
size
,
pArray
->
elemSize
,
compar
);
}
void
*
taosArraySearch
(
const
SArray
*
pArray
,
const
void
*
key
,
__compar_fn_t
comparFn
,
int32_t
flags
)
{
assert
(
pArray
!=
NULL
&&
comparFn
!=
NULL
);
assert
(
key
!=
NULL
);
return
taosbsearch
(
key
,
pArray
->
pData
,
pArray
->
size
,
pArray
->
elemSize
,
comparFn
,
flags
);
}
...
...
source/util/src/tcache.c
浏览文件 @
b8578e24
...
...
@@ -921,7 +921,7 @@ void taosCacheRefresh(SCacheObj *pCacheObj, __cache_trav_fn_t fp, void *param1)
void
taosStopCacheRefreshWorker
(
void
)
{
stopRefreshWorker
=
true
;
TdThreadOnce
tmp
=
PTHREAD_ONCE_INIT
;
if
(
memcmp
(
&
cache
RefreshWorker
,
&
tmp
,
sizeof
(
TdThreadOnce
))
!=
0
)
taosThreadJoin
(
cacheRefreshWorker
,
NULL
);
if
(
memcmp
(
&
cache
ThreadInit
,
&
tmp
,
sizeof
(
TdThreadOnce
))
!=
0
)
taosThreadJoin
(
cacheRefreshWorker
,
NULL
);
taosArrayDestroy
(
pCacheArrayList
);
}
...
...
source/util/src/tpagedbuf.c
浏览文件 @
b8578e24
...
...
@@ -5,7 +5,10 @@
#include "thash.h"
#include "tlog.h"
#define GET_DATA_PAYLOAD(_p) ((char*)(_p)->pData + POINTER_BYTES)
#define GET_PAYLOAD_DATA(_p) ((char*)(_p)->pData + POINTER_BYTES)
#define BUF_PAGE_IN_MEM(_p) ((_p)->pData != NULL)
#define CLEAR_BUF_PAGE_IN_MEM_FLAG(_p) ((_p)->pData = NULL)
#define HAS_DATA_IN_DISK(_p) ((_p)->offset >= 0)
#define NO_IN_MEM_AVAILABLE_PAGES(_b) (listNEles((_b)->lruList) >= (_b)->inMemPages)
typedef
struct
SPageDiskInfo
{
...
...
@@ -14,7 +17,7 @@ typedef struct SPageDiskInfo {
}
SPageDiskInfo
,
SFreeListItem
;
struct
SPageInfo
{
SListNode
*
pn
;
// point to list node struct
SListNode
*
pn
;
// point to list node struct. it is NULL when the page is evicted from the in-memory buffer
void
*
pData
;
int64_t
offset
;
int32_t
pageId
;
...
...
@@ -89,7 +92,7 @@ static char* doDecompressData(void* data, int32_t srcSize, int32_t* dst, SDiskba
return
data
;
}
static
uint64_t
allocatePositionInFile
(
SDiskbasedBuf
*
pBuf
,
size_t
size
)
{
static
uint64_t
allocate
New
PositionInFile
(
SDiskbasedBuf
*
pBuf
,
size_t
size
)
{
if
(
pBuf
->
pFree
==
NULL
)
{
return
pBuf
->
nextPos
;
}
else
{
...
...
@@ -112,10 +115,6 @@ static uint64_t allocatePositionInFile(SDiskbasedBuf* pBuf, size_t size) {
}
}
static
void
setPageNotInBuf
(
SPageInfo
*
pPageInfo
)
{
pPageInfo
->
pData
=
NULL
;
}
static
FORCE_INLINE
size_t
getAllocPageSize
(
int32_t
pageSize
)
{
return
pageSize
+
POINTER_BYTES
+
sizeof
(
SFilePage
);
}
/**
* +--------------------------+-------------------+--------------+
* | PTR to SPageInfo (8bytes)| Payload (PageSize)| 2 Extra Bytes|
...
...
@@ -124,23 +123,31 @@ static FORCE_INLINE size_t getAllocPageSize(int32_t pageSize) { return pageSize
* @param pg
* @return
*/
static
char
*
doFlushPageToDisk
(
SDiskbasedBuf
*
pBuf
,
SPageInfo
*
pg
)
{
ASSERT
(
!
pg
->
used
&&
pg
->
pData
!=
NULL
);
static
FORCE_INLINE
size_t
getAllocPageSize
(
int32_t
pageSize
)
{
return
pageSize
+
POINTER_BYTES
+
sizeof
(
SFilePage
);
}
static
char
*
doFlushBufPage
(
SDiskbasedBuf
*
pBuf
,
SPageInfo
*
pg
)
{
if
(
pg
->
pData
==
NULL
||
pg
->
used
)
{
uError
(
"invalid params in paged buffer process when flushing buf to disk, %s"
,
pBuf
->
id
);
terrno
=
TSDB_CODE_INVALID_PARA
;
return
NULL
;
}
int32_t
size
=
pBuf
->
pageSize
;
char
*
t
=
NULL
;
if
(
pg
->
offset
==
-
1
||
pg
->
dirty
)
{
void
*
payload
=
GET_
DATA_PAYLOAD
(
pg
);
if
(
(
!
HAS_DATA_IN_DISK
(
pg
))
||
pg
->
dirty
)
{
void
*
payload
=
GET_
PAYLOAD_DATA
(
pg
);
t
=
doCompressData
(
payload
,
pBuf
->
pageSize
,
&
size
,
pBuf
);
ASSERTS
(
size
>=
0
,
"size is negative"
);
if
(
size
<
0
)
{
uError
(
"failed to compress data when flushing data to disk, %s"
,
pBuf
->
id
);
return
NULL
;
}
}
// this page is flushed to disk for the first time
if
(
pg
->
dirty
)
{
if
(
pg
->
offset
==
-
1
)
{
ASSERTS
(
pg
->
dirty
==
true
,
"pg->dirty is false"
);
pg
->
offset
=
allocatePositionInFile
(
pBuf
,
size
);
if
(
!
HAS_DATA_IN_DISK
(
pg
))
{
pg
->
offset
=
allocateNewPositionInFile
(
pBuf
,
size
);
pBuf
->
nextPos
+=
size
;
int32_t
ret
=
taosLSeekFile
(
pBuf
->
pFile
,
pg
->
offset
,
SEEK_SET
);
...
...
@@ -155,6 +162,7 @@ static char* doFlushPageToDisk(SDiskbasedBuf* pBuf, SPageInfo* pg) {
return
NULL
;
}
// extend the file size
if
(
pBuf
->
fileSize
<
pg
->
offset
+
size
)
{
pBuf
->
fileSize
=
pg
->
offset
+
size
;
}
...
...
@@ -169,7 +177,7 @@ static char* doFlushPageToDisk(SDiskbasedBuf* pBuf, SPageInfo* pg) {
taosArrayPush
(
pBuf
->
pFree
,
&
dinfo
);
// 2. allocate new position, and update the info
pg
->
offset
=
allocatePositionInFile
(
pBuf
,
size
);
pg
->
offset
=
allocate
New
PositionInFile
(
pBuf
,
size
);
pBuf
->
nextPos
+=
size
;
}
...
...
@@ -197,20 +205,19 @@ static char* doFlushPageToDisk(SDiskbasedBuf* pBuf, SPageInfo* pg) {
size
=
pg
->
length
;
}
ASSERT
(
size
>
0
||
(
pg
->
offset
==
-
1
&&
pg
->
length
==
-
1
));
char
*
pDataBuf
=
pg
->
pData
;
memset
(
pDataBuf
,
0
,
getAllocPageSize
(
pBuf
->
pageSize
));
#ifdef BUF_PAGE_DEBUG
uDebug
(
"page_flush %p, pageId:%d, offset:%d"
,
pDataBuf
,
pg
->
pageId
,
pg
->
offset
);
#endif
pg
->
length
=
size
;
// on disk size
return
pDataBuf
;
}
static
char
*
flush
PageToDisk
(
SDiskbasedBuf
*
pBuf
,
SPageInfo
*
pg
)
{
static
char
*
flush
BufPage
(
SDiskbasedBuf
*
pBuf
,
SPageInfo
*
pg
)
{
int32_t
ret
=
TSDB_CODE_SUCCESS
;
ASSERT
(((
int64_t
)
pBuf
->
numOfPages
*
pBuf
->
pageSize
)
==
pBuf
->
totalBufSize
&&
pBuf
->
numOfPages
>=
pBuf
->
inMemPages
);
if
(
pBuf
->
pFile
==
NULL
)
{
if
((
ret
=
createDiskFile
(
pBuf
))
!=
TSDB_CODE_SUCCESS
)
{
...
...
@@ -219,22 +226,27 @@ static char* flushPageToDisk(SDiskbasedBuf* pBuf, SPageInfo* pg) {
}
}
char
*
p
=
doFlushPageToDisk
(
pBuf
,
pg
);
setPageNotInBuf
(
pg
);
pg
->
dirty
=
false
;
char
*
p
=
doFlushBufPage
(
pBuf
,
pg
);
CLEAR_BUF_PAGE_IN_MEM_FLAG
(
pg
);
pg
->
dirty
=
false
;
return
p
;
}
// load file block data in disk
static
int32_t
loadPageFromDisk
(
SDiskbasedBuf
*
pBuf
,
SPageInfo
*
pg
)
{
if
(
pg
->
offset
<
0
||
pg
->
length
<=
0
)
{
uError
(
"failed to load buf page from disk, offset:%"
PRId64
", length:%d, %s"
,
pg
->
offset
,
pg
->
length
,
pBuf
->
id
);
return
TSDB_CODE_INVALID_PARA
;
}
int32_t
ret
=
taosLSeekFile
(
pBuf
->
pFile
,
pg
->
offset
,
SEEK_SET
);
if
(
ret
==
-
1
)
{
ret
=
TAOS_SYSTEM_ERROR
(
errno
);
return
ret
;
}
void
*
pPage
=
(
void
*
)
GET_
DATA_PAYLOAD
(
pg
);
void
*
pPage
=
(
void
*
)
GET_
PAYLOAD_DATA
(
pg
);
ret
=
(
int32_t
)
taosReadFile
(
pBuf
->
pFile
,
pPage
,
pg
->
length
);
if
(
ret
!=
pg
->
length
)
{
ret
=
TAOS_SYSTEM_ERROR
(
errno
);
...
...
@@ -249,10 +261,14 @@ static int32_t loadPageFromDisk(SDiskbasedBuf* pBuf, SPageInfo* pg) {
return
0
;
}
static
SPageInfo
*
register
Page
(
SDiskbasedBuf
*
pBuf
,
int32_t
pageId
)
{
static
SPageInfo
*
register
NewPageInfo
(
SDiskbasedBuf
*
pBuf
,
int32_t
pageId
)
{
pBuf
->
numOfPages
+=
1
;
SPageInfo
*
ppi
=
taosMemoryMalloc
(
sizeof
(
SPageInfo
));
if
(
ppi
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
NULL
;
}
ppi
->
pageId
=
pageId
;
ppi
->
pData
=
NULL
;
...
...
@@ -272,46 +288,33 @@ static SListNode* getEldestUnrefedPage(SDiskbasedBuf* pBuf) {
SListNode
*
pn
=
NULL
;
while
((
pn
=
tdListNext
(
&
iter
))
!=
NULL
)
{
SPageInfo
*
pageInfo
=
*
(
SPageInfo
**
)
pn
->
data
;
ASSERT
(
pageInfo
->
pageId
>=
0
&&
pageInfo
->
pn
==
pn
);
SPageInfo
*
p
=
*
(
SPageInfo
**
)(
pageInfo
->
pData
);
ASSERT
(
pageInfo
->
pageId
>=
0
&&
pageInfo
->
pn
==
pn
&&
p
==
pageInfo
);
if
(
!
pageInfo
->
used
)
{
// printf("%d is chosen\n", pageInfo->pageId);
break
;
}
else
{
// printf("page %d is used, dirty:%d\n", pageInfo->pageId, pageInfo->dirty);
}
}
return
pn
;
}
static
char
*
evacOneDataPage
(
SDiskbasedBuf
*
pBuf
)
{
char
*
bufPage
=
NULL
;
static
char
*
evictBufPage
(
SDiskbasedBuf
*
pBuf
)
{
SListNode
*
pn
=
getEldestUnrefedPage
(
pBuf
);
terrno
=
0
;
// all pages are referenced by user, try to allocate new space
if
(
pn
==
NULL
)
{
int32_t
prev
=
pBuf
->
inMemPages
;
// increase by 50% of previous mem pages
pBuf
->
inMemPages
=
(
int32_t
)(
pBuf
->
inMemPages
*
1
.
5
f
);
// qWarn("%p in memory buf page not sufficient, expand from %d to %d, page size:%d", pBuf, prev,
// pBuf->inMemPages, pBuf->pageSize);
}
else
{
tdListPopNode
(
pBuf
->
lruList
,
pn
);
if
(
pn
==
NULL
)
{
// no available buffer pages now, return.
return
NULL
;
}
SPageInfo
*
d
=
*
(
SPageInfo
**
)
pn
->
data
;
ASSERTS
(
d
->
pn
==
pn
,
"d->pn not equal pn"
);
terrno
=
0
;
tdListPopNode
(
pBuf
->
lruList
,
pn
);
d
->
pn
=
NULL
;
taosMemoryFreeClear
(
pn
);
SPageInfo
*
d
=
*
(
SPageInfo
**
)
pn
->
data
;
bufPage
=
flushPageToDisk
(
pBuf
,
d
)
;
}
d
->
pn
=
NULL
;
taosMemoryFreeClear
(
pn
);
return
bufPage
;
return
flushBufPage
(
pBuf
,
d
)
;
}
static
void
lruListPushFront
(
SList
*
pList
,
SPageInfo
*
pi
)
{
...
...
@@ -338,13 +341,12 @@ int32_t createDiskbasedBuf(SDiskbasedBuf** pBuf, int32_t pagesize, int32_t inMem
SDiskbasedBuf
*
pPBuf
=
*
pBuf
;
if
(
pPBuf
==
NULL
)
{
return
TSDB_CODE_OUT_OF_MEMORY
;
goto
_error
;
}
pPBuf
->
pageSize
=
pagesize
;
pPBuf
->
numOfPages
=
0
;
// all pages are in buffer in the first place
pPBuf
->
totalBufSize
=
0
;
pPBuf
->
inMemPages
=
inMemBufSize
/
pagesize
;
// maximum allowed pages, it is a soft limit.
pPBuf
->
allocateId
=
-
1
;
pPBuf
->
pFile
=
NULL
;
pPBuf
->
id
=
strdup
(
id
);
...
...
@@ -353,33 +355,69 @@ int32_t createDiskbasedBuf(SDiskbasedBuf** pBuf, int32_t pagesize, int32_t inMem
pPBuf
->
freePgList
=
tdListNew
(
POINTER_BYTES
);
// at least more than 2 pages must be in memory
ASSERT
(
inMemBufSize
>=
pagesize
*
2
);
if
(
inMemBufSize
<
pagesize
*
2
)
{
inMemBufSize
=
pagesize
*
2
;
}
pPBuf
->
inMemPages
=
inMemBufSize
/
pagesize
;
// maximum allowed pages, it is a soft limit.
pPBuf
->
lruList
=
tdListNew
(
POINTER_BYTES
);
if
(
pPBuf
->
lruList
==
NULL
)
{
goto
_error
;
}
// init id hash table
_hash_fn_t
fn
=
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_INT
);
pPBuf
->
pIdList
=
taosArrayInit
(
4
,
POINTER_BYTES
);
if
(
pPBuf
->
pIdList
==
NULL
)
{
goto
_error
;
}
pPBuf
->
assistBuf
=
taosMemoryMalloc
(
pPBuf
->
pageSize
+
2
);
// EXTRA BYTES
if
(
pPBuf
->
assistBuf
==
NULL
)
{
goto
_error
;
}
pPBuf
->
all
=
taosHashInit
(
10
,
fn
,
true
,
false
);
pPBuf
->
prefix
=
(
char
*
)
dir
;
if
(
pPBuf
->
all
==
NULL
)
{
goto
_error
;
}
pPBuf
->
prefix
=
(
char
*
)
dir
;
pPBuf
->
emptyDummyIdList
=
taosArrayInit
(
1
,
sizeof
(
int32_t
));
// qDebug("QInfo:0x%"PRIx64" create resBuf for output, page size:%d, inmem buf pages:%d, file:%s", qId,
// pPBuf->pageSize,
// pPBuf->inMemPages, pPBuf->path);
// pPBuf->pageSize, pPBuf->inMemPages, pPBuf->path);
return
TSDB_CODE_SUCCESS
;
_error:
destroyDiskbasedBuf
(
pPBuf
);
return
TSDB_CODE_OUT_OF_MEMORY
;
}
static
char
*
doExtractPage
(
SDiskbasedBuf
*
pBuf
)
{
char
*
availablePage
=
NULL
;
if
(
NO_IN_MEM_AVAILABLE_PAGES
(
pBuf
))
{
availablePage
=
evictBufPage
(
pBuf
);
if
(
availablePage
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
uWarn
(
"no available buf pages, current:%d, max:%d"
,
listNEles
(
pBuf
->
lruList
),
pBuf
->
inMemPages
)
}
}
else
{
availablePage
=
taosMemoryCalloc
(
1
,
getAllocPageSize
(
pBuf
->
pageSize
));
// add extract bytes in case of zipped buffer increased.
if
(
availablePage
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
}
}
return
availablePage
;
}
void
*
getNewBufPage
(
SDiskbasedBuf
*
pBuf
,
int32_t
*
pageId
)
{
pBuf
->
statis
.
getPages
+=
1
;
char
*
availablePage
=
NULL
;
if
(
NO_IN_MEM_AVAILABLE_PAGES
(
pBuf
)
)
{
availablePage
=
evacOneDataPage
(
pBuf
)
;
char
*
availablePage
=
doExtractPage
(
pBuf
)
;
if
(
availablePage
==
NULL
)
{
return
NULL
;
}
SPageInfo
*
pi
=
NULL
;
...
...
@@ -394,7 +432,10 @@ void* getNewBufPage(SDiskbasedBuf* pBuf, int32_t* pageId) {
*
pageId
=
(
++
pBuf
->
allocateId
);
// register page id info
pi
=
registerPage
(
pBuf
,
*
pageId
);
pi
=
registerNewPageInfo
(
pBuf
,
*
pageId
);
if
(
pi
==
NULL
)
{
return
NULL
;
}
// add to hash map
taosHashPut
(
pBuf
->
all
,
pageId
,
sizeof
(
int32_t
),
&
pi
,
POINTER_BYTES
);
...
...
@@ -402,63 +443,62 @@ void* getNewBufPage(SDiskbasedBuf* pBuf, int32_t* pageId) {
}
// add to LRU list
ASSERT
(
listNEles
(
pBuf
->
lruList
)
<
pBuf
->
inMemPages
&&
pBuf
->
inMemPages
>
0
);
lruListPushFront
(
pBuf
->
lruList
,
pi
);
// allocate buf
if
(
availablePage
==
NULL
)
{
pi
->
pData
=
taosMemoryCalloc
(
1
,
getAllocPageSize
(
pBuf
->
pageSize
));
// add extract bytes in case of zipped buffer increased.
}
else
{
pi
->
pData
=
availablePage
;
}
pi
->
pData
=
availablePage
;
((
void
**
)
pi
->
pData
)[
0
]
=
pi
;
#ifdef BUF_PAGE_DEBUG
uDebug
(
"page_getNewBufPage , pi->pData:%p, pageId:%d, offset:%"
PRId64
,
pi
->
pData
,
pi
->
pageId
,
pi
->
offset
);
#endif
return
(
void
*
)(
GET_DATA_PAYLOAD
(
pi
));
return
(
void
*
)(
GET_PAYLOAD_DATA
(
pi
));
}
void
*
getBufPage
(
SDiskbasedBuf
*
pBuf
,
int32_t
id
)
{
ASSERT
(
pBuf
!=
NULL
&&
id
>=
0
);
if
(
id
<
0
)
{
terrno
=
TSDB_CODE_INVALID_PARA
;
uError
(
"invalid page id:%d, %s"
,
id
,
pBuf
->
id
);
return
NULL
;
}
pBuf
->
statis
.
getPages
+=
1
;
SPageInfo
**
pi
=
taosHashGet
(
pBuf
->
all
,
&
id
,
sizeof
(
int32_t
));
ASSERT
(
pi
!=
NULL
&&
*
pi
!=
NULL
);
if
(
pi
==
NULL
||
*
pi
==
NULL
)
{
uError
(
"failed to locate the buffer page:%d, %s"
,
id
,
pBuf
->
id
);
terrno
=
TSDB_CODE_INVALID_PARA
;
return
NULL
;
}
if
(
(
*
pi
)
->
pData
!=
NULL
)
{
// it is in memory
if
(
BUF_PAGE_IN_MEM
(
*
pi
)
)
{
// it is in memory
// no need to update the LRU list if only one page exists
if
(
pBuf
->
numOfPages
==
1
)
{
(
*
pi
)
->
used
=
true
;
return
(
void
*
)(
GET_
DATA_PAYLOAD
(
*
pi
));
return
(
void
*
)(
GET_
PAYLOAD_DATA
(
*
pi
));
}
SPageInfo
**
pInfo
=
(
SPageInfo
**
)((
*
pi
)
->
pn
->
data
);
ASSERT
(
*
pInfo
==
*
pi
);
if
(
*
pInfo
!=
*
pi
)
{
uError
(
"inconsistently data in paged buffer, pInfo:%p, pi:%p, %s"
,
*
pInfo
,
*
pi
,
pBuf
->
id
);
return
NULL
;
}
lruListMoveToFront
(
pBuf
->
lruList
,
(
*
pi
));
(
*
pi
)
->
used
=
true
;
#ifdef BUF_PAGE_DEBUG
uDebug
(
"page_getBufPage1 pageId:%d, offset:%"
PRId64
,
(
*
pi
)
->
pageId
,
(
*
pi
)
->
offset
);
#endif
return
(
void
*
)(
GET_
DATA_PAYLOAD
(
*
pi
));
return
(
void
*
)(
GET_
PAYLOAD_DATA
(
*
pi
));
}
else
{
// not in memory
ASSERT
((
*
pi
)
->
pData
==
NULL
&&
(
*
pi
)
->
pn
==
NULL
&&
ASSERT
((
!
BUF_PAGE_IN_MEM
(
*
pi
))
&&
(
*
pi
)
->
pn
==
NULL
&&
(((
*
pi
)
->
length
>=
0
&&
(
*
pi
)
->
offset
>=
0
)
||
((
*
pi
)
->
length
==
-
1
&&
(
*
pi
)
->
offset
==
-
1
)));
char
*
availablePage
=
NULL
;
if
(
NO_IN_MEM_AVAILABLE_PAGES
(
pBuf
))
{
availablePage
=
evacOneDataPage
(
pBuf
);
if
(
availablePage
==
NULL
)
{
return
NULL
;
}
}
(
*
pi
)
->
pData
=
doExtractPage
(
pBuf
);
if
(
availablePage
==
NULL
)
{
(
*
pi
)
->
pData
=
taosMemoryCalloc
(
1
,
getAllocPageSize
(
pBuf
->
pageSize
));
}
else
{
(
*
pi
)
->
pData
=
availablePage
;
// failed to evict buffer page, return with error code.
if
((
*
pi
)
->
pData
==
NULL
)
{
return
NULL
;
}
// set the ptr to the new SPageInfo
...
...
@@ -468,23 +508,25 @@ void* getBufPage(SDiskbasedBuf* pBuf, int32_t id) {
(
*
pi
)
->
used
=
true
;
// some data has been flushed to disk, and needs to be loaded into buffer again.
if
(
(
*
pi
)
->
length
>
0
&&
(
*
pi
)
->
offset
>=
0
)
{
if
(
HAS_DATA_IN_DISK
(
*
pi
)
)
{
int32_t
code
=
loadPageFromDisk
(
pBuf
,
*
pi
);
if
(
code
!=
0
)
{
terrno
=
code
;
return
NULL
;
}
}
#ifdef BUF_PAGE_DEBUG
uDebug
(
"page_getBufPage2 pageId:%d, offset:%"
PRId64
,
(
*
pi
)
->
pageId
,
(
*
pi
)
->
offset
);
#endif
return
(
void
*
)(
GET_
DATA_PAYLOAD
(
*
pi
));
return
(
void
*
)(
GET_
PAYLOAD_DATA
(
*
pi
));
}
}
void
releaseBufPage
(
SDiskbasedBuf
*
pBuf
,
void
*
page
)
{
if
(
ASSERTS
(
pBuf
!=
NULL
&&
page
!=
NULL
,
"pBuf or page is NULL"
)
)
{
if
(
page
==
NULL
)
{
return
;
}
SPageInfo
*
ppi
=
getPageInfoFromPayload
(
page
);
releaseBufPageInfo
(
pBuf
,
ppi
);
}
...
...
@@ -493,7 +535,13 @@ void releaseBufPageInfo(SDiskbasedBuf* pBuf, SPageInfo* pi) {
#ifdef BUF_PAGE_DEBUG
uDebug
(
"page_releaseBufPageInfo pageId:%d, used:%d, offset:%"
PRId64
,
pi
->
pageId
,
pi
->
used
,
pi
->
offset
);
#endif
if
(
ASSERTS
(
pi
->
pData
!=
NULL
,
"pi->pData is NULL"
))
{
if
(
pi
==
NULL
)
{
return
;
}
if
(
pi
->
pData
==
NULL
)
{
uError
(
"pi->pData (page data) is null"
);
return
;
}
...
...
@@ -504,7 +552,6 @@ void releaseBufPageInfo(SDiskbasedBuf* pBuf, SPageInfo* pi) {
size_t
getTotalBufSize
(
const
SDiskbasedBuf
*
pBuf
)
{
return
(
size_t
)
pBuf
->
totalBufSize
;
}
SArray
*
getDataBufPagesIdList
(
SDiskbasedBuf
*
pBuf
)
{
ASSERT
(
pBuf
!=
NULL
);
return
pBuf
->
pIdList
;
}
...
...
@@ -582,7 +629,6 @@ SPageInfo* getLastPageInfo(SArray* pList) {
}
int32_t
getPageId
(
const
SPageInfo
*
pPgInfo
)
{
ASSERT
(
pPgInfo
!=
NULL
);
return
pPgInfo
->
pageId
;
}
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录