提交 7d294cac 编写于 作者: M Minglei Jin

tsdb/cache: load current fileset's brinblk once

上级 a593e887
...@@ -841,48 +841,6 @@ typedef enum { ...@@ -841,48 +841,6 @@ typedef enum {
READ_MODE_ALL, READ_MODE_ALL,
} EReadMode; } EReadMode;
typedef struct STsdbReaderInfo {
uint64_t suid;
STSchema *pSchema;
EReadMode readMode;
uint64_t rowsNum;
STimeWindow window;
SVersionRange verRange;
int16_t order;
} STsdbReaderInfo;
typedef struct {
SArray *pTombData;
} STableLoadInfo;
struct SDataFileReader;
typedef struct SCacheRowsReader {
STsdb *pTsdb;
STsdbReaderInfo info;
TdThreadMutex readerMutex;
SVnode *pVnode;
STSchema *pSchema;
STSchema *pCurrSchema;
uint64_t uid;
char **transferBuf; // todo remove it soon
int32_t numOfCols;
SArray *pCidList;
int32_t *pSlotIds;
int32_t type;
int32_t tableIndex; // currently returned result tables
STableKeyInfo *pTableList; // table id list
int32_t numOfTables;
uint64_t *uidList;
SSHashObj *pTableMap;
SArray *pLDataIterArray;
struct SDataFileReader *pFileReader;
STFileSet *pCurFileSet;
STsdbReadSnap *pReadSnap;
char *idstr;
int64_t lastTs;
} SCacheRowsReader;
typedef struct { typedef struct {
TSKEY ts; TSKEY ts;
int8_t dirty; int8_t dirty;
...@@ -892,14 +850,10 @@ typedef struct { ...@@ -892,14 +850,10 @@ typedef struct {
int32_t tsdbOpenCache(STsdb *pTsdb); int32_t tsdbOpenCache(STsdb *pTsdb);
void tsdbCloseCache(STsdb *pTsdb); void tsdbCloseCache(STsdb *pTsdb);
int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSDBROW *row); int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSDBROW *row);
int32_t tsdbCacheGetBatch(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCacheRowsReader *pr, int8_t ltype);
int32_t tsdbCacheGet(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCacheRowsReader *pr, int8_t ltype);
int32_t tsdbCacheDel(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKEY eKey); int32_t tsdbCacheDel(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKEY eKey);
int32_t tsdbCacheInsertLast(SLRUCache *pCache, tb_uid_t uid, TSDBROW *row, STsdb *pTsdb); int32_t tsdbCacheInsertLast(SLRUCache *pCache, tb_uid_t uid, TSDBROW *row, STsdb *pTsdb);
int32_t tsdbCacheInsertLastrow(SLRUCache *pCache, STsdb *pTsdb, tb_uid_t uid, TSDBROW *row, bool dup); int32_t tsdbCacheInsertLastrow(SLRUCache *pCache, STsdb *pTsdb, tb_uid_t uid, TSDBROW *row, bool dup);
int32_t tsdbCacheGetLastH(SLRUCache *pCache, tb_uid_t uid, SCacheRowsReader *pr, LRUHandle **h);
int32_t tsdbCacheGetLastrowH(SLRUCache *pCache, tb_uid_t uid, SCacheRowsReader *pr, LRUHandle **h);
int32_t tsdbCacheRelease(SLRUCache *pCache, LRUHandle *h); int32_t tsdbCacheRelease(SLRUCache *pCache, LRUHandle *h);
int32_t tsdbCacheGetBlockIdx(SLRUCache *pCache, SDataFReader *pFileReader, LRUHandle **handle); int32_t tsdbCacheGetBlockIdx(SLRUCache *pCache, SDataFReader *pFileReader, LRUHandle **handle);
...@@ -909,8 +863,6 @@ int32_t tsdbCacheDeleteLastrow(SLRUCache *pCache, tb_uid_t uid, TSKEY eKey); ...@@ -909,8 +863,6 @@ int32_t tsdbCacheDeleteLastrow(SLRUCache *pCache, tb_uid_t uid, TSKEY eKey);
int32_t tsdbCacheDeleteLast(SLRUCache *pCache, tb_uid_t uid, TSKEY eKey); int32_t tsdbCacheDeleteLast(SLRUCache *pCache, tb_uid_t uid, TSKEY eKey);
int32_t tsdbCacheDelete(SLRUCache *pCache, tb_uid_t uid, TSKEY eKey); int32_t tsdbCacheDelete(SLRUCache *pCache, tb_uid_t uid, TSKEY eKey);
// int32_t tsdbCacheLastArray2Row(SArray *pLastArray, STSRow **ppRow, STSchema *pSchema);
// ========== inline functions ========== // ========== inline functions ==========
static FORCE_INLINE int32_t tsdbKeyCmprFn(const void *p1, const void *p2) { static FORCE_INLINE int32_t tsdbKeyCmprFn(const void *p1, const void *p2) {
TSDBKEY *pKey1 = (TSDBKEY *)p1; TSDBKEY *pKey1 = (TSDBKEY *)p1;
......
...@@ -1922,6 +1922,11 @@ static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow, bool *pIgnoreEarlie ...@@ -1922,6 +1922,11 @@ static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow, bool *pIgnoreEarlie
state->pr->pCurFileSet = state->pFileSet; state->pr->pCurFileSet = state->pFileSet;
loadDataTomb(state->pr, state->pr->pFileReader); loadDataTomb(state->pr, state->pr->pFileReader);
int32_t code = tsdbDataFileReadBrinBlk(state->pr->pFileReader, &state->pr->pBlkArray);
if (code != TSDB_CODE_SUCCESS) {
goto _err;
}
} }
if (!state->pIndexList) { if (!state->pIndexList) {
...@@ -1929,12 +1934,8 @@ static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow, bool *pIgnoreEarlie ...@@ -1929,12 +1934,8 @@ static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow, bool *pIgnoreEarlie
} else { } else {
taosArrayClear(state->pIndexList); taosArrayClear(state->pIndexList);
} }
const TBrinBlkArray *pBlkArray = NULL;
int32_t code = tsdbDataFileReadBrinBlk(state->pr->pFileReader, &pBlkArray); const TBrinBlkArray *pBlkArray = state->pr->pBlkArray;
if (code != TSDB_CODE_SUCCESS) {
goto _err;
}
for (int i = TARRAY2_SIZE(pBlkArray) - 1; i >= 0; --i) { for (int i = TARRAY2_SIZE(pBlkArray) - 1; i >= 0; --i) {
SBrinBlk *pBrinBlk = &pBlkArray->data[i]; SBrinBlk *pBrinBlk = &pBlkArray->data[i];
......
...@@ -18,6 +18,7 @@ ...@@ -18,6 +18,7 @@
#include "tcommon.h" #include "tcommon.h"
#include "tsdb.h" #include "tsdb.h"
#include "tsdbDataFileRW.h" #include "tsdbDataFileRW.h"
#include "tsdbReadUtil.h"
#define HASTYPE(_type, _t) (((_type) & (_t)) == (_t)) #define HASTYPE(_type, _t) (((_type) & (_t)) == (_t))
......
...@@ -36,6 +36,16 @@ typedef enum { ...@@ -36,6 +36,16 @@ typedef enum {
EXTERNAL_ROWS_NEXT = 0x3, EXTERNAL_ROWS_NEXT = 0x3,
} EContentData; } EContentData;
typedef struct STsdbReaderInfo {
uint64_t suid;
STSchema* pSchema;
EReadMode readMode;
uint64_t rowsNum;
STimeWindow window;
SVersionRange verRange;
int16_t order;
} STsdbReaderInfo;
typedef struct SBlockInfoBuf { typedef struct SBlockInfoBuf {
int32_t currentIndex; int32_t currentIndex;
SArray* pData; SArray* pData;
...@@ -243,6 +253,41 @@ void loadMemTombData(SArray** ppMemDelData, STbData* pMemTbData, STbData* piM ...@@ -243,6 +253,41 @@ void loadMemTombData(SArray** ppMemDelData, STbData* pMemTbData, STbData* piM
int32_t loadDataFileTombDataForAll(STsdbReader* pReader); int32_t loadDataFileTombDataForAll(STsdbReader* pReader);
int32_t loadSttTombDataForAll(STsdbReader* pReader, SSttFileReader* pSttFileReader, SSttBlockLoadInfo* pLoadInfo); int32_t loadSttTombDataForAll(STsdbReader* pReader, SSttFileReader* pSttFileReader, SSttBlockLoadInfo* pLoadInfo);
typedef struct {
SArray* pTombData;
} STableLoadInfo;
struct SDataFileReader;
typedef struct SCacheRowsReader {
STsdb* pTsdb;
STsdbReaderInfo info;
TdThreadMutex readerMutex;
SVnode* pVnode;
STSchema* pSchema;
STSchema* pCurrSchema;
uint64_t uid;
char** transferBuf; // todo remove it soon
int32_t numOfCols;
SArray* pCidList;
int32_t* pSlotIds;
int32_t type;
int32_t tableIndex; // currently returned result tables
STableKeyInfo* pTableList; // table id list
int32_t numOfTables;
uint64_t* uidList;
SSHashObj* pTableMap;
SArray* pLDataIterArray;
struct SDataFileReader* pFileReader;
STFileSet* pCurFileSet;
const TBrinBlkArray* pBlkArray;
STsdbReadSnap* pReadSnap;
char* idstr;
int64_t lastTs;
} SCacheRowsReader;
int32_t tsdbCacheGetBatch(STsdb* pTsdb, tb_uid_t uid, SArray* pLastArray, SCacheRowsReader* pr, int8_t ltype);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册