未验证 提交 b2e3c780 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #17813 from taosdata/feature/3_liaohj

fix(query): optimize the performance of tsdbread.
...@@ -138,7 +138,7 @@ typedef struct SReaderStatus { ...@@ -138,7 +138,7 @@ typedef struct SReaderStatus {
bool loadFromFile; // check file stage bool loadFromFile; // check file stage
bool composedDataBlock; // the returned data block is a composed block or not bool composedDataBlock; // the returned data block is a composed block or not
SHashObj* pTableMap; // SHash<STableBlockScanInfo> SHashObj* pTableMap; // SHash<STableBlockScanInfo>
STableBlockScanInfo* pTableIter; // table iterator used in building in-memory buffer data blocks. STableBlockScanInfo** pTableIter; // table iterator used in building in-memory buffer data blocks.
SUidOrderCheckInfo uidCheckInfo; // check all table in uid order SUidOrderCheckInfo uidCheckInfo; // check all table in uid order
SFileBlockDumpInfo fBlockDumpInfo; SFileBlockDumpInfo fBlockDumpInfo;
SDFileSet* pCurrentFileset; // current opened file set SDFileSet* pCurrentFileset; // current opened file set
...@@ -147,6 +147,12 @@ typedef struct SReaderStatus { ...@@ -147,6 +147,12 @@ typedef struct SReaderStatus {
SDataBlockIter blockIter; SDataBlockIter blockIter;
} SReaderStatus; } SReaderStatus;
typedef struct SBlockInfoBuf {
int32_t currentIndex;
SArray* pData;
int32_t numPerBucket;
} SBlockInfoBuf;
struct STsdbReader { struct STsdbReader {
STsdb* pTsdb; STsdb* pTsdb;
uint64_t suid; uint64_t suid;
...@@ -164,9 +170,9 @@ struct STsdbReader { ...@@ -164,9 +170,9 @@ struct STsdbReader {
STSchema* pMemSchema; // the previous schema for in-memory data, to avoid load schema too many times STSchema* pMemSchema; // the previous schema for in-memory data, to avoid load schema too many times
SDataFReader* pFileReader; SDataFReader* pFileReader;
SVersionRange verRange; SVersionRange verRange;
SBlockInfoBuf blockInfoBuf;
int32_t step; int32_t step;
STsdbReader* innerReader[2]; STsdbReader* innerReader[2];
}; };
static SFileDataBlockInfo* getCurrentBlockInfo(SDataBlockIter* pBlockIter); static SFileDataBlockInfo* getCurrentBlockInfo(SDataBlockIter* pBlockIter);
...@@ -232,6 +238,50 @@ static int32_t setColumnIdSlotList(STsdbReader* pReader, SSDataBlock* pBlock) { ...@@ -232,6 +238,50 @@ static int32_t setColumnIdSlotList(STsdbReader* pReader, SSDataBlock* pBlock) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t initBlockScanInfoBuf(SBlockInfoBuf* pBuf, int32_t numOfTables) {
int32_t num = numOfTables / pBuf->numPerBucket;
int32_t remainder = numOfTables % pBuf->numPerBucket;
if (pBuf->pData == NULL) {
pBuf->pData = taosArrayInit(num + 1, POINTER_BYTES);
}
for(int32_t i = 0; i < num; ++i) {
char* p = taosMemoryCalloc(pBuf->numPerBucket, sizeof(STableBlockScanInfo));
if (p == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
taosArrayPush(pBuf->pData, &p);
}
if (remainder > 0) {
char* p = taosMemoryCalloc(remainder, sizeof(STableBlockScanInfo));
if (p == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
taosArrayPush(pBuf->pData, &p);
}
return TSDB_CODE_SUCCESS;
}
static void clearBlockScanInfoBuf(SBlockInfoBuf* pBuf) {
size_t num = taosArrayGetSize(pBuf->pData);
for(int32_t i = 0; i < num; ++i) {
char** p = taosArrayGet(pBuf->pData, i);
taosMemoryFree(*p);
}
taosArrayDestroy(pBuf->pData);
}
static void* getPosInBlockInfoBuf(SBlockInfoBuf* pBuf, int32_t index) {
int32_t bucketIndex = index / pBuf->numPerBucket;
char** pBucket = taosArrayGet(pBuf->pData, bucketIndex);
return (*pBucket) + (index % pBuf->numPerBucket) * sizeof(STableBlockScanInfo);
}
// NOTE: speedup the whole processing by preparing the buffer for STableBlockScanInfo in batch model
static SHashObj* createDataBlockScanInfo(STsdbReader* pTsdbReader, const STableKeyInfo* idList, int32_t numOfTables) { static SHashObj* createDataBlockScanInfo(STsdbReader* pTsdbReader, const STableKeyInfo* idList, int32_t numOfTables) {
// allocate buffer in order to load data blocks from file // allocate buffer in order to load data blocks from file
// todo use simple hash instead, optimize the memory consumption // todo use simple hash instead, optimize the memory consumption
...@@ -242,9 +292,23 @@ static SHashObj* createDataBlockScanInfo(STsdbReader* pTsdbReader, const STableK ...@@ -242,9 +292,23 @@ static SHashObj* createDataBlockScanInfo(STsdbReader* pTsdbReader, const STableK
} }
int64_t st = taosGetTimestampUs(); int64_t st = taosGetTimestampUs();
initBlockScanInfoBuf(&pTsdbReader->blockInfoBuf, numOfTables);
for (int32_t j = 0; j < numOfTables; ++j) { for (int32_t j = 0; j < numOfTables; ++j) {
STableBlockScanInfo info = {.lastKey = 0, .uid = idList[j].uid}; STableBlockScanInfo* pScanInfo = getPosInBlockInfoBuf(&pTsdbReader->blockInfoBuf, j);
pScanInfo->uid = idList[j].uid;
if (ASCENDING_TRAVERSE(pTsdbReader->order)) {
int64_t skey = pTsdbReader->window.skey;
pScanInfo->lastKey = (skey > INT64_MIN) ? (skey - 1) : skey;
} else {
int64_t ekey = pTsdbReader->window.ekey;
pScanInfo->lastKey = (ekey < INT64_MAX) ? (ekey + 1) : ekey;
}
taosHashPut(pTableMap, &pScanInfo->uid, sizeof(uint64_t), &pScanInfo, POINTER_BYTES);
#if 0
// STableBlockScanInfo info = {.lastKey = 0, .uid = idList[j].uid};
if (ASCENDING_TRAVERSE(pTsdbReader->order)) { if (ASCENDING_TRAVERSE(pTsdbReader->order)) {
int64_t skey = pTsdbReader->window.skey; int64_t skey = pTsdbReader->window.skey;
info.lastKey = (skey > INT64_MIN) ? (skey - 1) : skey; info.lastKey = (skey > INT64_MIN) ? (skey - 1) : skey;
...@@ -254,7 +318,9 @@ static SHashObj* createDataBlockScanInfo(STsdbReader* pTsdbReader, const STableK ...@@ -254,7 +318,9 @@ static SHashObj* createDataBlockScanInfo(STsdbReader* pTsdbReader, const STableK
} }
taosHashPut(pTableMap, &info.uid, sizeof(uint64_t), &info, sizeof(info)); taosHashPut(pTableMap, &info.uid, sizeof(uint64_t), &info, sizeof(info));
tsdbDebug("%p check table uid:%" PRId64 " from lastKey:%" PRId64 " %s", pTsdbReader, info.uid, info.lastKey, #endif
tsdbTrace("%p check table uid:%" PRId64 " from lastKey:%" PRId64 " %s", pTsdbReader, pScanInfo->uid, pScanInfo->lastKey,
pTsdbReader->idStr); pTsdbReader->idStr);
} }
...@@ -266,18 +332,19 @@ static SHashObj* createDataBlockScanInfo(STsdbReader* pTsdbReader, const STableK ...@@ -266,18 +332,19 @@ static SHashObj* createDataBlockScanInfo(STsdbReader* pTsdbReader, const STableK
return pTableMap; return pTableMap;
} }
static void resetDataBlockScanInfo(SHashObj* pTableMap, int64_t ts) { static void resetAllDataBlockScanInfo(SHashObj* pTableMap, int64_t ts) {
STableBlockScanInfo* p = NULL; STableBlockScanInfo** p = NULL;
while ((p = taosHashIterate(pTableMap, p)) != NULL) { while ((p = taosHashIterate(pTableMap, p)) != NULL) {
p->iterInit = false; STableBlockScanInfo* pInfo = *(STableBlockScanInfo**) p;
p->iiter.hasVal = false;
if (p->iter.iter != NULL) { pInfo->iterInit = false;
p->iter.iter = tsdbTbDataIterDestroy(p->iter.iter); pInfo->iiter.hasVal = false;
if (pInfo->iter.iter != NULL) {
pInfo->iter.iter = tsdbTbDataIterDestroy(pInfo->iter.iter);
} }
p->delSkyline = taosArrayDestroy(p->delSkyline); pInfo->delSkyline = taosArrayDestroy(pInfo->delSkyline);
p->lastKey = ts; pInfo->lastKey = ts;
} }
} }
...@@ -298,10 +365,10 @@ static void clearBlockScanInfo(STableBlockScanInfo* p) { ...@@ -298,10 +365,10 @@ static void clearBlockScanInfo(STableBlockScanInfo* p) {
tMapDataClear(&p->mapData); tMapDataClear(&p->mapData);
} }
static void destroyBlockScanInfo(SHashObj* pTableMap) { static void destroyAllBlockScanInfo(SHashObj* pTableMap) {
STableBlockScanInfo* p = NULL; void* p = NULL;
while ((p = taosHashIterate(pTableMap, p)) != NULL) { while ((p = taosHashIterate(pTableMap, p)) != NULL) {
clearBlockScanInfo(p); clearBlockScanInfo(*(STableBlockScanInfo**)p);
} }
taosHashCleanup(pTableMap); taosHashCleanup(pTableMap);
...@@ -506,7 +573,7 @@ static int32_t tsdbReaderCreate(SVnode* pVnode, SQueryTableDataCond* pCond, STsd ...@@ -506,7 +573,7 @@ static int32_t tsdbReaderCreate(SVnode* pVnode, SQueryTableDataCond* pCond, STsd
pReader->verRange = getQueryVerRange(pVnode, pCond, level); pReader->verRange = getQueryVerRange(pVnode, pCond, level);
pReader->type = pCond->type; pReader->type = pCond->type;
pReader->window = updateQueryTimeWindow(pReader->pTsdb, &pCond->twindows); pReader->window = updateQueryTimeWindow(pReader->pTsdb, &pCond->twindows);
pReader->blockInfoBuf.numPerBucket = 1000; // 1000 tables per bucket
ASSERT(pCond->numOfCols > 0); ASSERT(pCond->numOfCols > 0);
limitOutputBufferSize(pCond, &pReader->capacity); limitOutputBufferSize(pCond, &pReader->capacity);
...@@ -577,7 +644,7 @@ static int32_t doLoadBlockIndex(STsdbReader* pReader, SDataFReader* pFileReader, ...@@ -577,7 +644,7 @@ static int32_t doLoadBlockIndex(STsdbReader* pReader, SDataFReader* pFileReader,
continue; continue;
} }
STableBlockScanInfo* pScanInfo = p; STableBlockScanInfo* pScanInfo = *(STableBlockScanInfo**)p;
if (pScanInfo->pBlockList == NULL) { if (pScanInfo->pBlockList == NULL) {
pScanInfo->pBlockList = taosArrayInit(4, sizeof(SBlockIndex)); pScanInfo->pBlockList = taosArrayInit(4, sizeof(SBlockIndex));
} }
...@@ -597,7 +664,7 @@ _end: ...@@ -597,7 +664,7 @@ _end:
} }
static void cleanupTableScanInfo(SHashObj* pTableMap) { static void cleanupTableScanInfo(SHashObj* pTableMap) {
STableBlockScanInfo* px = NULL; STableBlockScanInfo** px = NULL;
while (1) { while (1) {
px = taosHashIterate(pTableMap, px); px = taosHashIterate(pTableMap, px);
if (px == NULL) { if (px == NULL) {
...@@ -605,8 +672,8 @@ static void cleanupTableScanInfo(SHashObj* pTableMap) { ...@@ -605,8 +672,8 @@ static void cleanupTableScanInfo(SHashObj* pTableMap) {
} }
// reset the index in last block when handing a new file // reset the index in last block when handing a new file
tMapDataClear(&px->mapData); tMapDataClear(&(*px)->mapData);
taosArrayClear(px->pBlockList); taosArrayClear((*px)->pBlockList);
} }
} }
...@@ -621,7 +688,8 @@ static int32_t doLoadFileBlock(STsdbReader* pReader, SArray* pIndexList, SBlockN ...@@ -621,7 +688,8 @@ static int32_t doLoadFileBlock(STsdbReader* pReader, SArray* pIndexList, SBlockN
for (int32_t i = 0; i < numOfTables; ++i) { for (int32_t i = 0; i < numOfTables; ++i) {
SBlockIdx* pBlockIdx = taosArrayGet(pIndexList, i); SBlockIdx* pBlockIdx = taosArrayGet(pIndexList, i);
STableBlockScanInfo* pScanInfo = taosHashGet(pReader->status.pTableMap, &pBlockIdx->uid, sizeof(int64_t)); STableBlockScanInfo* pScanInfo =
*(STableBlockScanInfo**)taosHashGet(pReader->status.pTableMap, &pBlockIdx->uid, sizeof(int64_t));
tMapDataReset(&pScanInfo->mapData); tMapDataReset(&pScanInfo->mapData);
tsdbReadDataBlk(pReader->pFileReader, pBlockIdx, &pScanInfo->mapData); tsdbReadDataBlk(pReader->pFileReader, pBlockIdx, &pScanInfo->mapData);
...@@ -1069,14 +1137,14 @@ static int32_t fileDataBlockOrderCompar(const void* pLeft, const void* pRight, v ...@@ -1069,14 +1137,14 @@ static int32_t fileDataBlockOrderCompar(const void* pLeft, const void* pRight, v
static int32_t doSetCurrentBlock(SDataBlockIter* pBlockIter, const char* idStr) { static int32_t doSetCurrentBlock(SDataBlockIter* pBlockIter, const char* idStr) {
SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(pBlockIter); SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(pBlockIter);
if (pBlockInfo != NULL) { if (pBlockInfo != NULL) {
STableBlockScanInfo* pScanInfo = taosHashGet(pBlockIter->pTableMap, &pBlockInfo->uid, sizeof(pBlockInfo->uid)); STableBlockScanInfo** pScanInfo = taosHashGet(pBlockIter->pTableMap, &pBlockInfo->uid, sizeof(pBlockInfo->uid));
if (pScanInfo == NULL) { if (pScanInfo == NULL) {
tsdbError("failed to locate the uid:%" PRIu64 " in query table uid list, %s", pBlockInfo->uid, idStr); tsdbError("failed to locate the uid:%" PRIu64 " in query table uid list, %s", pBlockInfo->uid, idStr);
return TSDB_CODE_INVALID_PARA; return TSDB_CODE_INVALID_PARA;
} }
SBlockIndex* pIndex = taosArrayGet(pScanInfo->pBlockList, pBlockInfo->tbBlockIdx); SBlockIndex* pIndex = taosArrayGet((*pScanInfo)->pBlockList, pBlockInfo->tbBlockIdx);
tMapDataGetItemByIdx(&pScanInfo->mapData, pIndex->ordinalIndex, &pBlockIter->block, tGetDataBlk); tMapDataGetItemByIdx(&(*pScanInfo)->mapData, pIndex->ordinalIndex, &pBlockIter->block, tGetDataBlk);
} }
#if 0 #if 0
...@@ -1111,7 +1179,7 @@ static int32_t initBlockIterator(STsdbReader* pReader, SDataBlockIter* pBlockIte ...@@ -1111,7 +1179,7 @@ static int32_t initBlockIterator(STsdbReader* pReader, SDataBlockIter* pBlockIte
break; break;
} }
STableBlockScanInfo* pTableScanInfo = (STableBlockScanInfo*)ptr; STableBlockScanInfo* pTableScanInfo = *(STableBlockScanInfo**)ptr;
if (pTableScanInfo->pBlockList == NULL || taosArrayGetSize(pTableScanInfo->pBlockList) == 0) { if (pTableScanInfo->pBlockList == NULL || taosArrayGetSize(pTableScanInfo->pBlockList) == 0) {
continue; continue;
} }
...@@ -2235,7 +2303,7 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader) { ...@@ -2235,7 +2303,7 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader) {
STableBlockScanInfo* pBlockScanInfo = NULL; STableBlockScanInfo* pBlockScanInfo = NULL;
if (pBlockInfo != NULL) { if (pBlockInfo != NULL) {
pBlockScanInfo = taosHashGet(pReader->status.pTableMap, &pBlockInfo->uid, sizeof(pBlockInfo->uid)); pBlockScanInfo = *(STableBlockScanInfo**)taosHashGet(pReader->status.pTableMap, &pBlockInfo->uid, sizeof(pBlockInfo->uid));
if (pBlockScanInfo == NULL) { if (pBlockScanInfo == NULL) {
code = TSDB_CODE_INVALID_PARA; code = TSDB_CODE_INVALID_PARA;
tsdbError("failed to locate the uid:%" PRIu64 " in query table uid list, total tables:%d, %s", pBlockInfo->uid, tsdbError("failed to locate the uid:%" PRIu64 " in query table uid list, total tables:%d, %s", pBlockInfo->uid,
...@@ -2255,7 +2323,7 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader) { ...@@ -2255,7 +2323,7 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader) {
} }
} }
} else { // file blocks not exist } else { // file blocks not exist
pBlockScanInfo = pReader->status.pTableIter; pBlockScanInfo = *pReader->status.pTableIter;
} }
SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo; SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
...@@ -2480,7 +2548,7 @@ static void extractOrderedTableUidList(SUidOrderCheckInfo* pOrderCheckInfo, SRea ...@@ -2480,7 +2548,7 @@ static void extractOrderedTableUidList(SUidOrderCheckInfo* pOrderCheckInfo, SRea
void* p = taosHashIterate(pStatus->pTableMap, NULL); void* p = taosHashIterate(pStatus->pTableMap, NULL);
while (p != NULL) { while (p != NULL) {
STableBlockScanInfo* pScanInfo = p; STableBlockScanInfo* pScanInfo = *(STableBlockScanInfo**) p;
pOrderCheckInfo->tableUidList[index++] = pScanInfo->uid; pOrderCheckInfo->tableUidList[index++] = pScanInfo->uid;
p = taosHashIterate(pStatus->pTableMap, p); p = taosHashIterate(pStatus->pTableMap, p);
} }
...@@ -2554,7 +2622,7 @@ static int32_t doLoadLastBlockSequentially(STsdbReader* pReader) { ...@@ -2554,7 +2622,7 @@ static int32_t doLoadLastBlockSequentially(STsdbReader* pReader) {
while (1) { while (1) {
// load the last data block of current table // load the last data block of current table
STableBlockScanInfo* pScanInfo = pStatus->pTableIter; STableBlockScanInfo* pScanInfo = *(STableBlockScanInfo**) pStatus->pTableIter;
bool hasVal = initLastBlockReader(pLastBlockReader, pScanInfo, pReader); bool hasVal = initLastBlockReader(pLastBlockReader, pScanInfo, pReader);
if (!hasVal) { if (!hasVal) {
bool hasNexTable = moveToNextTable(pOrderedCheckInfo, pStatus); bool hasNexTable = moveToNextTable(pOrderedCheckInfo, pStatus);
...@@ -2592,9 +2660,9 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) { ...@@ -2592,9 +2660,9 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) {
SLastBlockReader* pLastBlockReader = pReader->status.fileIter.pLastBlockReader; SLastBlockReader* pLastBlockReader = pReader->status.fileIter.pLastBlockReader;
if (pBlockInfo != NULL) { if (pBlockInfo != NULL) {
pScanInfo = taosHashGet(pReader->status.pTableMap, &pBlockInfo->uid, sizeof(pBlockInfo->uid)); pScanInfo = *(STableBlockScanInfo**)taosHashGet(pReader->status.pTableMap, &pBlockInfo->uid, sizeof(pBlockInfo->uid));
} else { } else {
pScanInfo = pReader->status.pTableIter; pScanInfo = *pReader->status.pTableIter;
} }
if (pScanInfo == NULL) { if (pScanInfo == NULL) {
...@@ -2659,11 +2727,11 @@ static int32_t buildBlockFromBufferSequentially(STsdbReader* pReader) { ...@@ -2659,11 +2727,11 @@ static int32_t buildBlockFromBufferSequentially(STsdbReader* pReader) {
} }
} }
STableBlockScanInfo* pBlockScanInfo = pStatus->pTableIter; STableBlockScanInfo** pBlockScanInfo = pStatus->pTableIter;
initMemDataIterator(pBlockScanInfo, pReader); initMemDataIterator(*pBlockScanInfo, pReader);
int64_t endKey = (ASCENDING_TRAVERSE(pReader->order)) ? INT64_MAX : INT64_MIN; int64_t endKey = (ASCENDING_TRAVERSE(pReader->order)) ? INT64_MAX : INT64_MIN;
int32_t code = buildDataBlockFromBuf(pReader, pBlockScanInfo, endKey); int32_t code = buildDataBlockFromBuf(pReader, *pBlockScanInfo, endKey);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
} }
...@@ -3465,18 +3533,23 @@ int32_t buildDataBlockFromBufImpl(STableBlockScanInfo* pBlockScanInfo, int64_t e ...@@ -3465,18 +3533,23 @@ int32_t buildDataBlockFromBufImpl(STableBlockScanInfo* pBlockScanInfo, int64_t e
// TODO refactor: with createDataBlockScanInfo // TODO refactor: with createDataBlockScanInfo
int32_t tsdbSetTableList(STsdbReader* pReader, const void* pTableList, int32_t num) { int32_t tsdbSetTableList(STsdbReader* pReader, const void* pTableList, int32_t num) {
ASSERT(pReader != NULL); ASSERT(pReader != NULL);
int32_t size = taosHashGetSize(pReader->status.pTableMap);
STableBlockScanInfo* p = NULL; STableBlockScanInfo** p = NULL;
while ((p = taosHashIterate(pReader->status.pTableMap, p)) != NULL) { while ((p = taosHashIterate(pReader->status.pTableMap, p)) != NULL) {
clearBlockScanInfo(p); clearBlockScanInfo(*p);
} }
// todo handle the case where size is less than the value of num
ASSERT(size >= num);
taosHashClear(pReader->status.pTableMap); taosHashClear(pReader->status.pTableMap);
STableKeyInfo* pList = (STableKeyInfo*) pTableList; STableKeyInfo* pList = (STableKeyInfo*) pTableList;
for(int32_t i = 0; i < num; ++i) { for(int32_t i = 0; i < num; ++i) {
STableBlockScanInfo info = {.lastKey = 0, .uid = pList[i].uid}; STableBlockScanInfo* pInfo = getPosInBlockInfoBuf(&pReader->blockInfoBuf, i);
taosHashPut(pReader->status.pTableMap, &info.uid, sizeof(uint64_t), &info, sizeof(info)); pInfo->uid = pList[i].uid;
taosHashPut(pReader->status.pTableMap, &pInfo->uid, sizeof(uint64_t), &pInfo, POINTER_BYTES);
} }
return TDB_CODE_SUCCESS; return TDB_CODE_SUCCESS;
...@@ -3680,8 +3753,9 @@ void tsdbReaderClose(STsdbReader* pReader) { ...@@ -3680,8 +3753,9 @@ void tsdbReaderClose(STsdbReader* pReader) {
cleanupDataBlockIterator(&pReader->status.blockIter); cleanupDataBlockIterator(&pReader->status.blockIter);
size_t numOfTables = taosHashGetSize(pReader->status.pTableMap); size_t numOfTables = taosHashGetSize(pReader->status.pTableMap);
destroyBlockScanInfo(pReader->status.pTableMap); destroyAllBlockScanInfo(pReader->status.pTableMap);
blockDataDestroy(pReader->pResBlock); blockDataDestroy(pReader->pResBlock);
clearBlockScanInfoBuf(&pReader->blockInfoBuf);
if (pReader->pFileReader != NULL) { if (pReader->pFileReader != NULL) {
tsdbDataFReaderClose(&pReader->pFileReader); tsdbDataFReaderClose(&pReader->pFileReader);
...@@ -3765,7 +3839,7 @@ bool tsdbNextDataBlock(STsdbReader* pReader) { ...@@ -3765,7 +3839,7 @@ bool tsdbNextDataBlock(STsdbReader* pReader) {
if (pReader->step == EXTERNAL_ROWS_PREV) { if (pReader->step == EXTERNAL_ROWS_PREV) {
// prepare for the main scan // prepare for the main scan
int32_t code = doOpenReaderImpl(pReader); int32_t code = doOpenReaderImpl(pReader);
resetDataBlockScanInfo(pReader->status.pTableMap, pReader->innerReader[0]->window.ekey); resetAllDataBlockScanInfo(pReader->status.pTableMap, pReader->innerReader[0]->window.ekey);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
...@@ -3782,7 +3856,7 @@ bool tsdbNextDataBlock(STsdbReader* pReader) { ...@@ -3782,7 +3856,7 @@ bool tsdbNextDataBlock(STsdbReader* pReader) {
if (pReader->innerReader[1] != NULL && pReader->step == EXTERNAL_ROWS_MAIN) { if (pReader->innerReader[1] != NULL && pReader->step == EXTERNAL_ROWS_MAIN) {
// prepare for the next row scan // prepare for the next row scan
int32_t code = doOpenReaderImpl(pReader->innerReader[1]); int32_t code = doOpenReaderImpl(pReader->innerReader[1]);
resetDataBlockScanInfo(pReader->innerReader[1]->status.pTableMap, pReader->window.ekey); resetAllDataBlockScanInfo(pReader->innerReader[1]->status.pTableMap, pReader->window.ekey);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
} }
...@@ -3798,7 +3872,7 @@ bool tsdbNextDataBlock(STsdbReader* pReader) { ...@@ -3798,7 +3872,7 @@ bool tsdbNextDataBlock(STsdbReader* pReader) {
} }
bool tsdbTableNextDataBlock(STsdbReader* pReader, uint64_t uid) { bool tsdbTableNextDataBlock(STsdbReader* pReader, uint64_t uid) {
STableBlockScanInfo* pBlockScanInfo = taosHashGet(pReader->status.pTableMap, &uid, sizeof(uid)); STableBlockScanInfo* pBlockScanInfo = *(STableBlockScanInfo**)taosHashGet(pReader->status.pTableMap, &uid, sizeof(uid));
if (pBlockScanInfo == NULL) { // no data block for the table of given uid if (pBlockScanInfo == NULL) { // no data block for the table of given uid
return false; return false;
} }
...@@ -3911,7 +3985,7 @@ static SArray* doRetrieveDataBlock(STsdbReader* pReader) { ...@@ -3911,7 +3985,7 @@ static SArray* doRetrieveDataBlock(STsdbReader* pReader) {
} }
SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(&pStatus->blockIter); SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(&pStatus->blockIter);
STableBlockScanInfo* pBlockScanInfo = taosHashGet(pStatus->pTableMap, &pBlockInfo->uid, sizeof(pBlockInfo->uid)); STableBlockScanInfo* pBlockScanInfo = *(STableBlockScanInfo**)taosHashGet(pStatus->pTableMap, &pBlockInfo->uid, sizeof(pBlockInfo->uid));
if (pBlockScanInfo == NULL) { if (pBlockScanInfo == NULL) {
terrno = TSDB_CODE_INVALID_PARA; terrno = TSDB_CODE_INVALID_PARA;
tsdbError("failed to locate the uid:%" PRIu64 " in query table uid list, total tables:%d, %s", pBlockInfo->uid, tsdbError("failed to locate the uid:%" PRIu64 " in query table uid list, total tables:%d, %s", pBlockInfo->uid,
...@@ -3947,6 +4021,8 @@ int32_t tsdbReaderReset(STsdbReader* pReader, SQueryTableDataCond* pCond) { ...@@ -3947,6 +4021,8 @@ int32_t tsdbReaderReset(STsdbReader* pReader, SQueryTableDataCond* pCond) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
SDataBlockIter* pBlockIter = &pReader->status.blockIter;
pReader->order = pCond->order; pReader->order = pCond->order;
pReader->type = TIMEWINDOW_RANGE_CONTAINED; pReader->type = TIMEWINDOW_RANGE_CONTAINED;
pReader->status.loadFromFile = true; pReader->status.loadFromFile = true;
...@@ -3963,13 +4039,12 @@ int32_t tsdbReaderReset(STsdbReader* pReader, SQueryTableDataCond* pCond) { ...@@ -3963,13 +4039,12 @@ int32_t tsdbReaderReset(STsdbReader* pReader, SQueryTableDataCond* pCond) {
int32_t numOfTables = taosHashGetSize(pReader->status.pTableMap); int32_t numOfTables = taosHashGetSize(pReader->status.pTableMap);
initFilesetIterator(&pReader->status.fileIter, pReader->pReadSnap->fs.aDFileSet, pReader); initFilesetIterator(&pReader->status.fileIter, pReader->pReadSnap->fs.aDFileSet, pReader);
resetDataBlockIterator(&pReader->status.blockIter, pReader->order); resetDataBlockIterator(pBlockIter, pReader->order);
int64_t ts = ASCENDING_TRAVERSE(pReader->order) ? pReader->window.skey - 1 : pReader->window.ekey + 1; int64_t ts = ASCENDING_TRAVERSE(pReader->order) ? pReader->window.skey - 1 : pReader->window.ekey + 1;
resetDataBlockScanInfo(pReader->status.pTableMap, ts); resetAllDataBlockScanInfo(pReader->status.pTableMap, ts);
int32_t code = 0; int32_t code = 0;
SDataBlockIter* pBlockIter = &pReader->status.blockIter;
// no data in files, let's try buffer in memory // no data in files, let's try buffer in memory
if (pReader->status.fileIter.numOfFiles == 0) { if (pReader->status.fileIter.numOfFiles == 0) {
...@@ -4071,7 +4146,7 @@ int64_t tsdbGetNumOfRowsInMemTable(STsdbReader* pReader) { ...@@ -4071,7 +4146,7 @@ int64_t tsdbGetNumOfRowsInMemTable(STsdbReader* pReader) {
pStatus->pTableIter = taosHashIterate(pStatus->pTableMap, NULL); pStatus->pTableIter = taosHashIterate(pStatus->pTableMap, NULL);
while (pStatus->pTableIter != NULL) { while (pStatus->pTableIter != NULL) {
STableBlockScanInfo* pBlockScanInfo = pStatus->pTableIter; STableBlockScanInfo* pBlockScanInfo = *(STableBlockScanInfo**)pStatus->pTableIter;
STbData* d = NULL; STbData* d = NULL;
if (pReader->pTsdb->mem != NULL) { if (pReader->pTsdb->mem != NULL) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册