提交 9b0b4061 编写于 作者: M Minglei Jin

tsdb/cache: relayout cache row reader

上级 8263c49c
......@@ -833,9 +833,25 @@ void *destroyLastBlockLoadInfo(SSttBlockLoadInfo *pLoadInfo);
void *destroySttBlockReader(SArray *pLDataIterArray, int64_t *blocks, double *el);
// tsdbCache ==============================================================================================
typedef enum {
READ_MODE_COUNT_ONLY = 0x1,
READ_MODE_ALL,
} EReadMode;
typedef struct STsdbReaderInfo {
uint64_t suid;
STSchema *pSchema;
EReadMode readMode;
uint64_t rowsNum;
STimeWindow window;
SVersionRange verRange;
int16_t order;
} STsdbReaderInfo;
typedef struct SCacheRowsReader {
STsdb *pTsdb;
SVersionRange verRange;
STsdb *pTsdb;
STsdbReaderInfo info;
// SVersionRange verRange;
TdThreadMutex readerMutex;
SVnode *pVnode;
STSchema *pSchema;
......
......@@ -1720,6 +1720,42 @@ typedef struct {
SMergeTree *pMergeTree;
} SFSLastIter;
static int32_t lastIterOpen(SFSLastIter *iter, STFileSet *pFileSet, STsdb *pTsdb, STSchema *pTSchema, tb_uid_t suid,
tb_uid_t uid, SCacheRowsReader *pr, int64_t lastTs, int16_t *aCols, int nCols) {
int32_t code = 0;
int64_t loadBlocks = 0;
double elapse = 0;
pr->pLDataIterArray = destroySttBlockReader(pr->pLDataIterArray, &loadBlocks, &elapse);
pr->pLDataIterArray = taosArrayInit(4, POINTER_BYTES);
SMergeTreeConf conf = {
.uid = uid,
.suid = suid,
.pTsdb = pTsdb,
.timewindow = (STimeWindow){.skey = lastTs, .ekey = TSKEY_MAX},
.verRange = (SVersionRange){.minVer = 0, .maxVer = UINT64_MAX},
.strictTimeRange = false,
.pSchema = pTSchema,
.pCurrentFileset = pFileSet,
.backward = 1,
.pSttFileBlockIterArray = pr->pLDataIterArray,
.pCols = aCols,
.numOfCols = nCols,
.pReader = pr,
.idstr = pr->idstr,
};
code = tMergeTreeOpen2(&iter->mergeTree, &conf);
if (code != TSDB_CODE_SUCCESS) {
return -1;
}
iter->pMergeTree = &iter->mergeTree;
return code;
}
static int32_t lastIterClose(SFSLastIter **iter) {
int32_t code = 0;
......@@ -1733,43 +1769,9 @@ static int32_t lastIterClose(SFSLastIter **iter) {
return code;
}
static int32_t lastIterNext(SFSLastIter *iter, TSDBROW **ppRow, STsdb *pTsdb, STSchema *pTSchema, tb_uid_t suid,
tb_uid_t uid, SCacheRowsReader *pr, int64_t lastTs, STFileSet *pFileSet, int16_t *aCols,
int nCols) {
static int32_t lastIterNext(SFSLastIter *iter, TSDBROW **ppRow) {
int32_t code = 0;
if (!iter->pMergeTree) {
int64_t loadBlocks = 0;
double elapse = 0;
pr->pLDataIterArray = destroySttBlockReader(pr->pLDataIterArray, &loadBlocks, &elapse);
pr->pLDataIterArray = taosArrayInit(4, POINTER_BYTES);
SMergeTreeConf conf = {
.uid = uid,
.suid = suid,
.pTsdb = pTsdb,
.timewindow = (STimeWindow){.skey = lastTs, .ekey = TSKEY_MAX},
.verRange = (SVersionRange){.minVer = 0, .maxVer = UINT64_MAX},
.strictTimeRange = false,
.pSchema = pTSchema,
.pCurrentFileset = pFileSet,
.backward = 1,
.pSttFileBlockIterArray = pr->pLDataIterArray,
.pCols = aCols,
.numOfCols = nCols,
.pReader = pr,
.idstr = pr->idstr,
};
code = tMergeTreeOpen2(&iter->mergeTree, &conf);
if (code != TSDB_CODE_SUCCESS) {
return -1;
}
iter->pMergeTree = &iter->mergeTree;
}
// retrieve next row
bool hasVal = tMergeTreeNext(iter->pMergeTree);
if (!hasVal) {
*ppRow = NULL;
......@@ -1823,6 +1825,17 @@ typedef struct SFSNextRowIter {
struct CacheNextRowIter *pRowIter;
} SFSNextRowIter;
static void clearLastFileSet(SFSNextRowIter *state) {
if (state->pLastIter) {
lastIterClose(&state->pLastIter);
}
if (state->pBlockData) {
tBlockDataDestroy(state->pBlockData);
state->pBlockData = NULL;
}
}
static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow, bool *pIgnoreEarlierTs, bool isLast, int16_t *aCols,
int nCols) {
SFSNextRowIter *state = (SFSNextRowIter *)iter;
......@@ -1837,8 +1850,11 @@ static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow, bool *pIgnoreEarlie
if (SFSNEXTROW_FILESET == state->state) {
_next_fileset:
if (--state->iFileSet < 0) { // no fileset left, cleanup and return NULL row
if (--state->iFileSet < 0) {
clearLastFileSet(state);
*ppRow = NULL;
return code;
} else {
state->pFileSet = TARRAY2_GET(state->aDFileSet, state->iFileSet);
}
......@@ -1872,9 +1888,16 @@ static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow, bool *pIgnoreEarlie
goto _err;
}
// TODO: load tomb data from data and sttt
code = lastIterOpen(&state->lastIter, state->pFileSet, state->pTsdb, state->pTSchema, state->suid, state->uid,
state->pr, state->lastTs, aCols, nCols);
if (code != TSDB_CODE_SUCCESS) {
goto _err;
}
state->pLastIter = &state->lastIter;
// TODO: load tomb data from data and stt
if (!state->pIndexList) {
state->pIndexList = taosArrayInit(1, sizeof(SBrinBlk));
} else {
......@@ -1990,8 +2013,7 @@ static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow, bool *pIgnoreEarlie
if (!state->pLastRow) {
// get next row from fslast and process with fs row, --state->Row if select fs row
code = lastIterNext(&state->lastIter, &state->pLastRow, state->pTsdb, state->pTSchema, state->suid, state->uid,
state->pr, state->lastTs, state->pFileSet, aCols, nCols);
code = lastIterNext(&state->lastIter, &state->pLastRow);
if (code != TSDB_CODE_SUCCESS) {
goto _err;
}
......
......@@ -143,7 +143,7 @@ int32_t tsdbCacherowsReaderOpen(void* pVnode, int32_t type, void* pTableIdList,
p->type = type;
p->pVnode = pVnode;
p->pTsdb = p->pVnode->pTsdb;
p->verRange = (SVersionRange){.minVer = 0, .maxVer = UINT64_MAX};
p->info.verRange = (SVersionRange){.minVer = 0, .maxVer = UINT64_MAX};
p->numOfCols = numOfCols;
p->pCidList = pCidList;
p->pSlotIds = pSlotIds;
......
......@@ -30,11 +30,6 @@ typedef enum {
READER_STATUS_NORMAL = 0x2,
} EReaderStatus;
typedef enum {
READ_MODE_COUNT_ONLY = 0x1,
READ_MODE_ALL,
} EReadMode;
typedef enum {
EXTERNAL_ROWS_PREV = 0x1,
EXTERNAL_ROWS_MAIN = 0x2,
......@@ -69,16 +64,6 @@ typedef struct STableBlockScanInfo {
bool iterInit; // whether to initialize the in-memory skip list iterator or not
} STableBlockScanInfo;
typedef struct STsdbReaderInfo {
uint64_t suid;
STSchema* pSchema;
EReadMode readMode;
uint64_t rowsNum;
STimeWindow window;
SVersionRange verRange;
int16_t order;
} STsdbReaderInfo;
typedef struct SResultBlockInfo {
SSDataBlock* pResBlock;
bool freeBlock;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册