提交 bc600a49 编写于 作者: H Haojun Liao

refactor: do some internal refactor optimize the building block performance.

上级 64db9afe
...@@ -152,7 +152,7 @@ static void updateSchema(TSDBROW* pRow, uint64_t uid, STsdbReader* pReader); ...@@ -152,7 +152,7 @@ static void updateSchema(TSDBROW* pRow, uint64_t uid, STsdbReader* pReader);
static bool hasBeenDropped(const SArray* pDelList, int32_t* index, TSDBKEY* pKey, int32_t order); static bool hasBeenDropped(const SArray* pDelList, int32_t* index, TSDBKEY* pKey, int32_t order);
static void doMergeMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo* pIter, SArray* pDelList, STSRow** pTSRow, static void doMergeMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo* pIter, SArray* pDelList, STSRow** pTSRow,
STsdbReader* pReader); STsdbReader* pReader, bool* freeTSRow);
static void doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader, static void doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader,
STSRow** pTSRow); STSRow** pTSRow);
static int32_t initDelSkylineIterator(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader, STbData* pMemTbData, static int32_t initDelSkylineIterator(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader, STbData* pMemTbData,
...@@ -1235,6 +1235,7 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo* ...@@ -1235,6 +1235,7 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo*
TSDBKEY k = TSDBROW_KEY(pRow); TSDBKEY k = TSDBROW_KEY(pRow);
TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex); TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
SArray* pDelList = pBlockScanInfo->delSkyline; SArray* pDelList = pBlockScanInfo->delSkyline;
bool freeTSRow = false;
// ascending order traverse // ascending order traverse
if (ASCENDING_TRAVERSE(pReader->order)) { if (ASCENDING_TRAVERSE(pReader->order)) {
...@@ -1248,7 +1249,7 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo* ...@@ -1248,7 +1249,7 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo*
tRowMergerGetRow(&merge, &pTSRow); tRowMergerGetRow(&merge, &pTSRow);
} }
} else if (k.ts < key) { // k.ts < key } else if (k.ts < key) { // k.ts < key
doMergeMultiRows(pRow, pBlockScanInfo->uid, pIter, pDelList, &pTSRow, pReader); doMergeMultiRows(pRow, pBlockScanInfo->uid, pIter, pDelList, &pTSRow, pReader, &freeTSRow);
} else { // k.ts == key, ascending order: file block ----> imem rows -----> mem rows } else { // k.ts == key, ascending order: file block ----> imem rows -----> mem rows
tRowMergerInit(&merge, &fRow, pReader->pSchema); tRowMergerInit(&merge, &fRow, pReader->pSchema);
doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge); doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
...@@ -1260,7 +1261,7 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo* ...@@ -1260,7 +1261,7 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo*
} }
} else { // descending order scan } else { // descending order scan
if (key < k.ts) { if (key < k.ts) {
doMergeMultiRows(pRow, pBlockScanInfo->uid, pIter, pDelList, &pTSRow, pReader); doMergeMultiRows(pRow, pBlockScanInfo->uid, pIter, pDelList, &pTSRow, pReader, &freeTSRow);
} else if (k.ts < key) { } else if (k.ts < key) {
if (tryCopyDistinctRowFromFileBlock(pReader, pBlockData, key, pDumpInfo)) { if (tryCopyDistinctRowFromFileBlock(pReader, pBlockData, key, pDumpInfo)) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
...@@ -1302,12 +1303,12 @@ static int32_t doMergeThreeLevelRows(STsdbReader* pReader, STableBlockScanInfo* ...@@ -1302,12 +1303,12 @@ static int32_t doMergeThreeLevelRows(STsdbReader* pReader, STableBlockScanInfo*
ASSERT(pRow != NULL && piRow != NULL); ASSERT(pRow != NULL && piRow != NULL);
int64_t key = pBlockData->aTSKEY[pDumpInfo->rowIndex]; int64_t key = pBlockData->aTSKEY[pDumpInfo->rowIndex];
bool freeTSRow = false;
uint64_t uid = pBlockScanInfo->uid; uint64_t uid = pBlockScanInfo->uid;
TSDBKEY k = TSDBROW_KEY(pRow); TSDBKEY k = TSDBROW_KEY(pRow);
TSDBKEY ik = TSDBROW_KEY(piRow); TSDBKEY ik = TSDBROW_KEY(piRow);
if (ASCENDING_TRAVERSE(pReader->order)) { if (ASCENDING_TRAVERSE(pReader->order)) {
// [1&2] key <= [k.ts && ik.ts] // [1&2] key <= [k.ts && ik.ts]
if (key <= k.ts && key <= ik.ts) { if (key <= k.ts && key <= ik.ts) {
...@@ -1335,16 +1336,22 @@ static int32_t doMergeThreeLevelRows(STsdbReader* pReader, STableBlockScanInfo* ...@@ -1335,16 +1336,22 @@ static int32_t doMergeThreeLevelRows(STsdbReader* pReader, STableBlockScanInfo*
// [3] ik.ts < key <= k.ts // [3] ik.ts < key <= k.ts
// [4] ik.ts < k.ts <= key // [4] ik.ts < k.ts <= key
if (ik.ts < k.ts) { if (ik.ts < k.ts) {
doMergeMultiRows(piRow, uid, &pBlockScanInfo->iiter, pDelList, &pTSRow, pReader); doMergeMultiRows(piRow, uid, &pBlockScanInfo->iiter, pDelList, &pTSRow, pReader, &freeTSRow);
doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow); doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow);
if (freeTSRow) {
taosMemoryFree(pTSRow);
}
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
// [5] k.ts < key <= ik.ts // [5] k.ts < key <= ik.ts
// [6] k.ts < ik.ts <= key // [6] k.ts < ik.ts <= key
if (k.ts < ik.ts) { if (k.ts < ik.ts) {
doMergeMultiRows(pRow, uid, &pBlockScanInfo->iter, pDelList, &pTSRow, pReader); doMergeMultiRows(pRow, uid, &pBlockScanInfo->iter, pDelList, &pTSRow, pReader, &freeTSRow);
doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow); doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow);
if (freeTSRow) {
taosMemoryFree(pTSRow);
}
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -1354,6 +1361,7 @@ static int32_t doMergeThreeLevelRows(STsdbReader* pReader, STableBlockScanInfo* ...@@ -1354,6 +1361,7 @@ static int32_t doMergeThreeLevelRows(STsdbReader* pReader, STableBlockScanInfo*
doMergeMemIMemRows(pRow, piRow, pBlockScanInfo, pReader, &pTSRow); doMergeMemIMemRows(pRow, piRow, pBlockScanInfo, pReader, &pTSRow);
doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow); doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow);
taosMemoryFree(pTSRow);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
} }
...@@ -1385,8 +1393,11 @@ static int32_t doMergeThreeLevelRows(STsdbReader* pReader, STableBlockScanInfo* ...@@ -1385,8 +1393,11 @@ static int32_t doMergeThreeLevelRows(STsdbReader* pReader, STableBlockScanInfo*
// [3] ik.ts > k.ts >= Key // [3] ik.ts > k.ts >= Key
// [4] ik.ts > key >= k.ts // [4] ik.ts > key >= k.ts
if (ik.ts > key) { if (ik.ts > key) {
doMergeMultiRows(piRow, uid, &pBlockScanInfo->iiter, pDelList, &pTSRow, pReader); doMergeMultiRows(piRow, uid, &pBlockScanInfo->iiter, pDelList, &pTSRow, pReader, &freeTSRow);
doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow); doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow);
if (freeTSRow) {
taosMemoryFree(pTSRow);
}
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -1399,18 +1410,21 @@ static int32_t doMergeThreeLevelRows(STsdbReader* pReader, STableBlockScanInfo* ...@@ -1399,18 +1410,21 @@ static int32_t doMergeThreeLevelRows(STsdbReader* pReader, STableBlockScanInfo*
doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge); doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
tRowMergerGetRow(&merge, &pTSRow); tRowMergerGetRow(&merge, &pTSRow);
doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow); doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow);
taosMemoryFree(pTSRow);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
//[7] key = ik.ts > k.ts //[7] key = ik.ts > k.ts
if (key == ik.ts) { if (key == ik.ts) {
doMergeMultiRows(piRow, uid, &pBlockScanInfo->iiter, pDelList, &pTSRow, pReader); doMergeMultiRows(piRow, uid, &pBlockScanInfo->iiter, pDelList, &pTSRow, pReader, &freeTSRow);
TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex); TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
tRowMerge(&merge, &fRow); tRowMerge(&merge, &fRow);
doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge); doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
tRowMergerGetRow(&merge, &pTSRow); tRowMergerGetRow(&merge, &pTSRow);
doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow); doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow);
taosMemoryFree(pTSRow);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
} }
...@@ -2280,11 +2294,29 @@ void updateSchema(TSDBROW* pRow, uint64_t uid, STsdbReader* pReader) { ...@@ -2280,11 +2294,29 @@ void updateSchema(TSDBROW* pRow, uint64_t uid, STsdbReader* pReader) {
} }
void doMergeMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo* pIter, SArray* pDelList, STSRow** pTSRow, void doMergeMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo* pIter, SArray* pDelList, STSRow** pTSRow,
STsdbReader* pReader) { STsdbReader* pReader, bool* freeTSRow) {
{ // if the timestamp of the next valid row has a different ts, return current row directly
TSDBROW current = *pRow;
pIter->hasVal = tsdbTbDataIterNext(pIter->iter);
if (!pIter->hasVal) {
*pTSRow = current.pTSRow;
*freeTSRow = false;
return;
} else { // has next point in mem/imem
TSDBROW* pNextRow = getValidRow(pIter, pDelList, pReader);
if (TSDBROW_KEY(&current).ts != TSDBROW_KEY(pNextRow).ts) {
*pTSRow = current.pTSRow;
*freeTSRow = false;
return;
}
}
}
SRowMerger merge = {0}; SRowMerger merge = {0};
TSDBKEY k = TSDBROW_KEY(pRow);
TSDBKEY k = TSDBROW_KEY(pRow); // get the correct schema for data in memory
// updateSchema(pRow, uid, pReader);
int32_t sversion = TSDBROW_SVERSION(pRow); int32_t sversion = TSDBROW_SVERSION(pRow);
STSchema* pTSchema = NULL; STSchema* pTSchema = NULL;
if (sversion != pReader->pSchema->version) { if (sversion != pReader->pSchema->version) {
...@@ -2298,6 +2330,8 @@ void doMergeMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo* pIter, SArray* pDe ...@@ -2298,6 +2330,8 @@ void doMergeMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo* pIter, SArray* pDe
tRowMergerGetRow(&merge, pTSRow); tRowMergerGetRow(&merge, pTSRow);
tRowMergerClear(&merge); tRowMergerClear(&merge);
*freeTSRow = true;
if (sversion != pReader->pSchema->version) { if (sversion != pReader->pSchema->version) {
taosMemoryFree(pTSchema); taosMemoryFree(pTSchema);
} }
...@@ -2332,7 +2366,7 @@ void doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScanInfo* pBlo ...@@ -2332,7 +2366,7 @@ void doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScanInfo* pBlo
} }
int32_t tsdbGetNextRowInMem(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader, STSRow** pTSRow, int32_t tsdbGetNextRowInMem(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader, STSRow** pTSRow,
int64_t endKey) { int64_t endKey, bool* freeTSRow) {
TSDBROW* pRow = getValidRow(&pBlockScanInfo->iter, pBlockScanInfo->delSkyline, pReader); TSDBROW* pRow = getValidRow(&pBlockScanInfo->iter, pBlockScanInfo->delSkyline, pReader);
TSDBROW* piRow = getValidRow(&pBlockScanInfo->iiter, pBlockScanInfo->delSkyline, pReader); TSDBROW* piRow = getValidRow(&pBlockScanInfo->iiter, pBlockScanInfo->delSkyline, pReader);
SArray* pDelList = pBlockScanInfo->delSkyline; SArray* pDelList = pBlockScanInfo->delSkyline;
...@@ -2358,23 +2392,24 @@ int32_t tsdbGetNextRowInMem(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pR ...@@ -2358,23 +2392,24 @@ int32_t tsdbGetNextRowInMem(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pR
TSDBKEY ik = TSDBROW_KEY(piRow); TSDBKEY ik = TSDBROW_KEY(piRow);
if (ik.ts < k.ts) { // ik.ts < k.ts if (ik.ts < k.ts) { // ik.ts < k.ts
doMergeMultiRows(piRow, pBlockScanInfo->uid, &pBlockScanInfo->iiter, pDelList, pTSRow, pReader); doMergeMultiRows(piRow, pBlockScanInfo->uid, &pBlockScanInfo->iiter, pDelList, pTSRow, pReader, freeTSRow);
} else if (k.ts < ik.ts) { } else if (k.ts < ik.ts) {
doMergeMultiRows(pRow, pBlockScanInfo->uid, &pBlockScanInfo->iter, pDelList, pTSRow, pReader); doMergeMultiRows(pRow, pBlockScanInfo->uid, &pBlockScanInfo->iter, pDelList, pTSRow, pReader, freeTSRow);
} else { // ik.ts == k.ts } else { // ik.ts == k.ts
doMergeMemIMemRows(pRow, piRow, pBlockScanInfo, pReader, pTSRow); doMergeMemIMemRows(pRow, piRow, pBlockScanInfo, pReader, pTSRow);
*freeTSRow = true;
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
if (pBlockScanInfo->iter.hasVal && pRow != NULL) { if (pBlockScanInfo->iter.hasVal && pRow != NULL) {
doMergeMultiRows(pRow, pBlockScanInfo->uid, &pBlockScanInfo->iter, pDelList, pTSRow, pReader); doMergeMultiRows(pRow, pBlockScanInfo->uid, &pBlockScanInfo->iter, pDelList, pTSRow, pReader, freeTSRow);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
if (pBlockScanInfo->iiter.hasVal && piRow != NULL) { if (pBlockScanInfo->iiter.hasVal && piRow != NULL) {
doMergeMultiRows(piRow, pBlockScanInfo->uid, &pBlockScanInfo->iiter, pDelList, pTSRow, pReader); doMergeMultiRows(piRow, pBlockScanInfo->uid, &pBlockScanInfo->iiter, pDelList, pTSRow, pReader, freeTSRow);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -2472,13 +2507,16 @@ int32_t buildDataBlockFromBufImpl(STableBlockScanInfo* pBlockScanInfo, int64_t e ...@@ -2472,13 +2507,16 @@ int32_t buildDataBlockFromBufImpl(STableBlockScanInfo* pBlockScanInfo, int64_t e
do { do {
STSRow* pTSRow = NULL; STSRow* pTSRow = NULL;
tsdbGetNextRowInMem(pBlockScanInfo, pReader, &pTSRow, endKey); bool freeTSRow = false;
tsdbGetNextRowInMem(pBlockScanInfo, pReader, &pTSRow, endKey, &freeTSRow);
if (pTSRow == NULL) { if (pTSRow == NULL) {
break; break;
} }
doAppendRowFromTSRow(pBlock, pReader, pTSRow); doAppendRowFromTSRow(pBlock, pReader, pTSRow);
taosMemoryFree(pTSRow); if (freeTSRow) {
taosMemoryFree(pTSRow);
}
// no data in buffer, return immediately // no data in buffer, return immediately
if (!(pBlockScanInfo->iter.hasVal || pBlockScanInfo->iiter.hasVal)) { if (!(pBlockScanInfo->iter.hasVal || pBlockScanInfo->iiter.hasVal)) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册