Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
20b06a7e
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
20b06a7e
编写于
8月 23, 2022
作者:
H
Haojun Liao
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
refactor(query): do some internal refactor.
上级
2889b8d9
变更
5
隐藏空白更改
内联
并排
Showing
5 changed file
with
100 addition
and
190 deletion
+100
-190
source/dnode/vnode/src/tsdb/tsdbRead.c
source/dnode/vnode/src/tsdb/tsdbRead.c
+100
-170
source/libs/executor/inc/executil.h
source/libs/executor/inc/executil.h
+0
-2
source/libs/executor/src/executil.c
source/libs/executor/src/executil.c
+0
-14
source/libs/executor/src/executorimpl.c
source/libs/executor/src/executorimpl.c
+0
-1
source/libs/executor/src/timewindowoperator.c
source/libs/executor/src/timewindowoperator.c
+0
-3
未找到文件。
source/dnode/vnode/src/tsdb/tsdbRead.c
浏览文件 @
20b06a7e
...
@@ -182,6 +182,8 @@ static void doMergeMemTableMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo* pIt
...
@@ -182,6 +182,8 @@ static void doMergeMemTableMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo* pIt
STsdbReader
*
pReader
,
bool
*
freeTSRow
);
STsdbReader
*
pReader
,
bool
*
freeTSRow
);
static
void
doMergeMemIMemRows
(
TSDBROW
*
pRow
,
TSDBROW
*
piRow
,
STableBlockScanInfo
*
pBlockScanInfo
,
STsdbReader
*
pReader
,
static
void
doMergeMemIMemRows
(
TSDBROW
*
pRow
,
TSDBROW
*
piRow
,
STableBlockScanInfo
*
pBlockScanInfo
,
STsdbReader
*
pReader
,
STSRow
**
pTSRow
);
STSRow
**
pTSRow
);
static
int32_t
mergeRowsInFileBlocks
(
SBlockData
*
pBlockData
,
STableBlockScanInfo
*
pBlockScanInfo
,
int64_t
key
,
STsdbReader
*
pReader
);
static
int32_t
initDelSkylineIterator
(
STableBlockScanInfo
*
pBlockScanInfo
,
STsdbReader
*
pReader
,
STbData
*
pMemTbData
,
static
int32_t
initDelSkylineIterator
(
STableBlockScanInfo
*
pBlockScanInfo
,
STsdbReader
*
pReader
,
STbData
*
pMemTbData
,
STbData
*
piMemTbData
);
STbData
*
piMemTbData
);
static
STsdb
*
getTsdbByRetentions
(
SVnode
*
pVnode
,
TSKEY
winSKey
,
SRetention
*
retentions
,
const
char
*
idstr
,
static
STsdb
*
getTsdbByRetentions
(
SVnode
*
pVnode
,
TSKEY
winSKey
,
SRetention
*
retentions
,
const
char
*
idstr
,
...
@@ -1510,82 +1512,83 @@ static int32_t doMergeBufAndFileRows_Rv(STsdbReader* pReader, STableBlockScanInf
...
@@ -1510,82 +1512,83 @@ static int32_t doMergeBufAndFileRows_Rv(STsdbReader* pReader, STableBlockScanInf
return
TSDB_CODE_SUCCESS
;
return
TSDB_CODE_SUCCESS
;
}
}
#if 0
static
int32_t
mergeFileBlockAndLastBlock
(
STsdbReader
*
pReader
,
SLastBlockReader
*
pLastBlockReader
,
int64_t
key
,
static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo, TSDBROW* pRow,
STableBlockScanInfo
*
pBlockScanInfo
,
SBlockData
*
pBlockData
)
{
SIterInfo* pIter, int64_t key, SLastBlockReader* pLastBlockReader) {
SRowMerger merge = {0};
STSRow* pTSRow = NULL;
SBlockData* pBlockData = &pReader->status.fileBlockData;
SFileBlockDumpInfo
*
pDumpInfo
=
&
pReader
->
status
.
fBlockDumpInfo
;
SFileBlockDumpInfo
*
pDumpInfo
=
&
pReader
->
status
.
fBlockDumpInfo
;
TSDBKEY k = TSDBROW_KEY(pRow);
if
(
pBlockData
->
nRow
>
0
)
{
TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
// no last block available, only data block exists
SArray* pDelList = pBlockScanInfo->delSkyline;
if
(
pLastBlockReader
->
lastBlockData
.
nRow
==
0
||
(
!
hasDataInLastBlock
(
pLastBlockReader
)))
{
bool freeTSRow = false;
return
mergeRowsInFileBlocks
(
pBlockData
,
pBlockScanInfo
,
key
,
pReader
);
uint64_t uid = pBlockScanInfo->uid;
}
// row in last file block
TSDBROW
fRow
=
tsdbRowFromBlockData
(
pBlockData
,
pDumpInfo
->
rowIndex
);
int64_t
ts
=
getCurrentKeyInLastBlock
(
pLastBlockReader
);
ASSERT
(
ts
>=
key
);
if
(
ASCENDING_TRAVERSE
(
pReader
->
order
))
{
if
(
key
<
ts
)
{
// imem, mem are all empty, file blocks (data blocks and last block) exist
return
mergeRowsInFileBlocks
(
pBlockData
,
pBlockScanInfo
,
key
,
pReader
);
}
else
if
(
key
==
ts
)
{
STSRow
*
pTSRow
=
NULL
;
SRowMerger
merge
=
{
0
};
// ascending order traverse
if (ASCENDING_TRAVERSE(pReader->order)) {
if (key < k.ts) {
// imem & mem are all empty, only file exist
if (tryCopyDistinctRowFromFileBlock(pReader, pBlockData, key, pDumpInfo)) {
return TSDB_CODE_SUCCESS;
} else {
tRowMergerInit
(
&
merge
,
&
fRow
,
pReader
->
pSchema
);
tRowMergerInit
(
&
merge
,
&
fRow
,
pReader
->
pSchema
);
doMergeRowsInFileBlocks
(
pBlockData
,
pBlockScanInfo
,
pReader
,
&
merge
);
doMergeRowsInFileBlocks
(
pBlockData
,
pBlockScanInfo
,
pReader
,
&
merge
);
tRowMergerGetRow(&merge, &pTSRow);
doMergeRowsInLastBlock
(
pLastBlockReader
,
pBlockScanInfo
,
ts
,
&
merge
);
freeTSRow = true;
}
} else if (k.ts < key) { // k.ts < key
doMergeMemTableMultiRows(pRow, pBlockScanInfo->uid, pIter, pDelList, &pTSRow, pReader, &freeTSRow);
} else { // k.ts == key, ascending order: file block ----> imem rows -----> mem rows
tRowMergerInit(&merge, &fRow, pReader->pSchema);
doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
tRowMerge(&merge, p
Row);
tRowMergerGetRow
(
&
merge
,
&
pTS
Row
);
doMergeRowsInBuf(pIter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge, pReader
);
doAppendRowFromTSRow
(
pReader
->
pResBlock
,
pReader
,
pTSRow
,
pBlockScanInfo
->
uid
);
tRowMergerGetRow(&merge, &pTSRow);
taosMemoryFree
(
pTSRow
);
freeTSRow = true;
tRowMergerClear
(
&
merge
);
}
} else { // descending order scan
if (key < k.ts) {
doMergeMemTableMultiRows(pRow, pBlockScanInfo->uid, pIter, pDelList, &pTSRow, pReader, &freeTSRow);
} else if (k.ts < key) {
if (tryCopyDistinctRowFromFileBlock(pReader, pBlockData, key, pDumpInfo)) {
return
TSDB_CODE_SUCCESS
;
return
TSDB_CODE_SUCCESS
;
}
else
{
}
else
{
tRowMergerInit(&merge, &fRow, pReader->pSchema);
ASSERT
(
0
);
doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
return
TSDB_CODE_SUCCESS
;
tRowMergerGetRow(&merge, &pTSRow);
freeTSRow = true;
}
}
} else { // descending order: mem rows -----> imem rows ------> file block
}
else
{
// desc order
STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid);
SBlockData
*
pLastBlockData
=
&
pLastBlockReader
->
lastBlockData
;
TSDBROW
fRow1
=
tsdbRowFromBlockData
(
pLastBlockData
,
*
pLastBlockReader
->
rowIndex
);
tRowMergerInit(&merge, pRow, pSchema);
STSRow
*
pTSRow
=
NULL
;
doMergeRowsInBuf(pIter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge, pReader);
SRowMerger
merge
=
{
0
};
tRowMergerInit
(
&
merge
,
&
fRow1
,
pReader
->
pSchema
);
doMergeRowsInLastBlock
(
pLastBlockReader
,
pBlockScanInfo
,
ts
,
&
merge
);
tRowMerge(&merge, &fRow);
if
(
ts
==
key
)
{
doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
doMergeRowsInFileBlocks
(
pBlockData
,
pBlockScanInfo
,
pReader
,
&
merge
);
}
tRowMergerGetRow
(
&
merge
,
&
pTSRow
);
tRowMergerGetRow
(
&
merge
,
&
pTSRow
);
freeTSRow = true;
doAppendRowFromTSRow
(
pReader
->
pResBlock
,
pReader
,
pTSRow
,
pBlockScanInfo
->
uid
);
taosMemoryFree
(
pTSRow
);
tRowMergerClear
(
&
merge
);
return
TSDB_CODE_SUCCESS
;
}
}
}
}
else
{
// only last block exists
SBlockData
*
pLastBlockData
=
&
pLastBlockReader
->
lastBlockData
;
int64_t
tsLastBlock
=
getCurrentKeyInLastBlock
(
pLastBlockReader
);
tRowMergerClear(&merge);
STSRow
*
pTSRow
=
NULL
;
doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, uid);
SRowMerger
merge
=
{
0
};
TSDBROW
fRow
=
tsdbRowFromBlockData
(
pLastBlockData
,
*
pLastBlockReader
->
rowIndex
);
tRowMergerInit
(
&
merge
,
&
fRow
,
pReader
->
pSchema
);
doMergeRowsInLastBlock
(
pLastBlockReader
,
pBlockScanInfo
,
tsLastBlock
,
&
merge
);
tRowMergerGetRow
(
&
merge
,
&
pTSRow
);
doAppendRowFromTSRow
(
pReader
->
pResBlock
,
pReader
,
pTSRow
,
pBlockScanInfo
->
uid
);
if (freeTSRow) {
taosMemoryFree
(
pTSRow
);
taosMemoryFree
(
pTSRow
);
tRowMergerClear
(
&
merge
);
return
TSDB_CODE_SUCCESS
;
}
}
return TSDB_CODE_SUCCESS;
}
}
#endif
static
int32_t
doMergeMultiLevelRowsRv
(
STsdbReader
*
pReader
,
STableBlockScanInfo
*
pBlockScanInfo
,
SBlockData
*
pBlockData
,
SLastBlockReader
*
pLastBlockReader
)
{
static
int32_t
doMergeMultiLevelRowsRv
(
STsdbReader
*
pReader
,
STableBlockScanInfo
*
pBlockScanInfo
,
SBlockData
*
pBlockData
,
SLastBlockReader
*
pLastBlockReader
)
{
SRowMerger
merge
=
{
0
};
SRowMerger
merge
=
{
0
};
STSRow
*
pTSRow
=
NULL
;
STSRow
*
pTSRow
=
NULL
;
...
@@ -1987,10 +1990,35 @@ static bool hasDataInLastBlock(SLastBlockReader* pLastBlockReader) {
...
@@ -1987,10 +1990,35 @@ static bool hasDataInLastBlock(SLastBlockReader* pLastBlockReader) {
if
(
*
pLastBlockReader
->
rowIndex
==
ALL_ROWS_CHECKED_INDEX
)
{
if
(
*
pLastBlockReader
->
rowIndex
==
ALL_ROWS_CHECKED_INDEX
)
{
return
false
;
return
false
;
}
}
ASSERT
(
pLastBlockReader
->
lastBlockData
.
nRow
>
0
);
return
true
;
return
true
;
}
}
// todo refactor
int32_t
mergeRowsInFileBlocks
(
SBlockData
*
pBlockData
,
STableBlockScanInfo
*
pBlockScanInfo
,
int64_t
key
,
STsdbReader
*
pReader
)
{
SFileBlockDumpInfo
*
pDumpInfo
=
&
pReader
->
status
.
fBlockDumpInfo
;
TSDBROW
fRow
=
tsdbRowFromBlockData
(
pBlockData
,
pDumpInfo
->
rowIndex
);
if
(
tryCopyDistinctRowFromFileBlock
(
pReader
,
pBlockData
,
key
,
pDumpInfo
))
{
return
TSDB_CODE_SUCCESS
;
}
else
{
STSRow
*
pTSRow
=
NULL
;
SRowMerger
merge
=
{
0
};
tRowMergerInit
(
&
merge
,
&
fRow
,
pReader
->
pSchema
);
doMergeRowsInFileBlocks
(
pBlockData
,
pBlockScanInfo
,
pReader
,
&
merge
);
tRowMergerGetRow
(
&
merge
,
&
pTSRow
);
doAppendRowFromTSRow
(
pReader
->
pResBlock
,
pReader
,
pTSRow
,
pBlockScanInfo
->
uid
);
taosMemoryFree
(
pTSRow
);
tRowMergerClear
(
&
merge
);
return
TSDB_CODE_SUCCESS
;
}
return
TSDB_CODE_SUCCESS
;
}
static
int32_t
buildComposedDataBlockImpl
(
STsdbReader
*
pReader
,
STableBlockScanInfo
*
pBlockScanInfo
,
static
int32_t
buildComposedDataBlockImpl
(
STsdbReader
*
pReader
,
STableBlockScanInfo
*
pBlockScanInfo
,
SBlockData
*
pBlockData
,
SLastBlockReader
*
pLastBlockReader
)
{
SBlockData
*
pBlockData
,
SLastBlockReader
*
pLastBlockReader
)
{
SFileBlockDumpInfo
*
pDumpInfo
=
&
pReader
->
status
.
fBlockDumpInfo
;
SFileBlockDumpInfo
*
pDumpInfo
=
&
pReader
->
status
.
fBlockDumpInfo
;
...
@@ -2007,112 +2035,13 @@ static int32_t buildComposedDataBlockImpl(STsdbReader* pReader, STableBlockScanI
...
@@ -2007,112 +2035,13 @@ static int32_t buildComposedDataBlockImpl(STsdbReader* pReader, STableBlockScanI
return
doMergeBufAndFileRows_Rv
(
pReader
,
pBlockScanInfo
,
piRow
,
&
pBlockScanInfo
->
iiter
,
key
,
pLastBlockReader
);
return
doMergeBufAndFileRows_Rv
(
pReader
,
pBlockScanInfo
,
piRow
,
&
pBlockScanInfo
->
iiter
,
key
,
pLastBlockReader
);
}
}
// mem + file
// mem + file
+ last block
if
(
pBlockScanInfo
->
iter
.
hasVal
)
{
if
(
pBlockScanInfo
->
iter
.
hasVal
)
{
return
doMergeBufAndFileRows_Rv
(
pReader
,
pBlockScanInfo
,
pRow
,
&
pBlockScanInfo
->
iter
,
key
,
pLastBlockReader
);
return
doMergeBufAndFileRows_Rv
(
pReader
,
pBlockScanInfo
,
pRow
,
&
pBlockScanInfo
->
iter
,
key
,
pLastBlockReader
);
}
}
if
(
pBlockData
->
nRow
>
0
)
{
// files data blocks + last block
TSDBROW
fRow
=
tsdbRowFromBlockData
(
pBlockData
,
pDumpInfo
->
rowIndex
);
return
mergeFileBlockAndLastBlock
(
pReader
,
pLastBlockReader
,
key
,
pBlockScanInfo
,
pBlockData
);
// no last block available, only data block exists
if
(
pLastBlockReader
->
lastBlockData
.
nRow
==
0
||
(
!
hasDataInLastBlock
(
pLastBlockReader
)))
{
if
(
tryCopyDistinctRowFromFileBlock
(
pReader
,
pBlockData
,
key
,
pDumpInfo
))
{
return
TSDB_CODE_SUCCESS
;
}
else
{
STSRow
*
pTSRow
=
NULL
;
SRowMerger
merge
=
{
0
};
tRowMergerInit
(
&
merge
,
&
fRow
,
pReader
->
pSchema
);
doMergeRowsInFileBlocks
(
pBlockData
,
pBlockScanInfo
,
pReader
,
&
merge
);
tRowMergerGetRow
(
&
merge
,
&
pTSRow
);
doAppendRowFromTSRow
(
pReader
->
pResBlock
,
pReader
,
pTSRow
,
pBlockScanInfo
->
uid
);
taosMemoryFree
(
pTSRow
);
tRowMergerClear
(
&
merge
);
return
TSDB_CODE_SUCCESS
;
}
}
// row in last file block
int64_t
ts
=
getCurrentKeyInLastBlock
(
pLastBlockReader
);
ASSERT
(
ts
>=
key
);
if
(
ASCENDING_TRAVERSE
(
pReader
->
order
))
{
if
(
key
<
ts
)
{
// imem & mem are all empty, only file exist
if
(
tryCopyDistinctRowFromFileBlock
(
pReader
,
pBlockData
,
key
,
pDumpInfo
))
{
return
TSDB_CODE_SUCCESS
;
}
else
{
STSRow
*
pTSRow
=
NULL
;
SRowMerger
merge
=
{
0
};
tRowMergerInit
(
&
merge
,
&
fRow
,
pReader
->
pSchema
);
doMergeRowsInFileBlocks
(
pBlockData
,
pBlockScanInfo
,
pReader
,
&
merge
);
tRowMergerGetRow
(
&
merge
,
&
pTSRow
);
doAppendRowFromTSRow
(
pReader
->
pResBlock
,
pReader
,
pTSRow
,
pBlockScanInfo
->
uid
);
taosMemoryFree
(
pTSRow
);
tRowMergerClear
(
&
merge
);
return
TSDB_CODE_SUCCESS
;
}
}
else
if
(
key
==
ts
)
{
STSRow
*
pTSRow
=
NULL
;
SRowMerger
merge
=
{
0
};
tRowMergerInit
(
&
merge
,
&
fRow
,
pReader
->
pSchema
);
doMergeRowsInFileBlocks
(
pBlockData
,
pBlockScanInfo
,
pReader
,
&
merge
);
doMergeRowsInLastBlock
(
pLastBlockReader
,
pBlockScanInfo
,
ts
,
&
merge
);
tRowMergerGetRow
(
&
merge
,
&
pTSRow
);
doAppendRowFromTSRow
(
pReader
->
pResBlock
,
pReader
,
pTSRow
,
pBlockScanInfo
->
uid
);
taosMemoryFree
(
pTSRow
);
tRowMergerClear
(
&
merge
);
return
TSDB_CODE_SUCCESS
;
}
else
{
ASSERT
(
0
);
return
TSDB_CODE_SUCCESS
;
}
}
else
{
// desc order
SBlockData
*
pLastBlockData
=
&
pLastBlockReader
->
lastBlockData
;
TSDBROW
fRow1
=
tsdbRowFromBlockData
(
pLastBlockData
,
*
pLastBlockReader
->
rowIndex
);
STSRow
*
pTSRow
=
NULL
;
SRowMerger
merge
=
{
0
};
tRowMergerInit
(
&
merge
,
&
fRow1
,
pReader
->
pSchema
);
doMergeRowsInLastBlock
(
pLastBlockReader
,
pBlockScanInfo
,
ts
,
&
merge
);
if
(
ts
==
key
)
{
doMergeRowsInFileBlocks
(
pBlockData
,
pBlockScanInfo
,
pReader
,
&
merge
);
}
tRowMergerGetRow
(
&
merge
,
&
pTSRow
);
doAppendRowFromTSRow
(
pReader
->
pResBlock
,
pReader
,
pTSRow
,
pBlockScanInfo
->
uid
);
taosMemoryFree
(
pTSRow
);
tRowMergerClear
(
&
merge
);
return
TSDB_CODE_SUCCESS
;
}
}
else
{
// only last block exists
SBlockData
*
pLastBlockData
=
&
pLastBlockReader
->
lastBlockData
;
int64_t
tsLastBlock
=
getCurrentKeyInLastBlock
(
pLastBlockReader
);
STSRow
*
pTSRow
=
NULL
;
SRowMerger
merge
=
{
0
};
TSDBROW
fRow
=
tsdbRowFromBlockData
(
pLastBlockData
,
*
pLastBlockReader
->
rowIndex
);
tRowMergerInit
(
&
merge
,
&
fRow
,
pReader
->
pSchema
);
doMergeRowsInLastBlock
(
pLastBlockReader
,
pBlockScanInfo
,
tsLastBlock
,
&
merge
);
tRowMergerGetRow
(
&
merge
,
&
pTSRow
);
doAppendRowFromTSRow
(
pReader
->
pResBlock
,
pReader
,
pTSRow
,
pBlockScanInfo
->
uid
);
taosMemoryFree
(
pTSRow
);
tRowMergerClear
(
&
merge
);
return
TSDB_CODE_SUCCESS
;
}
}
}
}
}
...
@@ -2137,9 +2066,8 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader) {
...
@@ -2137,9 +2066,8 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader) {
while
(
1
)
{
while
(
1
)
{
// todo check the validate of row in file block
// todo check the validate of row in file block
bool
hasBlockData
=
false
;
{
{
bool
hasBlockData
=
false
;
while
(
pBlockData
->
nRow
>
0
)
{
// find the first qualified row in data block
while
(
pBlockData
->
nRow
>
0
)
{
// find the first qualified row in data block
if
(
isValidFileBlockRow
(
pBlockData
,
pDumpInfo
,
pBlockScanInfo
,
pReader
))
{
if
(
isValidFileBlockRow
(
pBlockData
,
pDumpInfo
,
pBlockScanInfo
,
pReader
))
{
hasBlockData
=
true
;
hasBlockData
=
true
;
...
@@ -2154,13 +2082,13 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader) {
...
@@ -2154,13 +2082,13 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader) {
break
;
break
;
}
}
}
}
}
bool
hasBlockLData
=
hasDataInLastBlock
(
pLastBlockReader
);
bool
hasBlockLData
=
hasDataInLastBlock
(
pLastBlockReader
);
// no data in last block and block, no need to proceed.
if
((
hasBlockData
==
false
)
&&
(
hasBlockLData
==
false
))
{
// no data in last block and block, no need to proceed.
break
;
if
((
hasBlockData
==
false
)
&&
(
hasBlockLData
==
false
))
{
break
;
}
}
}
buildComposedDataBlockImpl
(
pReader
,
pBlockScanInfo
,
pBlockData
,
pLastBlockReader
);
buildComposedDataBlockImpl
(
pReader
,
pBlockScanInfo
,
pBlockData
,
pLastBlockReader
);
...
@@ -3224,10 +3152,12 @@ int32_t tsdbGetNextRowInMem(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pR
...
@@ -3224,10 +3152,12 @@ int32_t tsdbGetNextRowInMem(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pR
TSDBKEY
k
=
TSDBROW_KEY
(
pRow
);
TSDBKEY
k
=
TSDBROW_KEY
(
pRow
);
TSDBKEY
ik
=
TSDBROW_KEY
(
piRow
);
TSDBKEY
ik
=
TSDBROW_KEY
(
piRow
);
if
(
ik
.
ts
<
k
.
ts
)
{
// ik.ts < k.ts
if
(
ik
.
ts
!=
k
.
ts
)
{
doMergeMemTableMultiRows
(
piRow
,
uid
,
&
pBlockScanInfo
->
iiter
,
pDelList
,
pTSRow
,
pReader
,
freeTSRow
);
if
(((
ik
.
ts
<
k
.
ts
)
&&
asc
)
||
((
ik
.
ts
>
k
.
ts
)
&&
(
!
asc
)))
{
// ik.ts < k.ts
}
else
if
(
k
.
ts
<
ik
.
ts
)
{
doMergeMemTableMultiRows
(
piRow
,
uid
,
&
pBlockScanInfo
->
iiter
,
pDelList
,
pTSRow
,
pReader
,
freeTSRow
);
doMergeMemTableMultiRows
(
pRow
,
uid
,
&
pBlockScanInfo
->
iter
,
pDelList
,
pTSRow
,
pReader
,
freeTSRow
);
}
else
if
(((
k
.
ts
<
ik
.
ts
)
&&
asc
)
||
((
k
.
ts
>
ik
.
ts
)
&&
(
!
asc
)))
{
doMergeMemTableMultiRows
(
pRow
,
uid
,
&
pBlockScanInfo
->
iter
,
pDelList
,
pTSRow
,
pReader
,
freeTSRow
);
}
}
else
{
// ik.ts == k.ts
}
else
{
// ik.ts == k.ts
doMergeMemIMemRows
(
pRow
,
piRow
,
pBlockScanInfo
,
pReader
,
pTSRow
);
doMergeMemIMemRows
(
pRow
,
piRow
,
pBlockScanInfo
,
pReader
,
pTSRow
);
*
freeTSRow
=
true
;
*
freeTSRow
=
true
;
...
...
source/libs/executor/inc/executil.h
浏览文件 @
20b06a7e
...
@@ -80,11 +80,9 @@ struct SqlFunctionCtx;
...
@@ -80,11 +80,9 @@ struct SqlFunctionCtx;
size_t
getResultRowSize
(
struct
SqlFunctionCtx
*
pCtx
,
int32_t
numOfOutput
);
size_t
getResultRowSize
(
struct
SqlFunctionCtx
*
pCtx
,
int32_t
numOfOutput
);
void
initResultRowInfo
(
SResultRowInfo
*
pResultRowInfo
);
void
initResultRowInfo
(
SResultRowInfo
*
pResultRowInfo
);
void
cleanupResultRowInfo
(
SResultRowInfo
*
pResultRowInfo
);
void
initResultRow
(
SResultRow
*
pResultRow
);
void
initResultRow
(
SResultRow
*
pResultRow
);
void
closeResultRow
(
SResultRow
*
pResultRow
);
void
closeResultRow
(
SResultRow
*
pResultRow
);
bool
isResultRowClosed
(
SResultRow
*
pResultRow
);
struct
SResultRowEntryInfo
*
getResultEntryInfo
(
const
SResultRow
*
pRow
,
int32_t
index
,
const
int32_t
*
offset
);
struct
SResultRowEntryInfo
*
getResultEntryInfo
(
const
SResultRow
*
pRow
,
int32_t
index
,
const
int32_t
*
offset
);
...
...
source/libs/executor/src/executil.c
浏览文件 @
20b06a7e
...
@@ -31,20 +31,6 @@ void initResultRowInfo(SResultRowInfo* pResultRowInfo) {
...
@@ -31,20 +31,6 @@ void initResultRowInfo(SResultRowInfo* pResultRowInfo) {
pResultRowInfo
->
cur
.
pageId
=
-
1
;
pResultRowInfo
->
cur
.
pageId
=
-
1
;
}
}
void
cleanupResultRowInfo
(
SResultRowInfo
*
pResultRowInfo
)
{
if
(
pResultRowInfo
==
NULL
)
{
return
;
}
for
(
int32_t
i
=
0
;
i
<
pResultRowInfo
->
size
;
++
i
)
{
// if (pResultRowInfo->pResult[i]) {
// taosMemoryFreeClear(pResultRowInfo->pResult[i]->key);
// }
}
}
bool
isResultRowClosed
(
SResultRow
*
pRow
)
{
return
(
pRow
->
closed
==
true
);
}
void
closeResultRow
(
SResultRow
*
pResultRow
)
{
pResultRow
->
closed
=
true
;
}
void
closeResultRow
(
SResultRow
*
pResultRow
)
{
pResultRow
->
closed
=
true
;
}
// TODO refactor: use macro
// TODO refactor: use macro
...
...
source/libs/executor/src/executorimpl.c
浏览文件 @
20b06a7e
...
@@ -3635,7 +3635,6 @@ _error:
...
@@ -3635,7 +3635,6 @@ _error:
void
cleanupBasicInfo
(
SOptrBasicInfo
*
pInfo
)
{
void
cleanupBasicInfo
(
SOptrBasicInfo
*
pInfo
)
{
assert
(
pInfo
!=
NULL
);
assert
(
pInfo
!=
NULL
);
cleanupResultRowInfo
(
&
pInfo
->
resultRowInfo
);
pInfo
->
pRes
=
blockDataDestroy
(
pInfo
->
pRes
);
pInfo
->
pRes
=
blockDataDestroy
(
pInfo
->
pRes
);
}
}
...
...
source/libs/executor/src/timewindowoperator.c
浏览文件 @
20b06a7e
...
@@ -2965,7 +2965,6 @@ static void doHashInterval(SOperatorInfo* pOperatorInfo, SSDataBlock* pSDataBloc
...
@@ -2965,7 +2965,6 @@ static void doHashInterval(SOperatorInfo* pOperatorInfo, SSDataBlock* pSDataBloc
static
void
clearStreamIntervalOperator
(
SStreamFinalIntervalOperatorInfo
*
pInfo
)
{
static
void
clearStreamIntervalOperator
(
SStreamFinalIntervalOperatorInfo
*
pInfo
)
{
taosHashClear
(
pInfo
->
aggSup
.
pResultRowHashTable
);
taosHashClear
(
pInfo
->
aggSup
.
pResultRowHashTable
);
clearDiskbasedBuf
(
pInfo
->
aggSup
.
pResultBuf
);
clearDiskbasedBuf
(
pInfo
->
aggSup
.
pResultBuf
);
cleanupResultRowInfo
(
&
pInfo
->
binfo
.
resultRowInfo
);
initResultRowInfo
(
&
pInfo
->
binfo
.
resultRowInfo
);
initResultRowInfo
(
&
pInfo
->
binfo
.
resultRowInfo
);
}
}
...
@@ -4253,8 +4252,6 @@ static void clearStreamSessionOperator(SStreamSessionAggOperatorInfo* pInfo) {
...
@@ -4253,8 +4252,6 @@ static void clearStreamSessionOperator(SStreamSessionAggOperatorInfo* pInfo) {
}
}
}
}
clearDiskbasedBuf
(
pInfo
->
streamAggSup
.
pResultBuf
);
clearDiskbasedBuf
(
pInfo
->
streamAggSup
.
pResultBuf
);
cleanupResultRowInfo
(
&
pInfo
->
binfo
.
resultRowInfo
);
initResultRowInfo
(
&
pInfo
->
binfo
.
resultRowInfo
);
}
}
static
void
removeSessionResults
(
SHashObj
*
pHashMap
,
SArray
*
pWins
)
{
static
void
removeSessionResults
(
SHashObj
*
pHashMap
,
SArray
*
pWins
)
{
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录