提交 98349a42 编写于 作者: H Haojun Liao

refactor(tsdb): do some internal refactor.

上级 ba4d726a
...@@ -795,10 +795,23 @@ int32_t tMergeTreeOpen(SMergeTree *pMTree, int8_t backward, SDataFReader *pFRead ...@@ -795,10 +795,23 @@ int32_t tMergeTreeOpen(SMergeTree *pMTree, int8_t backward, SDataFReader *pFRead
STimeWindow *pTimeWindow, SVersionRange *pVerRange, SSttBlockLoadInfo *pBlockLoadInfo, STimeWindow *pTimeWindow, SVersionRange *pVerRange, SSttBlockLoadInfo *pBlockLoadInfo,
bool destroyLoadInfo, const char *idStr, bool strictTimeRange, SLDataIter *pLDataIter); bool destroyLoadInfo, const char *idStr, bool strictTimeRange, SLDataIter *pLDataIter);
int32_t tMergeTreeOpen2(SMergeTree *pMTree, int8_t backward, STsdb *pTsdb, uint64_t suid, uint64_t uid, typedef struct {
STimeWindow *pTimeWindow, SVersionRange *pVerRange, const char *idStr, int8_t backward;
bool strictTimeRange, SArray *pSttFileBlockIterArray, void *pCurrentFileSet, STSchema* pSchema, STsdb *pTsdb;
int16_t* pCols, int32_t numOfCols, void* pReader); uint64_t suid;
uint64_t uid;
STimeWindow timewindow;
SVersionRange verRange;
bool strictTimeRange;
SArray *pSttFileBlockIterArray;
void *pCurrentFileset;
STSchema *pSchema;
int16_t *pCols;
int32_t numOfCols;
void *pReader;
void *idstr;
} SMergeTreeConf;
int32_t tMergeTreeOpen2(SMergeTree *pMTree, SMergeTreeConf* pConf);
void tMergeTreeAddIter(SMergeTree *pMTree, SLDataIter *pIter); void tMergeTreeAddIter(SMergeTree *pMTree, SLDataIter *pIter);
bool tMergeTreeNext(SMergeTree *pMTree); bool tMergeTreeNext(SMergeTree *pMTree);
......
...@@ -721,15 +721,12 @@ _end: ...@@ -721,15 +721,12 @@ _end:
return code; return code;
} }
int32_t tMergeTreeOpen2(SMergeTree *pMTree, int8_t backward, STsdb *pTsdb, uint64_t suid, uint64_t uid, int32_t tMergeTreeOpen2(SMergeTree *pMTree, SMergeTreeConf* pConf) {
STimeWindow *pTimeWindow, SVersionRange *pVerRange, const char *idStr,
bool strictTimeRange, SArray *pSttFileBlockIterArray, void *pCurrentFileSet, STSchema* pSchema,
int16_t* pCols, int32_t numOfCols, void* pReader) {
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
pMTree->backward = backward;
pMTree->pIter = NULL; pMTree->pIter = NULL;
pMTree->idStr = idStr; pMTree->backward = pConf->backward;
pMTree->idStr = pConf->idstr;
if (!pMTree->backward) { // asc if (!pMTree->backward) { // asc
tRBTreeCreate(&pMTree->rbt, tLDataIterCmprFn); tRBTreeCreate(&pMTree->rbt, tLDataIterCmprFn);
...@@ -740,21 +737,21 @@ int32_t tMergeTreeOpen2(SMergeTree *pMTree, int8_t backward, STsdb *pTsdb, uint6 ...@@ -740,21 +737,21 @@ int32_t tMergeTreeOpen2(SMergeTree *pMTree, int8_t backward, STsdb *pTsdb, uint6
pMTree->ignoreEarlierTs = false; pMTree->ignoreEarlierTs = false;
// todo handle other level of stt files, here only deal with the first level stt // todo handle other level of stt files, here only deal with the first level stt
int32_t size = ((STFileSet *)pCurrentFileSet)->lvlArr->size; int32_t size = ((STFileSet *)pConf->pCurrentFileset)->lvlArr->size;
if (size == 0) { if (size == 0) {
goto _end; goto _end;
} }
while (taosArrayGetSize(pSttFileBlockIterArray) < size) { while (taosArrayGetSize(pConf->pSttFileBlockIterArray) < size) {
SArray* pList = taosArrayInit(4, POINTER_BYTES); SArray* pList = taosArrayInit(4, POINTER_BYTES);
taosArrayPush(pSttFileBlockIterArray, &pList); taosArrayPush(pConf->pSttFileBlockIterArray, &pList);
} }
for(int32_t j = 0; j < size; ++j) { for(int32_t j = 0; j < size; ++j) {
SSttLvl *pSttLevel = ((STFileSet *)pCurrentFileSet)->lvlArr->data[j]; SSttLvl *pSttLevel = ((STFileSet *)pConf->pCurrentFileset)->lvlArr->data[j];
ASSERT(pSttLevel->level == j); ASSERT(pSttLevel->level == j);
SArray* pList = taosArrayGetP(pSttFileBlockIterArray, j); SArray* pList = taosArrayGetP(pConf->pSttFileBlockIterArray, j);
int32_t numOfIter = taosArrayGetSize(pList); int32_t numOfIter = taosArrayGetSize(pList);
if (numOfIter < TARRAY2_SIZE(pSttLevel->fobjArr)) { if (numOfIter < TARRAY2_SIZE(pSttLevel->fobjArr)) {
...@@ -773,7 +770,7 @@ int32_t tMergeTreeOpen2(SMergeTree *pMTree, int8_t backward, STsdb *pTsdb, uint6 ...@@ -773,7 +770,7 @@ int32_t tMergeTreeOpen2(SMergeTree *pMTree, int8_t backward, STsdb *pTsdb, uint6
// open stt file reader if not // open stt file reader if not
if (pSttFileReader == NULL) { if (pSttFileReader == NULL) {
SSttFileReaderConfig conf = {.tsdb = pTsdb, .szPage = pTsdb->pVnode->config.szPage}; SSttFileReaderConfig conf = {.tsdb = pConf->pTsdb, .szPage = pConf->pTsdb->pVnode->config.szPage};
conf.file[0] = *pSttLevel->fobjArr->data[i]->f; conf.file[0] = *pSttLevel->fobjArr->data[i]->f;
code = tsdbSttFileReaderOpen(pSttLevel->fobjArr->data[i]->fname, &conf, &pSttFileReader); code = tsdbSttFileReaderOpen(pSttLevel->fobjArr->data[i]->fname, &conf, &pSttFileReader);
...@@ -783,12 +780,12 @@ int32_t tMergeTreeOpen2(SMergeTree *pMTree, int8_t backward, STsdb *pTsdb, uint6 ...@@ -783,12 +780,12 @@ int32_t tMergeTreeOpen2(SMergeTree *pMTree, int8_t backward, STsdb *pTsdb, uint6
} }
if (pLoadInfo == NULL) { if (pLoadInfo == NULL) {
pLoadInfo = tCreateOneLastBlockLoadInfo(pSchema, pCols, numOfCols); pLoadInfo = tCreateOneLastBlockLoadInfo(pConf->pSchema, pConf->pCols, pConf->numOfCols);
} }
memset(pIter, 0, sizeof(SLDataIter)); memset(pIter, 0, sizeof(SLDataIter));
code = tLDataIterOpen2(pIter, pSttFileReader, i, pMTree->backward, suid, uid, pTimeWindow, pVerRange, code = tLDataIterOpen2(pIter, pSttFileReader, i, pMTree->backward, pConf->suid, pConf->uid, &pConf->timewindow,
pLoadInfo, pMTree->idStr, strictTimeRange, pReader); &pConf->verRange, pLoadInfo, pMTree->idStr, pConf->strictTimeRange, pConf->pReader);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
goto _end; goto _end;
} }
......
...@@ -2095,17 +2095,31 @@ static bool initLastBlockReader(SLastBlockReader* pLBlockReader, STableBlockScan ...@@ -2095,17 +2095,31 @@ static bool initLastBlockReader(SLastBlockReader* pLBlockReader, STableBlockScan
tsdbDebug("init last block reader, window:%" PRId64 "-%" PRId64 ", uid:%" PRIu64 ", %s", w.skey, w.ekey, tsdbDebug("init last block reader, window:%" PRId64 "-%" PRId64 ", uid:%" PRIu64 ", %s", w.skey, w.ekey,
pScanInfo->uid, pReader->idStr); pScanInfo->uid, pReader->idStr);
int32_t code = tMergeTreeOpen2(&pLBlockReader->mergeTree, (pLBlockReader->order == TSDB_ORDER_DESC), pReader->pTsdb, SMergeTreeConf conf = {
pReader->info.suid, pScanInfo->uid, &w, &pLBlockReader->verRange, pReader->idStr, .uid = pScanInfo->uid,
false, pReader->status.pLDataIterArray, pReader->status.pCurrentFileset, .suid = pReader->info.suid,
pReader->info.pSchema, pReader->suppInfo.colId, pReader->suppInfo.numOfCols, pReader); .pTsdb = pReader->pTsdb,
.timewindow = w,
.verRange = pLBlockReader->verRange,
.strictTimeRange = false,
.pSchema = pReader->info.pSchema,
.pCurrentFileset = pReader->status.pCurrentFileset,
.backward = (pLBlockReader->order == TSDB_ORDER_DESC),
.pSttFileBlockIterArray = pReader->status.pLDataIterArray,
.pCols = pReader->suppInfo.colId,
.numOfCols = pReader->suppInfo.numOfCols,
.pReader = pReader,
.idstr = pReader->idStr,
};
int32_t code = tMergeTreeOpen2(&pLBlockReader->mergeTree, &conf);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return false; return false;
} }
initMemDataIterator(pScanInfo, pReader); initMemDataIterator(pScanInfo, pReader);
initDelSkylineIterator(pScanInfo, pReader->info.order, &pReader->cost); initDelSkylineIterator(pScanInfo, pReader->info.order, &pReader->cost);
code = nextRowFromLastBlocks(pLBlockReader, pScanInfo, &pReader->info.verRange); code = nextRowFromLastBlocks(pLBlockReader, pScanInfo, &pReader->info.verRange);
int64_t el = taosGetTimestampUs() - st; int64_t el = taosGetTimestampUs() - st;
......
...@@ -452,13 +452,77 @@ bool blockIteratorNext(SDataBlockIter* pBlockIter, const char* idStr) { ...@@ -452,13 +452,77 @@ bool blockIteratorNext(SDataBlockIter* pBlockIter, const char* idStr) {
return true; return true;
} }
typedef enum {
BLK_CHECK_CONTINUE = 0x1,
BLK_CHECK_QUIT = 0x2,
} ETombBlkCheckEnum;
static int32_t doCheckTombBlock(STombBlock* pBlock, STsdbReader* pReader, int32_t numOfTables, int32_t* j,
STableBlockScanInfo** pScanInfo, ETombBlkCheckEnum* pRet) {
int32_t code = 0;
STombRecord record = {0};
uint64_t uid = pReader->status.uidList.tableUidList[*j];
for (int32_t k = 0; k < TARRAY2_SIZE(pBlock->suid); ++k) {
code = tTombBlockGet(pBlock, k, &record);
if (code != TSDB_CODE_SUCCESS) {
*pRet = BLK_CHECK_QUIT;
return code;
}
if (record.suid < pReader->info.suid) {
continue;
}
if (record.suid > pReader->info.suid) {
*pRet = BLK_CHECK_QUIT;
return TSDB_CODE_SUCCESS;
}
bool newTable = false;
if (uid < record.uid) {
while ((*j) < numOfTables && pReader->status.uidList.tableUidList[*j] < record.uid) {
(*j) += 1;
newTable = true;
}
if ((*j) >= numOfTables) {
*pRet = BLK_CHECK_QUIT;
return TSDB_CODE_SUCCESS;
}
uid = pReader->status.uidList.tableUidList[*j];
}
if (record.uid < uid) {
continue;
}
ASSERT(record.suid == pReader->info.suid && uid == record.uid);
if (newTable) {
(*pScanInfo) = getTableBlockScanInfo(pReader->status.pTableMap, uid, pReader->idStr);
if ((*pScanInfo)->pfileDelData == NULL) {
(*pScanInfo)->pfileDelData = taosArrayInit(4, sizeof(SDelData));
}
}
if (record.version <= pReader->info.verRange.maxVer) {
SDelData delData = {.version = record.version, .sKey = record.skey, .eKey = record.ekey};
taosArrayPush((*pScanInfo)->pfileDelData, &delData);
}
}
*pRet = BLK_CHECK_CONTINUE;
return TSDB_CODE_SUCCESS;
}
// load tomb data API // load tomb data API
static int32_t doLoadTombDataFromTombBlk(const TTombBlkArray* pTombBlkArray, STsdbReader* pReader, static int32_t doLoadTombDataFromTombBlk(const TTombBlkArray* pTombBlkArray, STsdbReader* pReader,
void* pFileReader, bool isFile) { void* pFileReader, bool isFile) {
int32_t code = 0; int32_t code = 0;
STableUidList* pList = &pReader->status.uidList; STableUidList* pList = &pReader->status.uidList;
int32_t numOfTables = tSimpleHashGetSize(pReader->status.pTableMap); int32_t numOfTables = tSimpleHashGetSize(pReader->status.pTableMap);
int32_t i = 0, j = 0; int32_t i = 0, j = 0;
while (i < pTombBlkArray->size && j < numOfTables) { while (i < pTombBlkArray->size && j < numOfTables) {
...@@ -496,59 +560,15 @@ static int32_t doLoadTombDataFromTombBlk(const TTombBlkArray* pTombBlkArray, STs ...@@ -496,59 +560,15 @@ static int32_t doLoadTombDataFromTombBlk(const TTombBlkArray* pTombBlkArray, STs
pScanInfo->pfileDelData = taosArrayInit(4, sizeof(SDelData)); pScanInfo->pfileDelData = taosArrayInit(4, sizeof(SDelData));
} }
STombRecord record = {0}; ETombBlkCheckEnum ret = 0;
for (int32_t k = 0; k < TARRAY2_SIZE(block.suid); ++k) { code = doCheckTombBlock(&block, pReader, numOfTables, &j, &pScanInfo, &ret);
code = tTombBlockGet(&block, k, &record);
if (code != TSDB_CODE_SUCCESS) {
tTombBlockDestroy(&block);
return code;
}
if (record.suid < pReader->info.suid) {
continue;
}
if (record.suid > pReader->info.suid) {
tTombBlockDestroy(&block);
return TSDB_CODE_SUCCESS;
}
bool newTable = false;
if (uid < record.uid) {
while (j < numOfTables && pReader->status.uidList.tableUidList[j] < record.uid) {
j += 1;
newTable = true;
}
if (j >= numOfTables) { tTombBlockDestroy(&block);
tTombBlockDestroy(&block); if (code != TSDB_CODE_SUCCESS || ret == BLK_CHECK_QUIT) {
return TSDB_CODE_SUCCESS; return code;
}
uid = pReader->status.uidList.tableUidList[j];
}
if (record.uid < uid) {
continue;
}
ASSERT(record.suid == pReader->info.suid && uid == record.uid);
if (newTable) {
pScanInfo = getTableBlockScanInfo(pReader->status.pTableMap, uid, pReader->idStr);
if (pScanInfo->pfileDelData == NULL) {
pScanInfo->pfileDelData = taosArrayInit(4, sizeof(SDelData));
}
}
if (record.version <= pReader->info.verRange.maxVer) {
SDelData delData = {.version = record.version, .sKey = record.skey, .eKey = record.ekey};
taosArrayPush(pScanInfo->pfileDelData, &delData);
}
} }
i += 1; i += 1;
tTombBlockDestroy(&block);
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册