提交 42d635a7 编写于 作者: H Haojun Liao

refactor: do some internal refactor.

上级 529394a0
......@@ -147,7 +147,7 @@ struct STsdbReader {
static SFileDataBlockInfo* getCurrentBlockInfo(SDataBlockIter* pBlockIter);
static int buildDataBlockFromBufImpl(STableBlockScanInfo* pBlockScanInfo, TSDBKEY maxKey, int32_t capacity, STsdbReader* pReader);
static TSDBROW* getValidRow(STbDataIter* pIter, bool* hasVal, STsdbReader* pReader);
static int32_t doLoadRowsOfIdenticalTsInFileBlock(SBlockData* pBlockData, STableBlockScanInfo* pScanInfo, STsdbReader* pReader, SRowMerger* pMerger);
static int32_t doMergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pScanInfo, STsdbReader* pReader, SRowMerger* pMerger);
static int32_t doLoadRowsOfIdenticalTsInBuf(STbDataIter *pIter, bool* hasVal, int64_t ts, SRowMerger* pMerger, STsdbReader* pReader);
static int32_t doAppendOneRow(SSDataBlock* pBlock, STsdbReader* pReader, STSRow* pTSRow);
static void setComposedBlockFlag(STsdbReader* pReader, bool composed);
......@@ -829,7 +829,7 @@ static void setBlockAllDumped(SFileBlockDumpInfo* pDumpInfo, SBlock* pBlock, int
static void doCopyColVal(SColumnInfoData* pColInfoData, int32_t rowIndex, int32_t colIndex, SColVal* pColVal, SBlockLoadSuppInfo* pSup) {
if (IS_VAR_DATA_TYPE(pColVal->type)) {
if (pColVal->isNull) {
if (pColVal->isNull || pColVal->isNone) {
colDataAppendNULL(pColInfoData, rowIndex);
} else {
varDataSetLen(pSup->buildBuf[colIndex], pColVal->value.nData);
......@@ -2096,58 +2096,83 @@ static int32_t buildDataBlockFromBuf(STsdbReader* pReader, STableBlockScanInfo*
return code;
}
static int32_t doMergeBufFileRows(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo, TSDBROW* pRow,
STSRow* pTSRow, STbDataIter* pIter, bool* hasVal, int64_t key,
SFileDataBlockInfo* pFBlock, SBlock* pBlock) {
static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo, TSDBROW* pRow,
STSRow* pTSRow, STbDataIter* pIter, bool* hasVal, int64_t key) {
SRowMerger merge = {0};
SBlockData* pBlockData = &pReader->status.fileBlockData;
SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
TSDBKEY k = TSDBROW_KEY(pRow);
if (key <= k.ts) {
TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
tRowMergerInit(&merge, &fRow, pReader->pSchema);
TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
doLoadRowsOfIdenticalTsInFileBlock(pBlockData, pBlockScanInfo, pReader, &merge);
// ascending order traverse
if (ASCENDING_TRAVERSE(pReader->order)) {
if (key < k.ts) {
tRowMergerInit(&merge, &fRow, pReader->pSchema);
doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
tRowMergerGetRow(&merge, &pTSRow);
} else if (k.ts < key) { // k.ts < key
doMergeMultiRows(pRow, pBlockScanInfo->uid, pIter, hasVal, &pTSRow, pReader);
} else { // k.ts == key, ascending order: file block ----> imem rows -----> mem rows
tRowMergerInit(&merge, &fRow, pReader->pSchema);
doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
if (k.ts == key) {
tRowMerge(&merge, pRow);
doLoadRowsOfIdenticalTsInBuf(pIter, hasVal, k.ts, &merge, pReader);
tRowMergerGetRow(&merge, &pTSRow);
}
} else { // descending order scan
if (key < k.ts) {
doMergeMultiRows(pRow, pBlockScanInfo->uid, pIter, hasVal, &pTSRow, pReader);
} else if (k.ts < key) {
tRowMergerInit(&merge, &fRow, pReader->pSchema);
tRowMergerGetRow(&merge, &pTSRow);
} else { // k.ts < key
doMergeMultiRows(pRow, pBlockScanInfo->uid, pIter, hasVal, &pTSRow, pReader);
doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
tRowMergerGetRow(&merge, &pTSRow);
} else { // descending order: mem rows -----> imem rows ------> file block
updateSchema(pRow, pBlockScanInfo->uid, pReader);
tRowMergerInit(&merge, pRow, pReader->pSchema);
doLoadRowsOfIdenticalTsInBuf(pIter, hasVal, k.ts, &merge, pReader);
tRowMerge(&merge, &fRow);
doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
tRowMergerGetRow(&merge, &pTSRow);
}
}
tRowMergerClear(&merge);
doAppendOneRow(pReader->pResBlock, pReader, pTSRow);
return TSDB_CODE_SUCCESS;
}
static int32_t buildComposedDataBlockImpl(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo) {
static int32_t doMergeThreeLevelRows(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo) {
SRowMerger merge = {0};
STSRow* pTSRow = NULL;
SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
SBlockData* pBlockData = &pReader->status.fileBlockData;
SFileDataBlockInfo* pFBlock = getCurrentBlockInfo(&pReader->status.blockIter);
SBlock* pBlock = taosArrayGet(pBlockScanInfo->pBlockList, pFBlock->tbBlockIdx);
SRowMerger merge = {0};
STSRow* pTSRow = NULL;
TSDBROW* pRow = getValidRow(pBlockScanInfo->iter, &pBlockScanInfo->memHasVal, pReader);
TSDBROW* piRow = getValidRow(pBlockScanInfo->iiter, &pBlockScanInfo->imemHasVal, pReader);
int64_t key = pBlockData->aTSKEY[pDumpInfo->rowIndex];
TSDBROW* pRow = getValidRow(pBlockScanInfo->iter, &pBlockScanInfo->memHasVal, pReader);
TSDBROW* piRow = getValidRow(pBlockScanInfo->iiter, &pBlockScanInfo->imemHasVal, pReader);
if (pBlockScanInfo->memHasVal && pBlockScanInfo->imemHasVal) {
TSDBKEY k = TSDBROW_KEY(pRow);
TSDBKEY ik = TSDBROW_KEY(piRow);
uint64_t uid = pBlockScanInfo->uid;
TSDBKEY k = TSDBROW_KEY(pRow);
TSDBKEY ik = TSDBROW_KEY(piRow);
if (ASCENDING_TRAVERSE(pReader->order)) {
// [1&2] key <= [k.ts && ik.ts]
if (key <= k.ts && key <= ik.ts) {
TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
tRowMergerInit(&merge, &fRow, pReader->pSchema);
doLoadRowsOfIdenticalTsInFileBlock(pBlockData, pBlockScanInfo, pReader, &merge);
doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
if (ik.ts == key) {
tRowMerge(&merge, piRow);
......@@ -2161,12 +2186,14 @@ static int32_t buildComposedDataBlockImpl(STsdbReader* pReader, STableBlockScanI
tRowMergerGetRow(&merge, &pTSRow);
doAppendOneRow(pReader->pResBlock, pReader, pTSRow);
return TSDB_CODE_SUCCESS;
} else { // key > ik.ts || key > k.ts
ASSERT(key != ik.ts);
// [3] ik.ts < key <= k.ts
// [4] ik.ts < k.ts <= key
if (ik.ts < k.ts) {
doMergeMultiRows(piRow, pBlockScanInfo->uid, pBlockScanInfo->iiter, &pBlockScanInfo->imemHasVal, &pTSRow,
pReader);
doMergeMultiRows(piRow, uid, pBlockScanInfo->iiter, &pBlockScanInfo->imemHasVal, &pTSRow, pReader);
doAppendOneRow(pReader->pResBlock, pReader, pTSRow);
return TSDB_CODE_SUCCESS;
}
......@@ -2174,39 +2201,116 @@ static int32_t buildComposedDataBlockImpl(STsdbReader* pReader, STableBlockScanI
// [5] k.ts < key <= ik.ts
// [6] k.ts < ik.ts <= key
if (k.ts < ik.ts) {
doMergeMultiRows(pRow, pBlockScanInfo->uid, pBlockScanInfo->iter, &pBlockScanInfo->memHasVal, &pTSRow,
pReader);
doMergeMultiRows(pRow, uid, pBlockScanInfo->iter, &pBlockScanInfo->memHasVal, &pTSRow, pReader);
doAppendOneRow(pReader->pResBlock, pReader, pTSRow);
return TSDB_CODE_SUCCESS;
}
// [7] k.ts == ik.ts < key
if (k.ts == ik.ts) {
ASSERT(key > ik.ts && key > k.ts);
doMergeMemIMemRows(pRow, piRow, pBlockScanInfo, pReader, &pTSRow);
doAppendOneRow(pReader->pResBlock, pReader, pTSRow);
return TSDB_CODE_SUCCESS;
}
}
} else { // descending order scan
// [1/2] k.ts >= ik.ts && k.ts >= key
if (k.ts >= ik.ts && k.ts >= key) {
updateSchema(pRow, uid, pReader);
tRowMergerInit(&merge, pRow, pReader->pSchema);
doLoadRowsOfIdenticalTsInBuf(pBlockScanInfo->iter, &pBlockScanInfo->memHasVal, key, &merge, pReader);
if (ik.ts == k.ts) {
tRowMerge(&merge, piRow);
doLoadRowsOfIdenticalTsInBuf(pBlockScanInfo->iiter, &pBlockScanInfo->imemHasVal, key, &merge, pReader);
}
if (k.ts == key) {
TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
tRowMerge(&merge, &fRow);
doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
}
tRowMergerGetRow(&merge, &pTSRow);
doAppendOneRow(pReader->pResBlock, pReader, pTSRow);
return TSDB_CODE_SUCCESS;
} else {
ASSERT(ik.ts != k.ts); // this case has been included in the previous if branch
// [3] ik.ts > k.ts >= Key
// [4] ik.ts > key >= k.ts
if (ik.ts > key) {
doMergeMultiRows(piRow, uid, pBlockScanInfo->iiter, &pBlockScanInfo->imemHasVal, &pTSRow, pReader);
doAppendOneRow(pReader->pResBlock, pReader, pTSRow);
return TSDB_CODE_SUCCESS;
}
// [5] key > ik.ts > k.ts
// [6] key > k.ts > ik.ts
if (key > ik.ts) {
TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
tRowMergerInit(&merge, &fRow, pReader->pSchema);
doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
tRowMergerGetRow(&merge, &pTSRow);
doAppendOneRow(pReader->pResBlock, pReader, pTSRow);
return TSDB_CODE_SUCCESS;
}
//[7] key = ik.ts > k.ts
if (key == ik.ts) {
doMergeMultiRows(piRow, uid, pBlockScanInfo->iiter, &pBlockScanInfo->imemHasVal, &pTSRow, pReader);
TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
tRowMerge(&merge, &fRow);
doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
tRowMergerGetRow(&merge, &pTSRow);
doAppendOneRow(pReader->pResBlock, pReader, pTSRow);
return TSDB_CODE_SUCCESS;
}
}
}
ASSERT(0);
}
static int32_t buildComposedDataBlockImpl(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo) {
SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
SBlockData* pBlockData = &pReader->status.fileBlockData;
SRowMerger merge = {0};
STSRow* pTSRow = NULL;
int64_t key = pBlockData->aTSKEY[pDumpInfo->rowIndex];
TSDBROW* pRow = getValidRow(pBlockScanInfo->iter, &pBlockScanInfo->memHasVal, pReader);
TSDBROW* piRow = getValidRow(pBlockScanInfo->iiter, &pBlockScanInfo->imemHasVal, pReader);
if (pBlockScanInfo->memHasVal && pBlockScanInfo->imemHasVal) {
return doMergeThreeLevelRows(pReader, pBlockScanInfo);
} else {
// imem + file
if (pBlockScanInfo->imemHasVal) {
return doMergeBufFileRows(pReader, pBlockScanInfo, piRow, pTSRow, pBlockScanInfo->iiter,
&pBlockScanInfo->imemHasVal, key, pFBlock, pBlock);
return doMergeBufAndFileRows(pReader, pBlockScanInfo, piRow, pTSRow, pBlockScanInfo->iiter,
&pBlockScanInfo->imemHasVal, key);
}
if (pBlockScanInfo->memHasVal) { // pBlockScanInfo->memHasVal != NULL
return doMergeBufFileRows(pReader, pBlockScanInfo, pRow, pTSRow, pBlockScanInfo->iter, &pBlockScanInfo->memHasVal,
key, pFBlock, pBlock);
// mem + file
if (pBlockScanInfo->memHasVal) {
return doMergeBufAndFileRows(pReader, pBlockScanInfo, pRow, pTSRow, pBlockScanInfo->iter, &pBlockScanInfo->memHasVal,
key);
}
// imem & mem are all empty
// imem & mem are all empty, only file exist
TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
tRowMergerInit(&merge, &fRow, pReader->pSchema);
doLoadRowsOfIdenticalTsInFileBlock(pBlockData, pBlockScanInfo, pReader, &merge);
doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
tRowMergerGetRow(&merge, &pTSRow);
doAppendOneRow(pReader->pResBlock, pReader, pTSRow);
return TSDB_CODE_SUCCESS;
}
return TSDB_CODE_SUCCESS;
}
static int32_t buildComposedDataBlock(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo) {
......@@ -2671,39 +2775,31 @@ static int32_t checkForNeighborFileBlock(STsdbReader* pReader, STableBlockScanIn
return TSDB_CODE_SUCCESS;
}
int32_t doLoadRowsOfIdenticalTsInFileBlock(SBlockData* pBlockData, STableBlockScanInfo* pScanInfo, STsdbReader* pReader, SRowMerger* pMerger) {
int32_t doMergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pScanInfo, STsdbReader* pReader, SRowMerger* pMerger) {
SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
bool asc = ASCENDING_TRAVERSE(pReader->order);
int64_t key = pBlockData->aTSKEY[pDumpInfo->rowIndex];
int32_t step = asc ? 1 : -1;
if (asc) {
pDumpInfo->rowIndex += step;
if (pDumpInfo->rowIndex <= pBlockData->nRow - 1) {
pDumpInfo->rowIndex = doMergeRowsInFileBlockImpl(pBlockData, pDumpInfo->rowIndex, key, pMerger, &pReader->verRange, step);
}
pDumpInfo->rowIndex += step;
if (pDumpInfo->rowIndex <= pBlockData->nRow - 1) {
pDumpInfo->rowIndex =
doMergeRowsInFileBlockImpl(pBlockData, pDumpInfo->rowIndex, key, pMerger, &pReader->verRange, step);
}
// all rows are consumed, let's try next file block
if (pDumpInfo->rowIndex >= pBlockData->nRow && asc) {
while (1) {
CHECK_FILEBLOCK_STATE st;
// all rows are consumed, let's try next file block
if ((pDumpInfo->rowIndex >= pBlockData->nRow && asc) || (pDumpInfo->rowIndex < 0 && !asc)) {
while (1) {
CHECK_FILEBLOCK_STATE st;
SFileDataBlockInfo* pFileBlockInfo = getCurrentBlockInfo(&pReader->status.blockIter);
SBlock* pCurrentBlock = taosArrayGet(pScanInfo->pBlockList, pFileBlockInfo->tbBlockIdx);
checkForNeighborFileBlock(pReader, pScanInfo, pCurrentBlock, pFileBlockInfo, pMerger, key, &st);
if (st == CHECK_FILEBLOCK_QUIT) {
break;
}
SFileDataBlockInfo* pFileBlockInfo = getCurrentBlockInfo(&pReader->status.blockIter);
SBlock* pCurrentBlock = taosArrayGet(pScanInfo->pBlockList, pFileBlockInfo->tbBlockIdx);
checkForNeighborFileBlock(pReader, pScanInfo, pCurrentBlock, pFileBlockInfo, pMerger, key, &st);
if (st == CHECK_FILEBLOCK_QUIT) {
break;
}
}
} else { // last row of current block, check if current block is overlapped with previous neighbor block
pDumpInfo->rowIndex += step;
// bool overlap = overlapWithNeighborBlock(pFBlock, pBlock, pScanInfo);
// if (overlap) { // load next block
// ASSERT(0);
// }
// }
}
return TSDB_CODE_SUCCESS;
......@@ -2736,15 +2832,24 @@ void doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScanInfo *pBlo
TSDBKEY k = TSDBROW_KEY(pRow);
TSDBKEY ik = TSDBROW_KEY(piRow);
ASSERT(k.ts == ik.ts);
updateSchema(piRow, pBlockScanInfo->uid, pReader);
if (ASCENDING_TRAVERSE(pReader->order)) { // ascending order imem --> mem
updateSchema(piRow, pBlockScanInfo->uid, pReader);
tRowMergerInit(&merge, piRow, pReader->pSchema);
doLoadRowsOfIdenticalTsInBuf(pBlockScanInfo->iiter, &pBlockScanInfo->imemHasVal, ik.ts, &merge, pReader);
tRowMergerInit(&merge, piRow, pReader->pSchema);
doLoadRowsOfIdenticalTsInBuf(pBlockScanInfo->iiter, &pBlockScanInfo->imemHasVal, ik.ts, &merge, pReader);
tRowMerge(&merge, pRow);
doLoadRowsOfIdenticalTsInBuf(pBlockScanInfo->iter, &pBlockScanInfo->memHasVal, k.ts, &merge, pReader);
} else {
updateSchema(pRow, pBlockScanInfo->uid, pReader);
tRowMerge(&merge, pRow);
doLoadRowsOfIdenticalTsInBuf(pBlockScanInfo->iter, &pBlockScanInfo->memHasVal, k.ts, &merge, pReader);
tRowMergerInit(&merge, pRow, pReader->pSchema);
doLoadRowsOfIdenticalTsInBuf(pBlockScanInfo->iiter, &pBlockScanInfo->memHasVal, ik.ts, &merge, pReader);
tRowMerge(&merge, piRow);
doLoadRowsOfIdenticalTsInBuf(pBlockScanInfo->iter, &pBlockScanInfo->imemHasVal, k.ts, &merge, pReader);
}
tRowMergerGetRow(&merge, pTSRow);
}
......@@ -2753,20 +2858,19 @@ int32_t tsdbGetNextRowInMem(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pR
TSDBROW* pRow = getValidRow(pBlockScanInfo->iter, &pBlockScanInfo->memHasVal, pReader);
TSDBROW* piRow = getValidRow(pBlockScanInfo->iiter, &pBlockScanInfo->imemHasVal, pReader);
TSDBKEY k = {.ts = TSKEY_INITIAL_VAL};
TSDBKEY ik = {.ts = TSKEY_INITIAL_VAL};
if (pBlockScanInfo->memHasVal && pBlockScanInfo->imemHasVal) {
k = TSDBROW_KEY(pRow);
ik = TSDBROW_KEY(piRow);
TSDBKEY k = TSDBROW_KEY(pRow);
TSDBKEY ik = TSDBROW_KEY(piRow);
if (ik.ts <= k.ts) {
doMergeMemIMemRows(pRow, piRow, pBlockScanInfo, pReader, pTSRow);
return TSDB_CODE_SUCCESS;
} else { // k.ts < ik.ts
if (ik.ts < k.ts) { // ik.ts < k.ts
doMergeMultiRows(piRow, pBlockScanInfo->uid, pBlockScanInfo->iiter, &pBlockScanInfo->imemHasVal, pTSRow, pReader);
} else if (k.ts < ik.ts) {
doMergeMultiRows(piRow, pBlockScanInfo->uid, pBlockScanInfo->iter, &pBlockScanInfo->memHasVal, pTSRow, pReader);
return TSDB_CODE_SUCCESS;
} else { // ik.ts == k.ts
doMergeMemIMemRows(pRow, piRow, pBlockScanInfo, pReader, pTSRow);
}
return TSDB_CODE_SUCCESS;
}
if (pBlockScanInfo->memHasVal) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册