提交 ba4d726a 编写于 作者: H Haojun Liao

fix(tsdb): fix error in tsdbread.

上级 b39198a5
......@@ -545,7 +545,7 @@ static int32_t doLoadFileBlock(STsdbReader* pReader, SArray* pIndexList, SBlockN
}
if (uid < pRecord->uid) { // forward the table uid index
while (pReader->status.uidList.tableUidList[k] < pRecord->uid && k < numOfTables) {
while (k < numOfTables && pReader->status.uidList.tableUidList[k] < pRecord->uid) {
k += 1;
}
......
......@@ -453,48 +453,8 @@ bool blockIteratorNext(SDataBlockIter* pBlockIter, const char* idStr) {
}
// load tomb data API
//static int32_t checkTombBlockRecords(SArray* pData, STombBlock* pBlock, uint64_t suid, uint64_t uid, int64_t maxVer);
//int32_t loadSttTombData(SArray* pLDataIterList, uint64_t suid, STableBlockScanInfo* pScanInfo, uint64_t maxVer) {
// int32_t size = taosArrayGetSize(pLDataIterList);
// if (size <= 0) {
// return TSDB_CODE_SUCCESS;
// }
//
// uint64_t uid = pScanInfo->uid;
// if (pScanInfo->pfileDelData == NULL) {
// pScanInfo->pfileDelData = taosArrayInit(4, sizeof(SDelData));
// }
//
// for (int32_t i = 0; i < size; ++i) {
// SArray* pLeveledLDataIter = taosArrayGetP(pLDataIterList, i);
//
// int32_t numOfIter = taosArrayGetSize(pLeveledLDataIter);
// if (numOfIter == 0) {
// continue;
// }
//
// for (int32_t f = 0; f < numOfIter; ++f) {
// SLDataIter* pIter = taosArrayGetP(pLeveledLDataIter, f);
//
// SArray* pTombBlockArray = pIter->pBlockLoadInfo->pTombBlockArray;
// int32_t numOfBlocks = taosArrayGetSize(pTombBlockArray);
// for (int32_t k = 0; k < numOfBlocks; ++k) {
// STombBlock* pBlock = taosArrayGetP(pTombBlockArray, k);
//
// int32_t code = checkTombBlockRecords(pScanInfo->pfileDelData, pBlock, suid, uid, maxVer);
// if (code != TSDB_CODE_SUCCESS) {
// return code;
// }
// }
// }
// }
//
// return TSDB_CODE_SUCCESS;
//}
int32_t doLoadTombDataFromTombBlk(const TTombBlkArray* pTombBlkArray, STsdbReader* pReader,
SDataFileReader* pFileReader, SSttFileReader* pSttReader, bool isFile) {
static int32_t doLoadTombDataFromTombBlk(const TTombBlkArray* pTombBlkArray, STsdbReader* pReader,
void* pFileReader, bool isFile) {
int32_t code = 0;
STableUidList* pList = &pReader->status.uidList;
......@@ -524,7 +484,7 @@ int32_t doLoadTombDataFromTombBlk(const TTombBlkArray* pTombBlkArray, STsdbReade
STombBlock block = {0};
code = isFile ? tsdbDataFileReadTombBlock(pFileReader, &pTombBlkArray->data[i], &block)
: tsdbSttFileReadTombBlock(pSttReader, &pTombBlkArray->data[i], &block);
: tsdbSttFileReadTombBlock(pFileReader, &pTombBlkArray->data[i], &block);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
......@@ -540,6 +500,7 @@ int32_t doLoadTombDataFromTombBlk(const TTombBlkArray* pTombBlkArray, STsdbReade
for (int32_t k = 0; k < TARRAY2_SIZE(block.suid); ++k) {
code = tTombBlockGet(&block, k, &record);
if (code != TSDB_CODE_SUCCESS) {
tTombBlockDestroy(&block);
return code;
}
......@@ -554,7 +515,7 @@ int32_t doLoadTombDataFromTombBlk(const TTombBlkArray* pTombBlkArray, STsdbReade
bool newTable = false;
if (uid < record.uid) {
while (pReader->status.uidList.tableUidList[j] < record.uid && j < numOfTables) {
while (j < numOfTables && pReader->status.uidList.tableUidList[j] < record.uid) {
j += 1;
newTable = true;
}
......@@ -605,7 +566,7 @@ int32_t loadDataFileTombDataForAll(STsdbReader* pReader) {
return code;
}
return doLoadTombDataFromTombBlk(pBlkArray, pReader, pReader->pFileReader, NULL, true);
return doLoadTombDataFromTombBlk(pBlkArray, pReader, pReader->pFileReader, true);
}
int32_t loadSttTombDataForAll(STsdbReader* pReader, SSttFileReader* pSttFileReader, SSttBlockLoadInfo* pLoadInfo) {
......@@ -619,7 +580,7 @@ int32_t loadSttTombDataForAll(STsdbReader* pReader, SSttFileReader* pSttFileRead
return code;
}
return doLoadTombDataFromTombBlk(pBlkArray, pReader, NULL, pSttFileReader, false);
return doLoadTombDataFromTombBlk(pBlkArray, pReader, pSttFileReader, false);
}
void loadMemTombData(STableBlockScanInfo* pScanInfo, STbData* pMemTbData, STbData* piMemTbData, int64_t ver) {
......@@ -649,32 +610,3 @@ void loadMemTombData(STableBlockScanInfo* pScanInfo, STbData* pMemTbData, STbDat
}
}
}
int32_t checkTombBlockRecords(SArray* pData, STombBlock* pBlock, uint64_t suid, uint64_t uid, int64_t maxVer) {
STombRecord record = {0};
for (int32_t j = 0; j < pBlock->suid->size; ++j) {
int32_t code = tTombBlockGet(pBlock, j, &record);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
if (record.suid < suid) {
continue;
}
if (record.suid > suid || (record.suid == suid && record.uid > uid)) {
break;
}
if (record.uid < uid) {
continue;
}
if (record.version <= maxVer) {
SDelData delData = {.version = record.version, .sKey = record.skey, .eKey = record.ekey};
taosArrayPush(pData, &delData);
}
}
return TSDB_CODE_SUCCESS;
}
......@@ -253,7 +253,6 @@ int32_t initBlockIterator(STsdbReader* pReader, SDataBlockIter* pBlockIter, int3
bool blockIteratorNext(SDataBlockIter* pBlockIter, const char* idStr);
// load tomb data API (stt/mem only for one table each, tomb data from data files are load for all tables at one time)
//int32_t loadSttTombData(SArray* pLDataIterList, uint64_t suid, STableBlockScanInfo* pScanInfo, uint64_t maxVer);
void loadMemTombData(STableBlockScanInfo* pScanInfo, STbData* pMemTbData, STbData* piMemTbData, int64_t ver);
int32_t loadDataFileTombDataForAll(STsdbReader* pReader);
int32_t loadSttTombDataForAll(STsdbReader* pReader, SSttFileReader* pSttFileReader, SSttBlockLoadInfo* pLoadInfo);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册