Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
2889b8d9
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
2889b8d9
编写于
8月 23, 2022
作者:
H
Haojun Liao
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
refactor(query): do some internal refactor.
上级
821a12a0
变更
3
隐藏空白更改
内联
并排
Showing
3 changed file
with
34 addition
and
19 deletion
+34
-19
source/dnode/vnode/src/tsdb/tsdbRead.c
source/dnode/vnode/src/tsdb/tsdbRead.c
+18
-12
source/libs/executor/src/executorimpl.c
source/libs/executor/src/executorimpl.c
+1
-0
source/libs/executor/src/projectoperator.c
source/libs/executor/src/projectoperator.c
+15
-7
未找到文件。
source/dnode/vnode/src/tsdb/tsdbRead.c
浏览文件 @
2889b8d9
...
...
@@ -178,7 +178,7 @@ static int32_t doAppendRowFromFileBlock(SSDataBlock* pResBlock, STsdbReader* pR
static
void
setComposedBlockFlag
(
STsdbReader
*
pReader
,
bool
composed
);
static
bool
hasBeenDropped
(
const
SArray
*
pDelList
,
int32_t
*
index
,
TSDBKEY
*
pKey
,
int32_t
order
);
static
void
doMergeMultiRows
(
TSDBROW
*
pRow
,
uint64_t
uid
,
SIterInfo
*
pIter
,
SArray
*
pDelList
,
STSRow
**
pTSRow
,
static
void
doMergeM
emTableM
ultiRows
(
TSDBROW
*
pRow
,
uint64_t
uid
,
SIterInfo
*
pIter
,
SArray
*
pDelList
,
STSRow
**
pTSRow
,
STsdbReader
*
pReader
,
bool
*
freeTSRow
);
static
void
doMergeMemIMemRows
(
TSDBROW
*
pRow
,
TSDBROW
*
piRow
,
STableBlockScanInfo
*
pBlockScanInfo
,
STsdbReader
*
pReader
,
STSRow
**
pTSRow
);
...
...
@@ -1510,6 +1510,7 @@ static int32_t doMergeBufAndFileRows_Rv(STsdbReader* pReader, STableBlockScanInf
return
TSDB_CODE_SUCCESS
;
}
#if 0
static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo, TSDBROW* pRow,
SIterInfo* pIter, int64_t key, SLastBlockReader* pLastBlockReader) {
SRowMerger merge = {0};
...
...
@@ -1536,7 +1537,7 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo*
freeTSRow = true;
}
} else if (k.ts < key) { // k.ts < key
doMergeMultiRows
(
pRow
,
pBlockScanInfo
->
uid
,
pIter
,
pDelList
,
&
pTSRow
,
pReader
,
&
freeTSRow
);
doMergeM
emTableM
ultiRows(pRow, pBlockScanInfo->uid, pIter, pDelList, &pTSRow, pReader, &freeTSRow);
} else { // k.ts == key, ascending order: file block ----> imem rows -----> mem rows
tRowMergerInit(&merge, &fRow, pReader->pSchema);
doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
...
...
@@ -1549,7 +1550,7 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo*
}
} else { // descending order scan
if (key < k.ts) {
doMergeMultiRows
(
pRow
,
pBlockScanInfo
->
uid
,
pIter
,
pDelList
,
&
pTSRow
,
pReader
,
&
freeTSRow
);
doMergeM
emTableM
ultiRows(pRow, pBlockScanInfo->uid, pIter, pDelList, &pTSRow, pReader, &freeTSRow);
} else if (k.ts < key) {
if (tryCopyDistinctRowFromFileBlock(pReader, pBlockData, key, pDumpInfo)) {
return TSDB_CODE_SUCCESS;
...
...
@@ -1583,6 +1584,8 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo*
return TSDB_CODE_SUCCESS;
}
#endif
static
int32_t
doMergeMultiLevelRowsRv
(
STsdbReader
*
pReader
,
STableBlockScanInfo
*
pBlockScanInfo
,
SBlockData
*
pBlockData
,
SLastBlockReader
*
pLastBlockReader
)
{
SRowMerger
merge
=
{
0
};
STSRow
*
pTSRow
=
NULL
;
...
...
@@ -1734,6 +1737,7 @@ static int32_t doMergeMultiLevelRowsRv(STsdbReader* pReader, STableBlockScanInfo
return
TSDB_CODE_SUCCESS
;
}
#if 0
static int32_t doMergeThreeLevelRows(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo, SBlockData* pBlockData) {
SRowMerger merge = {0};
STSRow* pTSRow = NULL;
...
...
@@ -1779,7 +1783,7 @@ static int32_t doMergeThreeLevelRows(STsdbReader* pReader, STableBlockScanInfo*
// [3] ik.ts < key <= k.ts
// [4] ik.ts < k.ts <= key
if (ik.ts < k.ts) {
doMergeMultiRows
(
piRow
,
uid
,
&
pBlockScanInfo
->
iiter
,
pDelList
,
&
pTSRow
,
pReader
,
&
freeTSRow
);
doMergeM
emTableM
ultiRows(piRow, uid, &pBlockScanInfo->iiter, pDelList, &pTSRow, pReader, &freeTSRow);
doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, uid);
if (freeTSRow) {
taosMemoryFree(pTSRow);
...
...
@@ -1790,7 +1794,7 @@ static int32_t doMergeThreeLevelRows(STsdbReader* pReader, STableBlockScanInfo*
// [5] k.ts < key <= ik.ts
// [6] k.ts < ik.ts <= key
if (k.ts < ik.ts) {
doMergeMultiRows
(
pRow
,
uid
,
&
pBlockScanInfo
->
iter
,
pDelList
,
&
pTSRow
,
pReader
,
&
freeTSRow
);
doMergeM
emTableM
ultiRows(pRow, uid, &pBlockScanInfo->iter, pDelList, &pTSRow, pReader, &freeTSRow);
doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, uid);
if (freeTSRow) {
taosMemoryFree(pTSRow);
...
...
@@ -1836,7 +1840,7 @@ static int32_t doMergeThreeLevelRows(STsdbReader* pReader, STableBlockScanInfo*
// [3] ik.ts > k.ts >= Key
// [4] ik.ts > key >= k.ts
if (ik.ts > key) {
doMergeMultiRows
(
piRow
,
uid
,
&
pBlockScanInfo
->
iiter
,
pDelList
,
&
pTSRow
,
pReader
,
&
freeTSRow
);
doMergeM
emTableM
ultiRows(piRow, uid, &pBlockScanInfo->iiter, pDelList, &pTSRow, pReader, &freeTSRow);
doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, uid);
if (freeTSRow) {
taosMemoryFree(pTSRow);
...
...
@@ -1859,7 +1863,7 @@ static int32_t doMergeThreeLevelRows(STsdbReader* pReader, STableBlockScanInfo*
//[7] key = ik.ts > k.ts
if (key == ik.ts) {
doMergeMultiRows
(
piRow
,
uid
,
&
pBlockScanInfo
->
iiter
,
pDelList
,
&
pTSRow
,
pReader
,
&
freeTSRow
);
doMergeM
emTableM
ultiRows(piRow, uid, &pBlockScanInfo->iiter, pDelList, &pTSRow, pReader, &freeTSRow);
TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
tRowMerge(&merge, &fRow);
...
...
@@ -1876,6 +1880,7 @@ static int32_t doMergeThreeLevelRows(STsdbReader* pReader, STableBlockScanInfo*
ASSERT(0);
return -1;
}
#endif
static
bool
isValidFileBlockRow
(
SBlockData
*
pBlockData
,
SFileBlockDumpInfo
*
pDumpInfo
,
STableBlockScanInfo
*
pBlockScanInfo
,
STsdbReader
*
pReader
)
{
...
...
@@ -3115,7 +3120,7 @@ int32_t doMergeRowsInLastBlock(SLastBlockReader* pLastBlockReader, STableBlockSc
return
TSDB_CODE_SUCCESS
;
}
void
doMergeMultiRows
(
TSDBROW
*
pRow
,
uint64_t
uid
,
SIterInfo
*
pIter
,
SArray
*
pDelList
,
STSRow
**
pTSRow
,
void
doMergeM
emTableM
ultiRows
(
TSDBROW
*
pRow
,
uint64_t
uid
,
SIterInfo
*
pIter
,
SArray
*
pDelList
,
STSRow
**
pTSRow
,
STsdbReader
*
pReader
,
bool
*
freeTSRow
)
{
TSDBROW
*
pNextRow
=
NULL
;
TSDBROW
current
=
*
pRow
;
...
...
@@ -3197,6 +3202,7 @@ int32_t tsdbGetNextRowInMem(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pR
TSDBROW
*
pRow
=
getValidRow
(
&
pBlockScanInfo
->
iter
,
pBlockScanInfo
->
delSkyline
,
pReader
);
TSDBROW
*
piRow
=
getValidRow
(
&
pBlockScanInfo
->
iiter
,
pBlockScanInfo
->
delSkyline
,
pReader
);
SArray
*
pDelList
=
pBlockScanInfo
->
delSkyline
;
uint64_t
uid
=
pBlockScanInfo
->
uid
;
// todo refactor
bool
asc
=
ASCENDING_TRAVERSE
(
pReader
->
order
);
...
...
@@ -3219,9 +3225,9 @@ int32_t tsdbGetNextRowInMem(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pR
TSDBKEY
ik
=
TSDBROW_KEY
(
piRow
);
if
(
ik
.
ts
<
k
.
ts
)
{
// ik.ts < k.ts
doMergeM
ultiRows
(
piRow
,
pBlockScanInfo
->
uid
,
&
pBlockScanInfo
->
iiter
,
pDelList
,
pTSRow
,
pReader
,
freeTSRow
);
doMergeM
emTableMultiRows
(
piRow
,
uid
,
&
pBlockScanInfo
->
iiter
,
pDelList
,
pTSRow
,
pReader
,
freeTSRow
);
}
else
if
(
k
.
ts
<
ik
.
ts
)
{
doMergeM
ultiRows
(
pRow
,
pBlockScanInfo
->
uid
,
&
pBlockScanInfo
->
iter
,
pDelList
,
pTSRow
,
pReader
,
freeTSRow
);
doMergeM
emTableMultiRows
(
pRow
,
uid
,
&
pBlockScanInfo
->
iter
,
pDelList
,
pTSRow
,
pReader
,
freeTSRow
);
}
else
{
// ik.ts == k.ts
doMergeMemIMemRows
(
pRow
,
piRow
,
pBlockScanInfo
,
pReader
,
pTSRow
);
*
freeTSRow
=
true
;
...
...
@@ -3231,12 +3237,12 @@ int32_t tsdbGetNextRowInMem(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pR
}
if
(
pBlockScanInfo
->
iter
.
hasVal
&&
pRow
!=
NULL
)
{
doMergeMultiRows
(
pRow
,
pBlockScanInfo
->
uid
,
&
pBlockScanInfo
->
iter
,
pDelList
,
pTSRow
,
pReader
,
freeTSRow
);
doMergeM
emTableM
ultiRows
(
pRow
,
pBlockScanInfo
->
uid
,
&
pBlockScanInfo
->
iter
,
pDelList
,
pTSRow
,
pReader
,
freeTSRow
);
return
TSDB_CODE_SUCCESS
;
}
if
(
pBlockScanInfo
->
iiter
.
hasVal
&&
piRow
!=
NULL
)
{
doMergeM
ultiRows
(
piRow
,
pBlockScanInfo
->
uid
,
&
pBlockScanInfo
->
iiter
,
pDelList
,
pTSRow
,
pReader
,
freeTSRow
);
doMergeM
emTableMultiRows
(
piRow
,
uid
,
&
pBlockScanInfo
->
iiter
,
pDelList
,
pTSRow
,
pReader
,
freeTSRow
);
return
TSDB_CODE_SUCCESS
;
}
...
...
source/libs/executor/src/executorimpl.c
浏览文件 @
2889b8d9
...
...
@@ -3487,6 +3487,7 @@ int32_t doInitAggInfoSup(SAggSupporter* pAggSup, SqlFunctionCtx* pCtx, int32_t n
qError
(
"Init stream agg supporter failed since %s"
,
terrstr
(
terrno
));
return
terrno
;
}
int32_t
code
=
createDiskbasedBuf
(
&
pAggSup
->
pResultBuf
,
defaultPgsz
,
defaultBufsz
,
pKey
,
tsTempDir
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
qError
(
"Create agg result buf failed since %s"
,
tstrerror
(
code
));
...
...
source/libs/executor/src/projectoperator.c
浏览文件 @
2889b8d9
...
...
@@ -50,9 +50,11 @@ static void destroyIndefinitOperatorInfo(void* param, int32_t numOfOutput) {
SOperatorInfo
*
createProjectOperatorInfo
(
SOperatorInfo
*
downstream
,
SProjectPhysiNode
*
pProjPhyNode
,
SExecTaskInfo
*
pTaskInfo
)
{
int32_t
code
=
TSDB_CODE_SUCCESS
;
SProjectOperatorInfo
*
pInfo
=
taosMemoryCalloc
(
1
,
sizeof
(
SProjectOperatorInfo
));
SOperatorInfo
*
pOperator
=
taosMemoryCalloc
(
1
,
sizeof
(
SOperatorInfo
));
if
(
pInfo
==
NULL
||
pOperator
==
NULL
)
{
code
=
TSDB_CODE_OUT_OF_MEMORY
;
goto
_error
;
}
...
...
@@ -67,12 +69,11 @@ SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SProjectPhys
pInfo
->
binfo
.
pRes
=
pResBlock
;
pInfo
->
pFinalRes
=
createOneDataBlock
(
pResBlock
,
false
);
pInfo
->
pFilterNode
=
pProjPhyNode
->
node
.
pConditions
;
pInfo
->
mergeDataBlocks
=
pProjPhyNode
->
mergeDataBlock
;
// todo remove it soon
if
(
pTaskInfo
->
execModel
==
OPTR_EXEC_MODEL_STREAM
)
{
pInfo
->
mergeDataBlocks
=
false
;
}
else
{
pInfo
->
mergeDataBlocks
=
pProjPhyNode
->
mergeDataBlock
;
}
int32_t
numOfRows
=
4096
;
...
...
@@ -83,9 +84,13 @@ SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SProjectPhys
if
(
numOfRows
*
pResBlock
->
info
.
rowSize
>
TWOMB
)
{
numOfRows
=
TWOMB
/
pResBlock
->
info
.
rowSize
;
}
initResultSizeInfo
(
&
pOperator
->
resultInfo
,
numOfRows
);
code
=
initAggInfo
(
&
pOperator
->
exprSupp
,
&
pInfo
->
aggSup
,
pExprInfo
,
numOfCols
,
keyBufSize
,
pTaskInfo
->
id
.
str
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
goto
_error
;
}
initAggInfo
(
&
pOperator
->
exprSupp
,
&
pInfo
->
aggSup
,
pExprInfo
,
numOfCols
,
keyBufSize
,
pTaskInfo
->
id
.
str
);
initBasicInfo
(
&
pInfo
->
binfo
,
pResBlock
);
setFunctionResultOutput
(
pOperator
,
&
pInfo
->
binfo
,
&
pInfo
->
aggSup
,
MAIN_SCAN
,
numOfCols
);
...
...
@@ -99,7 +104,7 @@ SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SProjectPhys
pOperator
->
fpSet
=
createOperatorFpSet
(
operatorDummyOpenFn
,
doProjectOperation
,
NULL
,
NULL
,
destroyProjectOperatorInfo
,
NULL
,
NULL
,
NULL
);
int32_t
code
=
appendDownstream
(
pOperator
,
&
downstream
,
1
);
code
=
appendDownstream
(
pOperator
,
&
downstream
,
1
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
goto
_error
;
}
...
...
@@ -107,7 +112,9 @@ SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SProjectPhys
return
pOperator
;
_error:
pTaskInfo
->
code
=
TSDB_CODE_OUT_OF_MEMORY
;
destroyProjectOperatorInfo
(
pInfo
,
numOfCols
);
taosMemoryFree
(
pOperator
);
pTaskInfo
->
code
=
code
;
return
NULL
;
}
...
...
@@ -175,7 +182,8 @@ static int32_t doIngroupLimitOffset(SLimitInfo* pLimitInfo, uint64_t groupId, SS
int32_t
keepRows
=
(
int32_t
)(
pLimitInfo
->
limit
.
limit
-
pLimitInfo
->
numOfOutputRows
);
blockDataKeepFirstNRows
(
pBlock
,
keepRows
);
//TODO: optimize it later when partition by + limit
if
((
pLimitInfo
->
slimit
.
limit
==
-
1
&&
pLimitInfo
->
currentGroupId
==
0
)
||
pLimitInfo
->
slimit
.
limit
>
0
&&
pLimitInfo
->
slimit
.
limit
<=
pLimitInfo
->
numOfOutputGroups
)
{
if
((
pLimitInfo
->
slimit
.
limit
==
-
1
&&
pLimitInfo
->
currentGroupId
==
0
)
||
(
pLimitInfo
->
slimit
.
limit
>
0
&&
pLimitInfo
->
slimit
.
limit
<=
pLimitInfo
->
numOfOutputGroups
))
{
doSetOperatorCompleted
(
pOperator
);
}
}
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录