提交 b52ad0f7 编写于 作者: M Minglei Jin

tsdb/cache: new separate dataf reader for last file reading

上级 c84618ea
...@@ -720,6 +720,7 @@ typedef struct SCacheRowsReader { ...@@ -720,6 +720,7 @@ typedef struct SCacheRowsReader {
SSttBlockLoadInfo *pLoadInfo; SSttBlockLoadInfo *pLoadInfo;
STsdbReadSnap *pReadSnap; STsdbReadSnap *pReadSnap;
SDataFReader *pDataFReader; SDataFReader *pDataFReader;
SDataFReader *pDataFReaderLast;
} SCacheRowsReader; } SCacheRowsReader;
typedef struct { typedef struct {
......
...@@ -951,7 +951,8 @@ typedef struct { ...@@ -951,7 +951,8 @@ typedef struct {
} CacheNextRowIter; } CacheNextRowIter;
static int32_t nextRowIterOpen(CacheNextRowIter *pIter, tb_uid_t uid, STsdb *pTsdb, STSchema *pTSchema, tb_uid_t suid, static int32_t nextRowIterOpen(CacheNextRowIter *pIter, tb_uid_t uid, STsdb *pTsdb, STSchema *pTSchema, tb_uid_t suid,
SSttBlockLoadInfo *pLoadInfo, STsdbReadSnap *pReadSnap, SDataFReader **pDataFReader) { SSttBlockLoadInfo *pLoadInfo, STsdbReadSnap *pReadSnap, SDataFReader **pDataFReader,
SDataFReader **pDataFReaderLast) {
int code = 0; int code = 0;
STbData *pMem = NULL; STbData *pMem = NULL;
...@@ -1006,7 +1007,7 @@ static int32_t nextRowIterOpen(CacheNextRowIter *pIter, tb_uid_t uid, STsdb *pTs ...@@ -1006,7 +1007,7 @@ static int32_t nextRowIterOpen(CacheNextRowIter *pIter, tb_uid_t uid, STsdb *pTs
pIter->fsLastState.suid = suid; pIter->fsLastState.suid = suid;
pIter->fsLastState.uid = uid; pIter->fsLastState.uid = uid;
pIter->fsLastState.pLoadInfo = pLoadInfo; pIter->fsLastState.pLoadInfo = pLoadInfo;
pIter->fsLastState.pDataFReader = pDataFReader; pIter->fsLastState.pDataFReader = pDataFReaderLast;
pIter->fsState.state = SFSNEXTROW_FS; pIter->fsState.state = SFSNEXTROW_FS;
pIter->fsState.pTsdb = pTsdb; pIter->fsState.pTsdb = pTsdb;
...@@ -1148,7 +1149,8 @@ static int32_t mergeLastRow(tb_uid_t uid, STsdb *pTsdb, bool *dup, SArray **ppCo ...@@ -1148,7 +1149,8 @@ static int32_t mergeLastRow(tb_uid_t uid, STsdb *pTsdb, bool *dup, SArray **ppCo
TSKEY lastRowTs = TSKEY_MAX; TSKEY lastRowTs = TSKEY_MAX;
CacheNextRowIter iter = {0}; CacheNextRowIter iter = {0};
nextRowIterOpen(&iter, uid, pTsdb, pTSchema, pr->suid, pr->pLoadInfo, pr->pReadSnap, &pr->pDataFReader); nextRowIterOpen(&iter, uid, pTsdb, pTSchema, pr->suid, pr->pLoadInfo, pr->pReadSnap, &pr->pDataFReader,
&pr->pDataFReaderLast);
do { do {
TSDBROW *pRow = NULL; TSDBROW *pRow = NULL;
...@@ -1272,7 +1274,8 @@ static int32_t mergeLast(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray, SCach ...@@ -1272,7 +1274,8 @@ static int32_t mergeLast(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray, SCach
TSKEY lastRowTs = TSKEY_MAX; TSKEY lastRowTs = TSKEY_MAX;
CacheNextRowIter iter = {0}; CacheNextRowIter iter = {0};
nextRowIterOpen(&iter, uid, pTsdb, pTSchema, pr->suid, pr->pLoadInfo, pr->pReadSnap, &pr->pDataFReader); nextRowIterOpen(&iter, uid, pTsdb, pTSchema, pr->suid, pr->pLoadInfo, pr->pReadSnap, &pr->pDataFReader,
&pr->pDataFReaderLast);
do { do {
TSDBROW *pRow = NULL; TSDBROW *pRow = NULL;
......
...@@ -64,7 +64,7 @@ static void saveOneRow(SArray* pRow, SSDataBlock* pBlock, SCacheRowsReader* pRea ...@@ -64,7 +64,7 @@ static void saveOneRow(SArray* pRow, SSDataBlock* pBlock, SCacheRowsReader* pRea
colDataAppend(pColInfoData, numOfRows, (const char*)pRes[i], false); colDataAppend(pColInfoData, numOfRows, (const char*)pRes[i], false);
} }
pBlock->info.rows += allNullRow? 0:1; pBlock->info.rows += allNullRow ? 0 : 1;
} else { } else {
ASSERT(HASTYPE(pReader->type, CACHESCAN_RETRIEVE_LAST_ROW)); ASSERT(HASTYPE(pReader->type, CACHESCAN_RETRIEVE_LAST_ROW));
...@@ -239,6 +239,7 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32 ...@@ -239,6 +239,7 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32
tsdbTakeReadSnap(pr->pVnode->pTsdb, &pr->pReadSnap, "cache-l"); tsdbTakeReadSnap(pr->pVnode->pTsdb, &pr->pReadSnap, "cache-l");
pr->pDataFReader = NULL; pr->pDataFReader = NULL;
pr->pDataFReaderLast = NULL;
// retrieve the only one last row of all tables in the uid list. // retrieve the only one last row of all tables in the uid list.
if (HASTYPE(pr->type, CACHESCAN_RETRIEVE_TYPE_SINGLE)) { if (HASTYPE(pr->type, CACHESCAN_RETRIEVE_TYPE_SINGLE)) {
...@@ -334,6 +335,7 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32 ...@@ -334,6 +335,7 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32
} }
_end: _end:
tsdbDataFReaderClose(&pr->pDataFReaderLast);
tsdbDataFReaderClose(&pr->pDataFReader); tsdbDataFReaderClose(&pr->pDataFReader);
tsdbUntakeReadSnap(pr->pVnode->pTsdb, pr->pReadSnap, "cache-l"); tsdbUntakeReadSnap(pr->pVnode->pTsdb, pr->pReadSnap, "cache-l");
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册