Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
279b59f8
T
TDengine
项目概览
taosdata
/
TDengine
大约 2 年 前同步成功
通知
1192
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
279b59f8
编写于
7月 02, 2022
作者:
H
Hongze Cheng
浏览文件
操作
浏览文件
下载
差异文件
Merge branch 'feat/tsdb_refact' of
https://github.com/taosdata/TDengine
into feat/tsdb_refact
上级
1acbb50f
a0e38fc7
变更
1
显示空白变更内容
内联
并排
Showing
1 changed file
with
60 addition
and
227 deletion
+60
-227
source/dnode/vnode/src/tsdb/tsdbRead.c
source/dnode/vnode/src/tsdb/tsdbRead.c
+60
-227
未找到文件。
source/dnode/vnode/src/tsdb/tsdbRead.c
浏览文件 @
279b59f8
...
...
@@ -147,12 +147,11 @@ struct STsdbReader {
static
SFileDataBlockInfo
*
getCurrentBlockInfo
(
SDataBlockIter
*
pBlockIter
);
static
int
buildDataBlockFromBufImpl
(
STableBlockScanInfo
*
pBlockScanInfo
,
TSDBKEY
maxKey
,
int32_t
capacity
,
STsdbReader
*
pReader
);
static
TSDBROW
*
getValidRow
(
STbDataIter
*
pIter
,
bool
*
hasVal
,
STsdbReader
*
pReader
);
static
int32_t
doLoadRowsOfIdenticalTsInFileBlock
(
SFileDataBlockInfo
*
pFBlock
,
SBlock
*
pBlock
,
SBlockData
*
pBlockData
,
STableBlockScanInfo
*
pScanInfo
,
STsdbReader
*
pReader
,
SRowMerger
*
pMerger
);
static
int32_t
doLoadRowsOfIdenticalTsInFileBlock
(
SBlockData
*
pBlockData
,
STableBlockScanInfo
*
pScanInfo
,
STsdbReader
*
pReader
,
SRowMerger
*
pMerger
);
static
int32_t
doLoadRowsOfIdenticalTsInBuf
(
STbDataIter
*
pIter
,
bool
*
hasVal
,
int64_t
ts
,
SRowMerger
*
pMerger
,
STsdbReader
*
pReader
);
static
int32_t
doAppendOneRow
(
SSDataBlock
*
pBlock
,
STsdbReader
*
pReader
,
STSRow
*
pTSRow
);
static
void
setComposedBlockFlag
(
STsdbReader
*
pReader
,
bool
composed
);
static
void
checkU
pdateSchema
(
TSDBROW
*
pRow
,
uint64_t
uid
,
STsdbReader
*
pReader
);
static
void
u
pdateSchema
(
TSDBROW
*
pRow
,
uint64_t
uid
,
STsdbReader
*
pReader
);
static
void
doMergeMultiRows
(
TSDBROW
*
pRow
,
uint64_t
uid
,
STbDataIter
*
dIter
,
bool
*
hasVal
,
STSRow
**
pTSRow
,
STsdbReader
*
pReader
);
static
void
doMergeMemIMemRows
(
TSDBROW
*
pRow
,
TSDBROW
*
piRow
,
STableBlockScanInfo
*
pBlockScanInfo
,
STsdbReader
*
pReader
,
STSRow
**
pTSRow
);
...
...
@@ -1069,64 +1068,6 @@ _error:
// return code;
// }
// static int32_t loadFileDataBlock(STsdbReader* pTsdbReadHandle, SBlock* pBlock, STableBlockScanInfo* pCheckInfo,
// bool* exists) {
// SQueryFilePos* cur = &pTsdbReadHandle->cur;
// int32_t code = TSDB_CODE_SUCCESS;
// bool asc = ASCENDING_TRAVERSE(pTsdbReadHandle->order);
// if (asc) {
// // query ended in/started from current block
// if (pTsdbReadHandle->window.ekey < pBlock->maxKey.ts || pCheckInfo->lastKey > pBlock->minKey.ts) {
// if ((code = doLoadFileBlockData(pTsdbReadHandle, pBlock, pCheckInfo, cur->slot)) != TSDB_CODE_SUCCESS) {
// *exists = false;
// return code;
// }
// SDataCols* pTSCol = pTsdbReadHandle->rhelper.pDCols[0];
// assert(pTSCol->cols->type == TSDB_DATA_TYPE_TIMESTAMP && pTSCol->numOfRows == pBlock->numOfRows);
// if (pCheckInfo->lastKey > pBlock->minKey.ts) {
// cur->pos =
// binarySearchForKey(pTSCol->cols[0].pData, pBlock->numOfRows, pCheckInfo->lastKey,
// pTsdbReadHandle->order);
// } else {
// cur->pos = 0;
// }
// assert(pCheckInfo->lastKey <= pBlock->maxKey.ts);
// doMergeTwoLevelData(pTsdbReadHandle, pCheckInfo, pBlock);
// } else { // the whole block is loaded in to buffer
// cur->pos = asc ? 0 : (pBlock->numOfRows - 1);
// code = handleDataMergeIfNeeded(pTsdbReadHandle, pBlock, pCheckInfo);
// }
// } else { // desc order, query ended in current block
// if (pTsdbReadHandle->window.ekey > pBlock->minKey.ts || pCheckInfo->lastKey < pBlock->maxKey.ts) {
// if ((code = doLoadFileBlockData(pTsdbReadHandle, pBlock, pCheckInfo, cur->slot)) != TSDB_CODE_SUCCESS) {
// *exists = false;
// return code;
// }
// SDataCols* pTsCol = pTsdbReadHandle->rhelper.pDCols[0];
// if (pCheckInfo->lastKey < pBlock->maxKey.ts) {
// cur->pos =
// binarySearchForKey(pTsCol->cols[0].pData, pBlock->numOfRows, pCheckInfo->lastKey,
// pTsdbReadHandle->order);
// } else {
// cur->pos = pBlock->numOfRows - 1;
// }
// assert(pCheckInfo->lastKey >= pBlock->minKey.ts);
// doMergeTwoLevelData(pTsdbReadHandle, pCheckInfo, pBlock);
// } else {
// cur->pos = asc ? 0 : (pBlock->numOfRows - 1);
// code = handleDataMergeIfNeeded(pTsdbReadHandle, pBlock, pCheckInfo);
// }
// }
// *exists = pTsdbReadHandle->realNumOfRows > 0;
// return code;
// }
// static int doBinarySearchKey(char* pValue, int num, TSKEY key, int order) {
// int firstPos, lastPos, midPos = -1;
...
...
@@ -1189,94 +1130,6 @@ _error:
// return midPos;
// }
// static int32_t doCopyRowsFromFileBlock(STsdbReader* pTsdbReadHandle, int32_t capacity, int32_t numOfRows, int32_t
// start,
// int32_t end) {
// SDataCols* pCols = pTsdbReadHandle->rhelper.pDCols[0];
// TSKEY* tsArray = pCols->cols[0].pData;
// int32_t num = end - start + 1;
// assert(num >= 0);
// if (num == 0) {
// return numOfRows;
// }
// bool ascScan = ASCENDING_TRAVERSE(pTsdbReadHandle->order);
// int32_t trueStart = ascScan ? start : end;
// int32_t trueEnd = ascScan ? end : start;
// int32_t step = ascScan ? 1 : -1;
// int32_t requiredNumOfCols = (int32_t)taosArrayGetSize(pTsdbReadHandle->pColumns);
// // data in buffer has greater timestamp, copy data in file block
// int32_t i = 0, j = 0;
// while (i < requiredNumOfCols && j < pCols->numOfCols) {
// SColumnInfoData* pColInfo = taosArrayGet(pTsdbReadHandle->pColumns, i);
// SDataCol* src = &pCols->cols[j];
// if (src->colId < pColInfo->info.colId) {
// j++;
// continue;
// }
// if (!isAllRowsNull(src) && pColInfo->info.colId == src->colId) {
// if (!IS_VAR_DATA_TYPE(pColInfo->info.type)) { // todo opt performance
// // memmove(pData, (char*)src->pData + bytes * start, bytes * num);
// int32_t rowIndex = numOfRows;
// for (int32_t k = trueStart; ((ascScan && k <= trueEnd) || (!ascScan && k >= trueEnd)); k += step, ++rowIndex)
// {
// SCellVal sVal = {0};
// if (tdGetColDataOfRow(&sVal, src, k, pCols->bitmapMode) < 0) {
// TASSERT(0);
// }
// if (sVal.valType == TD_VTYPE_NORM) {
// colDataAppend(pColInfo, rowIndex, sVal.val, false);
// } else {
// colDataAppendNULL(pColInfo, rowIndex);
// }
// }
// } else { // handle the var-string
// int32_t rowIndex = numOfRows;
// // todo refactor, only copy one-by-one
// for (int32_t k = trueStart; ((ascScan && k <= trueEnd) || (!ascScan && k >= trueEnd)); k += step, ++rowIndex)
// {
// SCellVal sVal = {0};
// if (tdGetColDataOfRow(&sVal, src, k, pCols->bitmapMode) < 0) {
// TASSERT(0);
// }
// if (sVal.valType == TD_VTYPE_NORM) {
// colDataAppend(pColInfo, rowIndex, sVal.val, false);
// } else {
// colDataAppendNULL(pColInfo, rowIndex);
// }
// }
// }
// j++;
// i++;
// } else { // pColInfo->info.colId < src->colId, it is a NULL data
// colDataAppendNNULL(pColInfo, numOfRows, num);
// i++;
// }
// }
// while (i < requiredNumOfCols) { // the remain columns are all null data
// SColumnInfoData* pColInfo = taosArrayGet(pTsdbReadHandle->pColumns, i);
// colDataAppendNNULL(pColInfo, numOfRows, num);
// i++;
// }
// pTsdbReadHandle->cur.win.ekey = tsArray[trueEnd];
// pTsdbReadHandle->cur.lastKey = tsArray[trueEnd] + step;
// return numOfRows + num;
// }
// static int32_t mergeTwoRowFromMem(STsdbReader* pTsdbReadHandle, int32_t capacity, int32_t* curRow, STSRow* row1,
// STSRow* row2, int32_t numOfCols, uint64_t uid, STSchema* pSchema1, STSchema*
// pSchema2, bool update, TSKEY* lastRowKey) {
...
...
@@ -1472,42 +1325,6 @@ _error:
// #endif
// }
// static void getQualifiedRowsPos(STsdbReader* pTsdbReadHandle, int32_t startPos, int32_t endPos, int32_t numOfExisted,
// int32_t* start, int32_t* end) {
// *start = -1;
// if (ASCENDING_TRAVERSE(pTsdbReadHandle->order)) {
// int32_t remain = endPos - startPos + 1;
// if (remain + numOfExisted > pTsdbReadHandle->outputCapacity) {
// *end = (pTsdbReadHandle->outputCapacity - numOfExisted) + startPos - 1;
// } else {
// *end = endPos;
// }
// *start = startPos;
// } else {
// int32_t remain = (startPos - endPos) + 1;
// if (remain + numOfExisted > pTsdbReadHandle->outputCapacity) {
// *end = startPos + 1 - (pTsdbReadHandle->outputCapacity - numOfExisted);
// } else {
// *end = endPos;
// }
// *start = *end;
// *end = startPos;
// }
// }
// static void updateInfoAfterMerge(STsdbReader* pTsdbReadHandle, STableBlockScanInfo* pCheckInfo, int32_t numOfRows,
// int32_t endPos) {
// SQueryFilePos* cur = &pTsdbReadHandle->cur;
// pCheckInfo->lastKey = cur->lastKey;
// pTsdbReadHandle->realNumOfRows = numOfRows;
// cur->rows = numOfRows;
// cur->pos = endPos;
// }
// static void doCheckGeneratedBlockRange(STsdbReader* pTsdbReadHandle) {
// SQueryFilePos* cur = &pTsdbReadHandle->cur;
...
...
@@ -2198,18 +2015,22 @@ static int32_t findFileBlockInfoIndex(SDataBlockIter* pBlockIter, SFileDataBlock
return
-
1
;
}
static
int32_t
setFileBlockActiveInBlockIter
(
SDataBlockIter
*
pBlockIter
,
int32_t
index
)
{
static
int32_t
setFileBlockActiveInBlockIter
(
SDataBlockIter
*
pBlockIter
,
int32_t
index
,
int32_t
step
)
{
if
(
index
<
0
||
index
>=
pBlockIter
->
numOfBlocks
)
{
return
-
1
;
}
SFileDataBlockInfo
fblock
=
*
(
SFileDataBlockInfo
*
)
taosArrayGet
(
pBlockIter
->
blockList
,
index
);
pBlockIter
->
index
+=
step
;
if
(
index
!=
pBlockIter
->
index
)
{
taosArrayRemove
(
pBlockIter
->
blockList
,
index
);
taosArrayInsert
(
pBlockIter
->
blockList
,
pBlockIter
->
index
,
&
fblock
);
SFileDataBlockInfo
*
pBlockInfo
=
taosArrayGet
(
pBlockIter
->
blockList
,
pBlockIter
->
index
);
ASSERT
(
pBlockInfo
->
uid
==
fblock
.
uid
&&
pBlockInfo
->
tbBlockIdx
==
fblock
.
tbBlockIdx
);
}
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -2242,7 +2063,14 @@ static bool fileBlockShouldLoad(STsdbReader* pReader, SFileDataBlockInfo *pFBloc
overlapWithNeighbor
=
overlapWithNeighborBlock
(
pBlock
,
pNeighbor
,
pReader
->
order
);
}
return
(
overlapWithNeighbor
||
bool
hasDup
=
false
;
if
(
pBlock
->
nSubBlock
==
1
)
{
hasDup
=
pBlock
->
hasDup
;
}
else
{
hasDup
=
true
;
}
return
(
overlapWithNeighbor
||
hasDup
||
dataBlockPartialRequired
(
&
pReader
->
window
,
&
pReader
->
verRange
,
pBlock
)
||
keyOverlapFileBlock
(
key
,
pBlock
,
&
pReader
->
verRange
)
||
(
pBlock
->
nRow
>
pReader
->
capacity
));
...
...
@@ -2280,7 +2108,7 @@ static int32_t doMergeBufFileRows(STsdbReader* pReader, STableBlockScanInfo* pBl
TSDBROW
fRow
=
tsdbRowFromBlockData
(
pBlockData
,
pDumpInfo
->
rowIndex
);
tRowMergerInit
(
&
merge
,
&
fRow
,
pReader
->
pSchema
);
doLoadRowsOfIdenticalTsInFileBlock
(
p
FBlock
,
pBlock
,
p
BlockData
,
pBlockScanInfo
,
pReader
,
&
merge
);
doLoadRowsOfIdenticalTsInFileBlock
(
pBlockData
,
pBlockScanInfo
,
pReader
,
&
merge
);
if
(
k
.
ts
==
key
)
{
tRowMerge
(
&
merge
,
pRow
);
...
...
@@ -2296,10 +2124,13 @@ static int32_t doMergeBufFileRows(STsdbReader* pReader, STableBlockScanInfo* pBl
return
TSDB_CODE_SUCCESS
;
}
static
int32_t
buildComposedDataBlockImpl
(
STsdbReader
*
pReader
,
S
FileDataBlockInfo
*
pFBlock
,
SBlock
*
pBlock
,
S
TableBlockScanInfo
*
pBlockScanInfo
)
{
SFileBlockDumpInfo
*
pDumpInfo
=
&
pReader
->
status
.
fBlockDumpInfo
;
static
int32_t
buildComposedDataBlockImpl
(
STsdbReader
*
pReader
,
STableBlockScanInfo
*
pBlockScanInfo
)
{
SFileBlockDumpInfo
*
pDumpInfo
=
&
pReader
->
status
.
fBlockDumpInfo
;
SBlockData
*
pBlockData
=
&
pReader
->
status
.
fileBlockData
;
SFileDataBlockInfo
*
pFBlock
=
getCurrentBlockInfo
(
&
pReader
->
status
.
blockIter
);
SBlock
*
pBlock
=
taosArrayGet
(
pBlockScanInfo
->
pBlockList
,
pFBlock
->
tbBlockIdx
);
SRowMerger
merge
=
{
0
};
STSRow
*
pTSRow
=
NULL
;
...
...
@@ -2316,7 +2147,7 @@ static int32_t buildComposedDataBlockImpl(STsdbReader* pReader, SFileDataBlockIn
TSDBROW
fRow
=
tsdbRowFromBlockData
(
pBlockData
,
pDumpInfo
->
rowIndex
);
tRowMergerInit
(
&
merge
,
&
fRow
,
pReader
->
pSchema
);
doLoadRowsOfIdenticalTsInFileBlock
(
p
FBlock
,
pBlock
,
p
BlockData
,
pBlockScanInfo
,
pReader
,
&
merge
);
doLoadRowsOfIdenticalTsInFileBlock
(
pBlockData
,
pBlockScanInfo
,
pReader
,
&
merge
);
if
(
ik
.
ts
==
key
)
{
tRowMerge
(
&
merge
,
piRow
);
...
...
@@ -2370,7 +2201,7 @@ static int32_t buildComposedDataBlockImpl(STsdbReader* pReader, SFileDataBlockIn
// imem & mem are all empty
TSDBROW
fRow
=
tsdbRowFromBlockData
(
pBlockData
,
pDumpInfo
->
rowIndex
);
tRowMergerInit
(
&
merge
,
&
fRow
,
pReader
->
pSchema
);
doLoadRowsOfIdenticalTsInFileBlock
(
p
FBlock
,
pBlock
,
p
BlockData
,
pBlockScanInfo
,
pReader
,
&
merge
);
doLoadRowsOfIdenticalTsInFileBlock
(
pBlockData
,
pBlockScanInfo
,
pReader
,
&
merge
);
tRowMergerGetRow
(
&
merge
,
&
pTSRow
);
doAppendOneRow
(
pReader
->
pResBlock
,
pReader
,
pTSRow
);
}
...
...
@@ -2378,16 +2209,17 @@ static int32_t buildComposedDataBlockImpl(STsdbReader* pReader, SFileDataBlockIn
return
TSDB_CODE_SUCCESS
;
}
static
int32_t
buildComposedDataBlock
(
STsdbReader
*
pReader
,
SFileDataBlockInfo
*
pFBlock
,
SBlock
*
pBlock
,
STableBlockScanInfo
*
pBlockScanInfo
)
{
static
int32_t
buildComposedDataBlock
(
STsdbReader
*
pReader
,
STableBlockScanInfo
*
pBlockScanInfo
)
{
SSDataBlock
*
pResBlock
=
pReader
->
pResBlock
;
while
(
1
)
{
buildComposedDataBlockImpl
(
pReader
,
p
FBlock
,
pBlock
,
p
BlockScanInfo
);
buildComposedDataBlockImpl
(
pReader
,
pBlockScanInfo
);
SFileBlockDumpInfo
*
pDumpInfo
=
&
pReader
->
status
.
fBlockDumpInfo
;
SFileDataBlockInfo
*
pBlockInfo
=
getCurrentBlockInfo
(
&
pReader
->
status
.
blockIter
);
if
(
pBlockInfo
->
tbBlockIdx
==
pFBlock
->
tbBlockIdx
)
{
// still in the same file block now
SFileDataBlockInfo
*
pFBlock
=
getCurrentBlockInfo
(
&
pReader
->
status
.
blockIter
);
SBlock
*
pBlock
=
taosArrayGet
(
pBlockScanInfo
->
pBlockList
,
pFBlock
->
tbBlockIdx
);
// currently loaded file data block is consumed
if
(
pDumpInfo
->
rowIndex
>=
pBlock
->
nRow
||
pDumpInfo
->
rowIndex
<
0
)
{
setBlockAllDumped
(
pDumpInfo
,
pBlock
,
pReader
->
order
);
break
;
...
...
@@ -2396,16 +2228,16 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader, SFileDataBlockInfo*
if
(
pResBlock
->
info
.
rows
>=
pReader
->
capacity
)
{
break
;
}
}
else
{
// todo traverse to next file due to time window overlap
if
(
pResBlock
->
info
.
rows
>=
pReader
->
capacity
)
{
ASSERT
(
0
);
return
TSDB_CODE_SUCCESS
;
}
}
}
pResBlock
->
info
.
uid
=
pBlockScanInfo
->
uid
;
blockDataUpdateTsWindow
(
pResBlock
,
0
);
setComposedBlockFlag
(
pReader
,
true
);
tsdbDebug
(
"%p uid:%"
PRIu64
", composed data block created, brange:%"
PRIu64
"-%"
PRIu64
" rows:%d, %s"
,
pReader
,
pBlockScanInfo
->
uid
,
pResBlock
->
info
.
window
.
skey
,
pResBlock
->
info
.
window
.
ekey
,
pResBlock
->
info
.
rows
,
pReader
->
idStr
);
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -2549,7 +2381,7 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) {
}
// build composed data block
code
=
buildComposedDataBlock
(
pReader
,
p
FBlock
,
pBlock
,
p
ScanInfo
);
code
=
buildComposedDataBlock
(
pReader
,
pScanInfo
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
return
code
;
}
...
...
@@ -2653,7 +2485,6 @@ static int32_t buildBlockFromFiles(STsdbReader* pReader) {
}
else
{
SFileDataBlockInfo
*
pFBlock
=
getCurrentBlockInfo
(
&
pReader
->
status
.
blockIter
);
STableBlockScanInfo
*
pScanInfo
=
taosHashGet
(
pReader
->
status
.
pTableMap
,
&
pFBlock
->
uid
,
sizeof
(
pFBlock
->
uid
));
SBlock
*
pBlock
=
taosArrayGet
(
pScanInfo
->
pBlockList
,
pFBlock
->
tbBlockIdx
);
// current block are exhausted, try the next file block
if
(
pReader
->
status
.
fBlockDumpInfo
.
allDumped
)
{
...
...
@@ -2673,7 +2504,7 @@ static int32_t buildBlockFromFiles(STsdbReader* pReader) {
return
code
;
}
}
else
{
code
=
buildComposedDataBlock
(
pReader
,
p
FBlock
,
pBlock
,
p
ScanInfo
);
code
=
buildComposedDataBlock
(
pReader
,
pScanInfo
);
return
code
;
}
}
...
...
@@ -2815,10 +2646,10 @@ static int32_t checkForNeighborFileBlock(STsdbReader* pReader, STableBlockScanIn
//1. find the next neighbor block in the scan block list
SFileDataBlockInfo
fb
=
{.
uid
=
pFBlock
->
uid
,
.
tbBlockIdx
=
nextIndex
};
int32_t
neighborIndex
=
findFileBlockInfoIndex
(
&
pStatus
->
b
lockIter
,
&
fb
);
int32_t
neighborIndex
=
findFileBlockInfoIndex
(
pB
lockIter
,
&
fb
);
//2. remove it from the scan block list
setFileBlockActiveInBlockIter
(
&
pStatus
->
blockIter
,
neighborIndex
);
setFileBlockActiveInBlockIter
(
pBlockIter
,
neighborIndex
,
step
);
//3. load the neighbor block, and set it to be the currently accessed file data block
int32_t
code
=
doLoadFileBlockData
(
pReader
,
pBlockIter
,
pScanInfo
,
&
pStatus
->
fileBlockData
);
...
...
@@ -2840,17 +2671,16 @@ static int32_t checkForNeighborFileBlock(STsdbReader* pReader, STableBlockScanIn
return
TSDB_CODE_SUCCESS
;
}
int32_t
doLoadRowsOfIdenticalTsInFileBlock
(
SFileDataBlockInfo
*
pFBlock
,
SBlock
*
pBlock
,
SBlockData
*
pBlockData
,
STableBlockScanInfo
*
pScanInfo
,
STsdbReader
*
pReader
,
SRowMerger
*
pMerger
)
{
int32_t
doLoadRowsOfIdenticalTsInFileBlock
(
SBlockData
*
pBlockData
,
STableBlockScanInfo
*
pScanInfo
,
STsdbReader
*
pReader
,
SRowMerger
*
pMerger
)
{
SFileBlockDumpInfo
*
pDumpInfo
=
&
pReader
->
status
.
fBlockDumpInfo
;
bool
asc
=
ASCENDING_TRAVERSE
(
pReader
->
order
);
int32_t
step
=
asc
?
1
:
-
1
;
int64_t
key
=
pBlockData
->
aTSKEY
[
pDumpInfo
->
rowIndex
];
int32_t
step
=
asc
?
1
:
-
1
;
if
(
asc
)
{
pDumpInfo
->
rowIndex
+=
step
;
if
(
pDumpInfo
->
rowIndex
<
pBlockData
->
nRow
-
1
)
{
if
(
pDumpInfo
->
rowIndex
<
=
pBlockData
->
nRow
-
1
)
{
pDumpInfo
->
rowIndex
=
doMergeRowsInFileBlockImpl
(
pBlockData
,
pDumpInfo
->
rowIndex
,
key
,
pMerger
,
&
pReader
->
verRange
,
step
);
}
...
...
@@ -2858,7 +2688,10 @@ int32_t doLoadRowsOfIdenticalTsInFileBlock(SFileDataBlockInfo* pFBlock, SBlock*
if
(
pDumpInfo
->
rowIndex
>=
pBlockData
->
nRow
&&
asc
)
{
while
(
1
)
{
CHECK_FILEBLOCK_STATE
st
;
checkForNeighborFileBlock
(
pReader
,
pScanInfo
,
pBlock
,
pFBlock
,
pMerger
,
key
,
&
st
);
SFileDataBlockInfo
*
pFileBlockInfo
=
getCurrentBlockInfo
(
&
pReader
->
status
.
blockIter
);
SBlock
*
pCurrentBlock
=
taosArrayGet
(
pScanInfo
->
pBlockList
,
pFileBlockInfo
->
tbBlockIdx
);
checkForNeighborFileBlock
(
pReader
,
pScanInfo
,
pCurrentBlock
,
pFileBlockInfo
,
pMerger
,
key
,
&
st
);
if
(
st
==
CHECK_FILEBLOCK_QUIT
)
{
break
;
}
...
...
@@ -2876,7 +2709,7 @@ int32_t doLoadRowsOfIdenticalTsInFileBlock(SFileDataBlockInfo* pFBlock, SBlock*
return
TSDB_CODE_SUCCESS
;
}
void
checkU
pdateSchema
(
TSDBROW
*
pRow
,
uint64_t
uid
,
STsdbReader
*
pReader
)
{
void
u
pdateSchema
(
TSDBROW
*
pRow
,
uint64_t
uid
,
STsdbReader
*
pReader
)
{
int32_t
sversion
=
TSDBROW_SVERSION
(
pRow
);
if
(
pReader
->
pSchema
==
NULL
)
{
...
...
@@ -2891,7 +2724,7 @@ void doMergeMultiRows(TSDBROW* pRow, uint64_t uid, STbDataIter* dIter, bool* has
SRowMerger
merge
=
{
0
};
TSDBKEY
k
=
TSDBROW_KEY
(
pRow
);
checkU
pdateSchema
(
pRow
,
uid
,
pReader
);
u
pdateSchema
(
pRow
,
uid
,
pReader
);
tRowMergerInit
(
&
merge
,
pRow
,
pReader
->
pSchema
);
doLoadRowsOfIdenticalTsInBuf
(
dIter
,
hasVal
,
k
.
ts
,
&
merge
,
pReader
);
...
...
@@ -2905,7 +2738,7 @@ void doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScanInfo *pBlo
TSDBKEY
ik
=
TSDBROW_KEY
(
piRow
);
ASSERT
(
k
.
ts
==
ik
.
ts
);
checkU
pdateSchema
(
piRow
,
pBlockScanInfo
->
uid
,
pReader
);
u
pdateSchema
(
piRow
,
pBlockScanInfo
->
uid
,
pReader
);
tRowMergerInit
(
&
merge
,
piRow
,
pReader
->
pSchema
);
doLoadRowsOfIdenticalTsInBuf
(
pBlockScanInfo
->
iiter
,
&
pBlockScanInfo
->
imemHasVal
,
ik
.
ts
,
&
merge
,
pReader
);
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录