提交 1073b5e4 编写于 作者: H Haojun Liao

fix(tsdb): support to read multi-level of stt files.

上级 3edee3fe
......@@ -704,20 +704,19 @@ typedef struct {
typedef struct SSttBlockLoadInfo {
SBlockData blockData[2];
void *pBlockArray;
SArray *aSttBlk;
SArray *pTombBlockArray; // tomb block array list
int32_t blockIndex[2]; // to denote the loaded block in the corresponding position.
int32_t currentLoadBlockIndex;
int32_t loadBlocks;
double elapsedTime;
STSchema *pSchema;
int16_t *colIds;
int32_t numOfCols;
bool checkRemainingRow;
bool isLast;
bool sttBlockLoaded;
int32_t numOfStt;
SArray *aSttBlk;
SArray *pTombBlockArray; // tomb block array list
int32_t blockIndex[2]; // to denote the loaded block in the corresponding position.
int32_t currentLoadBlockIndex;
int32_t loadBlocks;
double elapsedTime;
STSchema *pSchema;
int16_t *colIds;
int32_t numOfCols;
bool checkRemainingRow;
bool isLast;
bool sttBlockLoaded;
int32_t numOfStt;
// keep the last access position, this position may be used to reduce the binary times for
// starting last block data for a new table
......@@ -778,7 +777,7 @@ struct SDiskDataBuilder {
typedef struct SLDataIter {
SRBTreeNode node;
SSttBlk *pSttBlk;
int32_t iStt;
int32_t iStt; // for debug purpose
int8_t backward;
int32_t iSttBlk;
int32_t iRow;
......@@ -797,9 +796,9 @@ int32_t tMergeTreeOpen(SMergeTree *pMTree, int8_t backward, SDataFReader *pFRead
bool destroyLoadInfo, const char *idStr, bool strictTimeRange, SLDataIter *pLDataIter);
int32_t tMergeTreeOpen2(SMergeTree *pMTree, int8_t backward, STsdb *pTsdb, uint64_t suid, uint64_t uid,
STimeWindow *pTimeWindow, SVersionRange *pVerRange, SSttBlockLoadInfo *pBlockLoadInfo,
bool destroyLoadInfo, const char *idStr, bool strictTimeRange, SLDataIter *pLDataIter,
void *pCurrentFileSet);
STimeWindow *pTimeWindow, SVersionRange *pVerRange, const char *idStr,
bool strictTimeRange, SArray *pSttFileBlockIterArray, void *pCurrentFileSet, STSchema* pSchema,
int16_t* pCols, int32_t numOfCols);
void tMergeTreeAddIter(SMergeTree *pMTree, SLDataIter *pIter);
bool tMergeTreeNext(SMergeTree *pMTree);
......@@ -807,10 +806,11 @@ bool tMergeTreeIgnoreEarlierTs(SMergeTree *pMTree);
void tMergeTreeClose(SMergeTree *pMTree);
SSttBlockLoadInfo *tCreateLastBlockLoadInfo(STSchema *pSchema, int16_t *colList, int32_t numOfCols, int32_t numOfStt);
SSttBlockLoadInfo *tCreateOneLastBlockLoadInfo(STSchema *pSchema, int16_t *colList, int32_t numOfCols);
void resetLastBlockLoadInfo(SSttBlockLoadInfo *pLoadInfo);
void getLastBlockLoadInfo(SSttBlockLoadInfo *pLoadInfo, int64_t *blocks, double *el);
void *destroyLastBlockLoadInfo(SSttBlockLoadInfo *pLoadInfo);
void destroySttBlockReader(SLDataIter *pLDataIter, int32_t numOfIter);
void* destroySttBlockReader(SArray* pLDataIterArray);
// tsdbCache ==============================================================================================
typedef struct SCacheRowsReader {
......
......@@ -54,6 +54,36 @@ SSttBlockLoadInfo *tCreateLastBlockLoadInfo(STSchema *pSchema, int16_t *colList,
return pLoadInfo;
}
SSttBlockLoadInfo *tCreateOneLastBlockLoadInfo(STSchema *pSchema, int16_t *colList, int32_t numOfCols) {
SSttBlockLoadInfo *pLoadInfo = taosMemoryCalloc(1, sizeof(SSttBlockLoadInfo));
if (pLoadInfo == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
pLoadInfo->numOfStt = 1;
pLoadInfo->blockIndex[0] = -1;
pLoadInfo->blockIndex[1] = -1;
pLoadInfo->currentLoadBlockIndex = 1;
int32_t code = tBlockDataCreate(&pLoadInfo->blockData[0]);
if (code) {
terrno = code;
}
code = tBlockDataCreate(&pLoadInfo->blockData[1]);
if (code) {
terrno = code;
}
pLoadInfo->aSttBlk = taosArrayInit(4, sizeof(SSttBlk));
pLoadInfo->pSchema = pSchema;
pLoadInfo->colIds = colList;
pLoadInfo->numOfCols = numOfCols;
return pLoadInfo;
}
void resetLastBlockLoadInfo(SSttBlockLoadInfo *pLoadInfo) {
for (int32_t i = 0; i < pLoadInfo->numOfStt; ++i) {
pLoadInfo[i].currentLoadBlockIndex = 1;
......@@ -103,14 +133,26 @@ void *destroyLastBlockLoadInfo(SSttBlockLoadInfo *pLoadInfo) {
return NULL;
}
void destroySttBlockReader(SLDataIter* pLDataIter, int32_t numOfIter) {
if (pLDataIter == NULL) {
return;
static void destroyLDataIterFn(void* param) {
SLDataIter** pIter = (SLDataIter**) param;
tLDataIterClose2(*pIter);
destroyLastBlockLoadInfo((*pIter)->pBlockLoadInfo);
taosMemoryFree(*pIter);
}
void* destroySttBlockReader(SArray* pLDataIterArray) {
if (pLDataIterArray == NULL) {
return NULL;
}
for(int32_t i = 0; i < numOfIter; ++i) {
tLDataIterClose2(&pLDataIter[i]);
int32_t numOfLevel = taosArrayGetSize(pLDataIterArray);
for(int32_t i = 0; i < numOfLevel; ++i) {
SArray* pList = taosArrayGetP(pLDataIterArray, i);
taosArrayDestroyEx(pList, destroyLDataIterFn);
}
taosArrayDestroy(pLDataIterArray);
return NULL;
}
static SBlockData *loadLastBlock(SLDataIter *pIter, const char *idStr) {
......@@ -371,7 +413,6 @@ int32_t tLDataIterOpen2(struct SLDataIter *pIter, SSttFileReader *pReader, int32
pIter->timeWindow.skey = pTimeWindow->skey;
pIter->timeWindow.ekey = pTimeWindow->ekey;
pIter->pReader = pReader;
pIter->pBlockLoadInfo = pBlockLoadInfo;
if (!pBlockLoadInfo->sttBlockLoaded) {
......@@ -686,9 +727,9 @@ _end:
}
int32_t tMergeTreeOpen2(SMergeTree *pMTree, int8_t backward, STsdb *pTsdb, uint64_t suid, uint64_t uid,
STimeWindow *pTimeWindow, SVersionRange *pVerRange, SSttBlockLoadInfo *pBlockLoadInfo,
bool destroyLoadInfo, const char *idStr, bool strictTimeRange, SLDataIter *pLDataIter,
void *pCurrentFileSet) {
STimeWindow *pTimeWindow, SVersionRange *pVerRange, const char *idStr,
bool strictTimeRange, SArray *pSttFileBlockIterArray, void *pCurrentFileSet, STSchema* pSchema,
int16_t* pCols, int32_t numOfCols) {
int32_t code = TSDB_CODE_SUCCESS;
pMTree->backward = backward;
......@@ -701,45 +742,71 @@ int32_t tMergeTreeOpen2(SMergeTree *pMTree, int8_t backward, STsdb *pTsdb, uint6
tRBTreeCreate(&pMTree->rbt, tLDataIterDescCmprFn);
}
pMTree->pLoadInfo = pBlockLoadInfo;
pMTree->destroyLoadInfo = destroyLoadInfo;
// pMTree->pLoadInfo = pBlockLoadInfo;
// pMTree->destroyLoadInfo = true;
pMTree->ignoreEarlierTs = false;
// todo handle other level of stt files, here only deal with the first level stt
int32_t size = ((STFileSet *)pCurrentFileSet)->lvlArr[0].size;
int32_t size = ((STFileSet *)pCurrentFileSet)->lvlArr->size;
if (size == 0) {
goto _end;
}
SSttLvl *pSttLevel = ((STFileSet *)pCurrentFileSet)->lvlArr[0].data[0];
ASSERT(pSttLevel->level == 0);
while (taosArrayGetSize(pSttFileBlockIterArray) < size) {
SArray* pList = taosArrayInit(4, POINTER_BYTES);
taosArrayPush(pSttFileBlockIterArray, &pList);
}
for (int32_t i = 0; i < pSttLevel->fobjArr[0].size; ++i) { // open all last file
SSttFileReader* pSttFileReader = pLDataIter[i].pReader;
memset(&pLDataIter[i], 0, sizeof(SLDataIter));
for(int32_t j = 0; j < size; ++j) {
SSttLvl *pSttLevel = ((STFileSet *)pCurrentFileSet)->lvlArr->data[j];
ASSERT(pSttLevel->level == j);
if (pSttFileReader == NULL) {
SSttFileReaderConfig conf = {.tsdb = pTsdb, .szPage = pTsdb->pVnode->config.szPage};
conf.file[0] = *pSttLevel->fobjArr[0].data[i]->f;
SArray* pList = taosArrayGetP(pSttFileBlockIterArray, j);
int32_t numOfIter = taosArrayGetSize(pList);
code = tsdbSttFileReaderOpen(pSttLevel->fobjArr[0].data[i]->fname, &conf, &pSttFileReader);
if (code != TSDB_CODE_SUCCESS) {
return code;
if (numOfIter < TARRAY2_SIZE(pSttLevel->fobjArr)) {
int32_t inc = TARRAY2_SIZE(pSttLevel->fobjArr) - numOfIter;
for(int32_t k = 0; k < inc; ++k) {
SLDataIter *pIter = taosMemoryCalloc(1, sizeof(SLDataIter));
taosArrayPush(pList, &pIter);
}
}
code = tLDataIterOpen2(&pLDataIter[i], pSttFileReader, i, pMTree->backward, suid, uid, pTimeWindow, pVerRange,
&pMTree->pLoadInfo[i], pMTree->idStr, strictTimeRange);
if (code != TSDB_CODE_SUCCESS) {
goto _end;
}
for (int32_t i = 0; i < TARRAY2_SIZE(pSttLevel->fobjArr); ++i) { // open all last file
SLDataIter* pIter = taosArrayGetP(pList, i);
bool hasVal = tLDataIterNextRow(&pLDataIter[i], pMTree->idStr);
if (hasVal) {
tMergeTreeAddIter(pMTree, &pLDataIter[i]);
} else {
if (!pMTree->ignoreEarlierTs) {
pMTree->ignoreEarlierTs = pLDataIter[i].ignoreEarlierTs;
SSttFileReader *pSttFileReader = pIter->pReader;
SSttBlockLoadInfo* pLoadInfo = pIter->pBlockLoadInfo;
// open stt file reader if not
if (pSttFileReader == NULL) {
SSttFileReaderConfig conf = {.tsdb = pTsdb, .szPage = pTsdb->pVnode->config.szPage};
conf.file[0] = *pSttLevel->fobjArr->data[i]->f;
code = tsdbSttFileReaderOpen(pSttLevel->fobjArr->data[i]->fname, &conf, &pSttFileReader);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
}
if (pLoadInfo == NULL) {
pLoadInfo = tCreateOneLastBlockLoadInfo(pSchema, pCols, numOfCols);
}
memset(pIter, 0, sizeof(SLDataIter));
code = tLDataIterOpen2(pIter, pSttFileReader, i, pMTree->backward, suid, uid, pTimeWindow, pVerRange,
pLoadInfo, pMTree->idStr, strictTimeRange);
if (code != TSDB_CODE_SUCCESS) {
goto _end;
}
bool hasVal = tLDataIterNextRow(pIter, pMTree->idStr);
if (hasVal) {
tMergeTreeAddIter(pMTree, pIter);
} else {
if (!pMTree->ignoreEarlierTs) {
pMTree->ignoreEarlierTs = pIter->ignoreEarlierTs;
}
}
}
}
......
......@@ -172,7 +172,8 @@ typedef struct SReaderStatus {
SBlockData fileBlockData;
SFilesetIter fileIter;
SDataBlockIter blockIter;
SLDataIter* pLDataIter;
SArray* pLDataIterArray;
// SLDataIter* pLDataIter;
SRowMerger merger;
SColumnInfoData* pPrimaryTsCol; // primary time stamp output col info data
} SReaderStatus;
......@@ -596,7 +597,9 @@ static int32_t filesetIteratorNext(SFilesetIter* pIter, STsdbReader* pReader, bo
pIter->pLastBlockReader->uid = 0;
tMergeTreeClose(&pIter->pLastBlockReader->mergeTree);
destroySttBlockReader(pReader->status.pLDataIter, pReader->pTsdb->pVnode->config.sttTrigger);
pReader->status.pLDataIterArray = destroySttBlockReader(pReader->status.pLDataIterArray);
pReader->status.pLDataIterArray = taosArrayInit(4, POINTER_BYTES);
resetLastBlockLoadInfo(pIter->pLastBlockReader->pInfo);
......@@ -959,15 +962,10 @@ _end:
static void doCleanupTableScanInfo(STableBlockScanInfo* pScanInfo) {
// reset the index in last block when handing a new file
tMapDataClear(&pScanInfo->mapData);
taosArrayClear(pScanInfo->pBlockList);
}
static void cleanupTableScanInfo(SReaderStatus* pStatus) {
// if (pStatus->mapDataCleaned) {
// return;
// }
SSHashObj* pTableMap = pStatus->pTableMap;
STableBlockScanInfo** px = NULL;
int32_t iter = 0;
......@@ -2856,10 +2854,10 @@ static bool initLastBlockReader(SLastBlockReader* pLBlockReader, STableBlockScan
tsdbDebug("init last block reader, window:%" PRId64 "-%" PRId64 ", uid:%" PRIu64 ", %s", w.skey, w.ekey,
pScanInfo->uid, pReader->idStr);
int32_t code =
tMergeTreeOpen2(&pLBlockReader->mergeTree, (pLBlockReader->order == TSDB_ORDER_DESC), pReader->pTsdb,
pReader->suid, pScanInfo->uid, &w, &pLBlockReader->verRange, pLBlockReader->pInfo, false,
pReader->idStr, false, pReader->status.pLDataIter, pReader->status.pCurrentFileset);
int32_t code = tMergeTreeOpen2(&pLBlockReader->mergeTree, (pLBlockReader->order == TSDB_ORDER_DESC), pReader->pTsdb,
pReader->suid, pScanInfo->uid, &w, &pLBlockReader->verRange, pReader->idStr, false,
pReader->status.pLDataIterArray, pReader->status.pCurrentFileset, pReader->pSchema,
pReader->suppInfo.colId, pReader->suppInfo.numOfCols);
if (code != TSDB_CODE_SUCCESS) {
return false;
}
......@@ -3381,8 +3379,6 @@ static int32_t doLoadLastBlockSequentially(STsdbReader* pReader) {
if (pReader->pIgnoreTables && taosHashGet(*pReader->pIgnoreTables, &pScanInfo->uid, sizeof(pScanInfo->uid))) {
// reset the index in last block when handing a new file
doCleanupTableScanInfo(pScanInfo);
// pStatus->mapDataCleaned = true;
bool hasNexTable = moveToNextTable(pUidList, pStatus);
if (!hasNexTable) {
return TSDB_CODE_SUCCESS;
......@@ -4760,8 +4756,8 @@ int32_t tsdbReaderOpen2(void* pVnode, SQueryTableDataCond* pCond, void* pTableLi
goto _err;
}
pReader->status.pLDataIter = taosMemoryCalloc(pConf->sttTrigger, sizeof(SLDataIter));
if (pReader->status.pLDataIter == NULL) {
pReader->status.pLDataIterArray = taosArrayInit(4, POINTER_BYTES);
if (pReader->status.pLDataIterArray == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
......@@ -4789,7 +4785,7 @@ int32_t tsdbReaderOpen2(void* pVnode, SQueryTableDataCond* pCond, void* pTableLi
}
static void clearSharedPtr(STsdbReader* p) {
p->status.pLDataIter = NULL;
p->status.pLDataIterArray = NULL;
p->status.pTableMap = NULL;
p->status.uidList.tableUidList = NULL;
p->pReadSnap = NULL;
......@@ -4800,7 +4796,7 @@ static void clearSharedPtr(STsdbReader* p) {
static void setSharedPtr(STsdbReader* pDst, const STsdbReader* pSrc) {
pDst->status.pTableMap = pSrc->status.pTableMap;
pDst->status.pLDataIter = pSrc->status.pLDataIter;
pDst->status.pLDataIterArray = pSrc->status.pLDataIterArray;
pDst->status.uidList = pSrc->status.uidList;
pDst->pSchema = pSrc->pSchema;
pDst->pSchemaMap = pSrc->pSchemaMap;
......@@ -4884,15 +4880,11 @@ void tsdbReaderClose2(STsdbReader* pReader) {
tMergeTreeClose(&pLReader->mergeTree);
getLastBlockLoadInfo(pLReader->pInfo, &pCost->lastBlockLoad, &pCost->lastBlockLoadTime);
pLReader->pInfo = destroyLastBlockLoadInfo(pLReader->pInfo);
// todo dynamic allocate the number of stt data iter
destroySttBlockReader(pReader->status.pLDataIter, pReader->pTsdb->pVnode->config.sttTrigger);
taosMemoryFree(pLReader);
}
taosMemoryFreeClear(pReader->status.pLDataIter);
destroySttBlockReader(pReader->status.pLDataIterArray);
taosMemoryFreeClear(pReader->status.uidList.tableUidList);
tsdbDebug(
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册