提交 89f7ad29 编写于 作者: M Minglei Jin

fix: optimize tsdb cache loading speed

上级 a01d49b2
...@@ -163,7 +163,8 @@ void *tsdbGetIdx(SMeta *pMeta); ...@@ -163,7 +163,8 @@ void *tsdbGetIdx(SMeta *pMeta);
void *tsdbGetIvtIdx(SMeta *pMeta); void *tsdbGetIvtIdx(SMeta *pMeta);
uint64_t getReaderMaxVersion(STsdbReader *pReader); uint64_t getReaderMaxVersion(STsdbReader *pReader);
int32_t tsdbCacherowsReaderOpen(void *pVnode, int32_t type, SArray *pTableIdList, int32_t numOfCols, void **pReader); int32_t tsdbCacherowsReaderOpen(void *pVnode, int32_t type, SArray *pTableIdList, int32_t numOfCols, uint64_t suid,
void **pReader);
int32_t tsdbRetrieveCacheRows(void *pReader, SSDataBlock *pResBlock, const int32_t *slotIds, SArray *pTableUids); int32_t tsdbRetrieveCacheRows(void *pReader, SSDataBlock *pResBlock, const int32_t *slotIds, SArray *pTableUids);
void *tsdbCacherowsReaderClose(void *pReader); void *tsdbCacherowsReaderClose(void *pReader);
int32_t tsdbGetTableSchema(SVnode *pVnode, int64_t uid, STSchema **pSchema, int64_t *suid); int32_t tsdbGetTableSchema(SVnode *pVnode, int64_t uid, STSchema **pSchema, int64_t *suid);
......
...@@ -298,29 +298,6 @@ int32_t tsdbMerge(STsdb *pTsdb); ...@@ -298,29 +298,6 @@ int32_t tsdbMerge(STsdb *pTsdb);
#define TSDB_CACHE_LAST_ROW(c) (((c).cacheLast & 1) > 0) #define TSDB_CACHE_LAST_ROW(c) (((c).cacheLast & 1) > 0)
#define TSDB_CACHE_LAST(c) (((c).cacheLast & 2) > 0) #define TSDB_CACHE_LAST(c) (((c).cacheLast & 2) > 0)
// tsdbCache ==============================================================================================
typedef struct {
TSKEY ts;
SColVal colVal;
} SLastCol;
int32_t tsdbOpenCache(STsdb *pTsdb);
void tsdbCloseCache(STsdb *pTsdb);
int32_t tsdbCacheInsertLast(SLRUCache *pCache, tb_uid_t uid, STSRow *row, STsdb *pTsdb);
int32_t tsdbCacheInsertLastrow(SLRUCache *pCache, STsdb *pTsdb, tb_uid_t uid, STSRow *row, bool dup);
int32_t tsdbCacheGetLastH(SLRUCache *pCache, tb_uid_t uid, STsdb *pTsdb, LRUHandle **h);
int32_t tsdbCacheGetLastrowH(SLRUCache *pCache, tb_uid_t uid, STsdb *pTsdb, LRUHandle **h);
int32_t tsdbCacheRelease(SLRUCache *pCache, LRUHandle *h);
int32_t tsdbCacheDeleteLastrow(SLRUCache *pCache, tb_uid_t uid, TSKEY eKey);
int32_t tsdbCacheDeleteLast(SLRUCache *pCache, tb_uid_t uid, TSKEY eKey);
int32_t tsdbCacheDelete(SLRUCache *pCache, tb_uid_t uid, TSKEY eKey);
void tsdbCacheSetCapacity(SVnode *pVnode, size_t capacity);
size_t tsdbCacheGetCapacity(SVnode *pVnode);
int32_t tsdbCacheLastArray2Row(SArray *pLastArray, STSRow **ppRow, STSchema *pSchema);
// tsdbDiskData ============================================================================================== // tsdbDiskData ==============================================================================================
int32_t tDiskDataBuilderCreate(SDiskDataBuilder **ppBuilder); int32_t tDiskDataBuilderCreate(SDiskDataBuilder **ppBuilder);
void *tDiskDataBuilderDestroy(SDiskDataBuilder *pBuilder); void *tDiskDataBuilderDestroy(SDiskDataBuilder *pBuilder);
...@@ -729,6 +706,44 @@ void resetLastBlockLoadInfo(SSttBlockLoadInfo *pLoadInfo); ...@@ -729,6 +706,44 @@ void resetLastBlockLoadInfo(SSttBlockLoadInfo *pLoadInfo);
void getLastBlockLoadInfo(SSttBlockLoadInfo *pLoadInfo, int64_t *blocks, double *el); void getLastBlockLoadInfo(SSttBlockLoadInfo *pLoadInfo, int64_t *blocks, double *el);
void *destroyLastBlockLoadInfo(SSttBlockLoadInfo *pLoadInfo); void *destroyLastBlockLoadInfo(SSttBlockLoadInfo *pLoadInfo);
// tsdbCache ==============================================================================================
typedef struct SCacheRowsReader {
SVnode *pVnode;
STSchema *pSchema;
uint64_t uid;
uint64_t suid;
char **transferBuf; // todo remove it soon
int32_t numOfCols;
int32_t type;
int32_t tableIndex; // currently returned result tables
SArray *pTableList; // table id list
SSttBlockLoadInfo *pLoadInfo;
STsdbReadSnap *pReadSnap;
SDataFReader *pDataFReader;
} SCacheRowsReader;
typedef struct {
TSKEY ts;
SColVal colVal;
} SLastCol;
int32_t tsdbOpenCache(STsdb *pTsdb);
void tsdbCloseCache(STsdb *pTsdb);
int32_t tsdbCacheInsertLast(SLRUCache *pCache, tb_uid_t uid, STSRow *row, STsdb *pTsdb);
int32_t tsdbCacheInsertLastrow(SLRUCache *pCache, STsdb *pTsdb, tb_uid_t uid, STSRow *row, bool dup);
int32_t tsdbCacheGetLastH(SLRUCache *pCache, tb_uid_t uid, SCacheRowsReader *pr, LRUHandle **h);
int32_t tsdbCacheGetLastrowH(SLRUCache *pCache, tb_uid_t uid, SCacheRowsReader *pr, LRUHandle **h);
int32_t tsdbCacheRelease(SLRUCache *pCache, LRUHandle *h);
int32_t tsdbCacheDeleteLastrow(SLRUCache *pCache, tb_uid_t uid, TSKEY eKey);
int32_t tsdbCacheDeleteLast(SLRUCache *pCache, tb_uid_t uid, TSKEY eKey);
int32_t tsdbCacheDelete(SLRUCache *pCache, tb_uid_t uid, TSKEY eKey);
void tsdbCacheSetCapacity(SVnode *pVnode, size_t capacity);
size_t tsdbCacheGetCapacity(SVnode *pVnode);
int32_t tsdbCacheLastArray2Row(SArray *pLastArray, STSRow **ppRow, STSchema *pSchema);
// ========== inline functions ========== // ========== inline functions ==========
static FORCE_INLINE int32_t tsdbKeyCmprFn(const void *p1, const void *p2) { static FORCE_INLINE int32_t tsdbKeyCmprFn(const void *p1, const void *p2) {
TSDBKEY *pKey1 = (TSDBKEY *)p1; TSDBKEY *pKey1 = (TSDBKEY *)p1;
......
...@@ -26,7 +26,7 @@ int32_t tsdbOpenCache(STsdb *pTsdb) { ...@@ -26,7 +26,7 @@ int32_t tsdbOpenCache(STsdb *pTsdb) {
goto _err; goto _err;
} }
taosLRUCacheSetStrictCapacity(pCache, true); taosLRUCacheSetStrictCapacity(pCache, false);
taosThreadMutexInit(&pTsdb->lruMutex, NULL); taosThreadMutexInit(&pTsdb->lruMutex, NULL);
...@@ -488,11 +488,12 @@ typedef struct { ...@@ -488,11 +488,12 @@ typedef struct {
int32_t nFileSet; int32_t nFileSet;
int32_t iFileSet; int32_t iFileSet;
SArray *aDFileSet; SArray *aDFileSet;
SDataFReader *pDataFReader; SDataFReader **pDataFReader;
TSDBROW row; TSDBROW row;
SMergeTree mergeTree; SMergeTree mergeTree;
SMergeTree *pMergeTree; SMergeTree *pMergeTree;
SSttBlockLoadInfo *pLoadInfo;
} SFSLastNextRowIter; } SFSLastNextRowIter;
static int32_t getNextRowFromFSLast(void *iter, TSDBROW **ppRow) { static int32_t getNextRowFromFSLast(void *iter, TSDBROW **ppRow) {
...@@ -519,18 +520,20 @@ static int32_t getNextRowFromFSLast(void *iter, TSDBROW **ppRow) { ...@@ -519,18 +520,20 @@ static int32_t getNextRowFromFSLast(void *iter, TSDBROW **ppRow) {
return code; return code;
} }
if (state->pDataFReader != NULL) { if (*state->pDataFReader == NULL || (*state->pDataFReader)->pSet->fid != pFileSet->fid) {
tsdbDataFReaderClose(&state->pDataFReader); if (*state->pDataFReader != NULL) {
state->pDataFReader = NULL; tsdbDataFReaderClose(state->pDataFReader);
}
code = tsdbDataFReaderOpen(&state->pDataFReader, state->pTsdb, pFileSet); resetLastBlockLoadInfo(state->pLoadInfo);
if (code) goto _err; }
SSttBlockLoadInfo *pLoadInfo = tCreateLastBlockLoadInfo(state->pTSchema, NULL, 0); code = tsdbDataFReaderOpen(state->pDataFReader, state->pTsdb, pFileSet);
tMergeTreeOpen(&state->mergeTree, 1, state->pDataFReader, state->suid, state->uid, if (code) goto _err;
}
tMergeTreeOpen(&state->mergeTree, 1, *state->pDataFReader, state->suid, state->uid,
&(STimeWindow){.skey = TSKEY_MIN, .ekey = TSKEY_MAX}, &(STimeWindow){.skey = TSKEY_MIN, .ekey = TSKEY_MAX},
&(SVersionRange){.minVer = 0, .maxVer = UINT64_MAX}, pLoadInfo, true, NULL); &(SVersionRange){.minVer = 0, .maxVer = UINT64_MAX}, state->pLoadInfo, false, NULL);
state->pMergeTree = &state->mergeTree; state->pMergeTree = &state->mergeTree;
bool hasVal = tMergeTreeNext(&state->mergeTree); bool hasVal = tMergeTreeNext(&state->mergeTree);
if (!hasVal) { if (!hasVal) {
...@@ -554,10 +557,10 @@ static int32_t getNextRowFromFSLast(void *iter, TSDBROW **ppRow) { ...@@ -554,10 +557,10 @@ static int32_t getNextRowFromFSLast(void *iter, TSDBROW **ppRow) {
} }
_err: _err:
if (state->pDataFReader) { /*if (state->pDataFReader) {
tsdbDataFReaderClose(&state->pDataFReader); tsdbDataFReaderClose(&state->pDataFReader);
state->pDataFReader = NULL; state->pDataFReader = NULL;
} }*/
if (state->pMergeTree != NULL) { if (state->pMergeTree != NULL) {
tMergeTreeClose(state->pMergeTree); tMergeTreeClose(state->pMergeTree);
state->pMergeTree = NULL; state->pMergeTree = NULL;
...@@ -575,12 +578,12 @@ int32_t clearNextRowFromFSLast(void *iter) { ...@@ -575,12 +578,12 @@ int32_t clearNextRowFromFSLast(void *iter) {
if (!state) { if (!state) {
return code; return code;
} }
/*
if (state->pDataFReader) { if (state->pDataFReader) {
tsdbDataFReaderClose(&state->pDataFReader); tsdbDataFReaderClose(&state->pDataFReader);
state->pDataFReader = NULL; state->pDataFReader = NULL;
} }
*/
if (state->pMergeTree != NULL) { if (state->pMergeTree != NULL) {
tMergeTreeClose(state->pMergeTree); tMergeTreeClose(state->pMergeTree);
state->pMergeTree = NULL; state->pMergeTree = NULL;
...@@ -597,27 +600,28 @@ typedef enum SFSNEXTROWSTATES { ...@@ -597,27 +600,28 @@ typedef enum SFSNEXTROWSTATES {
} SFSNEXTROWSTATES; } SFSNEXTROWSTATES;
typedef struct SFSNextRowIter { typedef struct SFSNextRowIter {
SFSNEXTROWSTATES state; // [input] SFSNEXTROWSTATES state; // [input]
STsdb *pTsdb; // [input] STsdb *pTsdb; // [input]
SBlockIdx *pBlockIdxExp; // [input] SBlockIdx *pBlockIdxExp; // [input]
STSchema *pTSchema; // [input] STSchema *pTSchema; // [input]
tb_uid_t suid; tb_uid_t suid;
tb_uid_t uid; tb_uid_t uid;
int32_t nFileSet; int32_t nFileSet;
int32_t iFileSet; int32_t iFileSet;
SArray *aDFileSet; SArray *aDFileSet;
SDataFReader *pDataFReader; SDataFReader **pDataFReader;
SArray *aBlockIdx; SArray *aBlockIdx;
SBlockIdx *pBlockIdx; SBlockIdx *pBlockIdx;
SMapData blockMap; SMapData blockMap;
int32_t nBlock; int32_t nBlock;
int32_t iBlock; int32_t iBlock;
SDataBlk block; SDataBlk block;
SBlockData blockData; SBlockData blockData;
SBlockData *pBlockData; SBlockData *pBlockData;
int32_t nRow; int32_t nRow;
int32_t iRow; int32_t iRow;
TSDBROW row; TSDBROW row;
SSttBlockLoadInfo *pLoadInfo;
} SFSNextRowIter; } SFSNextRowIter;
static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow) { static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow) {
...@@ -648,8 +652,16 @@ static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow) { ...@@ -648,8 +652,16 @@ static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow) {
return code; return code;
} }
code = tsdbDataFReaderOpen(&state->pDataFReader, state->pTsdb, pFileSet); if (*state->pDataFReader == NULL || (*state->pDataFReader)->pSet->fid != pFileSet->fid) {
if (code) goto _err; if (*state->pDataFReader != NULL) {
tsdbDataFReaderClose(state->pDataFReader);
resetLastBlockLoadInfo(state->pLoadInfo);
}
code = tsdbDataFReaderOpen(state->pDataFReader, state->pTsdb, pFileSet);
if (code) goto _err;
}
// tMapDataReset(&state->blockIdxMap); // tMapDataReset(&state->blockIdxMap);
if (!state->aBlockIdx) { if (!state->aBlockIdx) {
...@@ -657,7 +669,7 @@ static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow) { ...@@ -657,7 +669,7 @@ static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow) {
} else { } else {
taosArrayClear(state->aBlockIdx); taosArrayClear(state->aBlockIdx);
} }
code = tsdbReadBlockIdx(state->pDataFReader, state->aBlockIdx); code = tsdbReadBlockIdx(*state->pDataFReader, state->aBlockIdx);
if (code) goto _err; if (code) goto _err;
/* if (state->pBlockIdx) { */ /* if (state->pBlockIdx) { */
...@@ -666,17 +678,20 @@ static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow) { ...@@ -666,17 +678,20 @@ static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow) {
* &state->blockIdx); * &state->blockIdx);
*/ */
state->pBlockIdx = taosArraySearch(state->aBlockIdx, state->pBlockIdxExp, tCmprBlockIdx, TD_EQ); state->pBlockIdx = taosArraySearch(state->aBlockIdx, state->pBlockIdxExp, tCmprBlockIdx, TD_EQ);
if (!state->pBlockIdx) { if (!state->pBlockIdx) { /*
tsdbDataFReaderClose(&state->pDataFReader); tsdbDataFReaderClose(state->pDataFReader);
state->pDataFReader = NULL; *state->pDataFReader = NULL;
resetLastBlockLoadInfo(state->pLoadInfo);*/
goto _next_fileset; goto _next_fileset;
} }
tMapDataReset(&state->blockMap);
/*
if (state->blockMap.pData != NULL) { if (state->blockMap.pData != NULL) {
tMapDataClear(&state->blockMap); tMapDataClear(&state->blockMap);
} }
*/
code = tsdbReadDataBlk(state->pDataFReader, state->pBlockIdx, &state->blockMap); code = tsdbReadDataBlk(*state->pDataFReader, state->pBlockIdx, &state->blockMap);
if (code) goto _err; if (code) goto _err;
state->nBlock = state->blockMap.nItem; state->nBlock = state->blockMap.nItem;
...@@ -703,7 +718,7 @@ static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow) { ...@@ -703,7 +718,7 @@ static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow) {
code = tBlockDataInit(state->pBlockData, &tid, state->pTSchema, NULL, 0); code = tBlockDataInit(state->pBlockData, &tid, state->pTSchema, NULL, 0);
if (code) goto _err; if (code) goto _err;
code = tsdbReadDataBlock(state->pDataFReader, &block, state->pBlockData); code = tsdbReadDataBlock(*state->pDataFReader, &block, state->pBlockData);
if (code) goto _err; if (code) goto _err;
state->nRow = state->blockData.nRow; state->nRow = state->blockData.nRow;
...@@ -719,8 +734,9 @@ static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow) { ...@@ -719,8 +734,9 @@ static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow) {
if (--state->iRow < 0) { if (--state->iRow < 0) {
state->state = SFSNEXTROW_BLOCKDATA; state->state = SFSNEXTROW_BLOCKDATA;
if (--state->iBlock < 0) { if (--state->iBlock < 0) {
tsdbDataFReaderClose(&state->pDataFReader); tsdbDataFReaderClose(state->pDataFReader);
state->pDataFReader = NULL; *state->pDataFReader = NULL;
resetLastBlockLoadInfo(state->pLoadInfo);
if (state->aBlockIdx) { if (state->aBlockIdx) {
taosArrayDestroy(state->aBlockIdx); taosArrayDestroy(state->aBlockIdx);
...@@ -739,16 +755,17 @@ static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow) { ...@@ -739,16 +755,17 @@ static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow) {
} }
_err: _err:
if (state->pDataFReader) { /*
tsdbDataFReaderClose(&state->pDataFReader); if (*state->pDataFReader) {
state->pDataFReader = NULL; tsdbDataFReaderClose(state->pDataFReader);
} *state->pDataFReader = NULL;
resetLastBlockLoadInfo(state->pLoadInfo);
}*/
if (state->aBlockIdx) { if (state->aBlockIdx) {
taosArrayDestroy(state->aBlockIdx); taosArrayDestroy(state->aBlockIdx);
state->aBlockIdx = NULL; state->aBlockIdx = NULL;
} }
if (state->pBlockData) { if (state->pBlockData) {
// tBlockDataDestroy(&state->blockData, 1);
tBlockDataDestroy(state->pBlockData, 1); tBlockDataDestroy(state->pBlockData, 1);
state->pBlockData = NULL; state->pBlockData = NULL;
} }
...@@ -765,11 +782,11 @@ int32_t clearNextRowFromFS(void *iter) { ...@@ -765,11 +782,11 @@ int32_t clearNextRowFromFS(void *iter) {
if (!state) { if (!state) {
return code; return code;
} }
/*
if (state->pDataFReader) { if (state->pDataFReader) {
tsdbDataFReaderClose(&state->pDataFReader); tsdbDataFReaderClose(&state->pDataFReader);
state->pDataFReader = NULL; state->pDataFReader = NULL;
} }*/
if (state->aBlockIdx) { if (state->aBlockIdx) {
taosArrayDestroy(state->aBlockIdx); taosArrayDestroy(state->aBlockIdx);
state->aBlockIdx = NULL; state->aBlockIdx = NULL;
...@@ -930,25 +947,21 @@ typedef struct { ...@@ -930,25 +947,21 @@ typedef struct {
TSDBROW memRow, imemRow, fsLastRow, fsRow; TSDBROW memRow, imemRow, fsLastRow, fsRow;
TsdbNextRowState input[4]; TsdbNextRowState input[4];
STsdbReadSnap *pReadSnap;
STsdb *pTsdb; STsdb *pTsdb;
} CacheNextRowIter; } CacheNextRowIter;
static int32_t nextRowIterOpen(CacheNextRowIter *pIter, tb_uid_t uid, STsdb *pTsdb, STSchema *pTSchema) { static int32_t nextRowIterOpen(CacheNextRowIter *pIter, tb_uid_t uid, STsdb *pTsdb, STSchema *pTSchema, tb_uid_t suid,
SSttBlockLoadInfo *pLoadInfo, STsdbReadSnap *pReadSnap, SDataFReader **pDataFReader) {
int code = 0; int code = 0;
tb_uid_t suid = getTableSuidByUid(uid, pTsdb);
tsdbTakeReadSnap(pTsdb, &pIter->pReadSnap, NULL);
STbData *pMem = NULL; STbData *pMem = NULL;
if (pIter->pReadSnap->pMem) { if (pReadSnap->pMem) {
pMem = tsdbGetTbDataFromMemTable(pIter->pReadSnap->pMem, suid, uid); pMem = tsdbGetTbDataFromMemTable(pReadSnap->pMem, suid, uid);
} }
STbData *pIMem = NULL; STbData *pIMem = NULL;
if (pIter->pReadSnap->pIMem) { if (pReadSnap->pIMem) {
pIMem = tsdbGetTbDataFromMemTable(pIter->pReadSnap->pIMem, suid, uid); pIMem = tsdbGetTbDataFromMemTable(pReadSnap->pIMem, suid, uid);
} }
pIter->pTsdb = pTsdb; pIter->pTsdb = pTsdb;
...@@ -957,7 +970,7 @@ static int32_t nextRowIterOpen(CacheNextRowIter *pIter, tb_uid_t uid, STsdb *pTs ...@@ -957,7 +970,7 @@ static int32_t nextRowIterOpen(CacheNextRowIter *pIter, tb_uid_t uid, STsdb *pTs
SDelIdx delIdx; SDelIdx delIdx;
SDelFile *pDelFile = pIter->pReadSnap->fs.pDelFile; SDelFile *pDelFile = pReadSnap->fs.pDelFile;
if (pDelFile) { if (pDelFile) {
SDelFReader *pDelFReader; SDelFReader *pDelFReader;
...@@ -988,18 +1001,22 @@ static int32_t nextRowIterOpen(CacheNextRowIter *pIter, tb_uid_t uid, STsdb *pTs ...@@ -988,18 +1001,22 @@ static int32_t nextRowIterOpen(CacheNextRowIter *pIter, tb_uid_t uid, STsdb *pTs
pIter->fsLastState.state = (SFSLASTNEXTROWSTATES)SFSNEXTROW_FS; pIter->fsLastState.state = (SFSLASTNEXTROWSTATES)SFSNEXTROW_FS;
pIter->fsLastState.pTsdb = pTsdb; pIter->fsLastState.pTsdb = pTsdb;
pIter->fsLastState.aDFileSet = pIter->pReadSnap->fs.aDFileSet; pIter->fsLastState.aDFileSet = pReadSnap->fs.aDFileSet;
pIter->fsLastState.pTSchema = pTSchema; pIter->fsLastState.pTSchema = pTSchema;
pIter->fsLastState.suid = suid; pIter->fsLastState.suid = suid;
pIter->fsLastState.uid = uid; pIter->fsLastState.uid = uid;
pIter->fsLastState.pLoadInfo = pLoadInfo;
pIter->fsLastState.pDataFReader = pDataFReader;
pIter->fsState.state = SFSNEXTROW_FS; pIter->fsState.state = SFSNEXTROW_FS;
pIter->fsState.pTsdb = pTsdb; pIter->fsState.pTsdb = pTsdb;
pIter->fsState.aDFileSet = pIter->pReadSnap->fs.aDFileSet; pIter->fsState.aDFileSet = pReadSnap->fs.aDFileSet;
pIter->fsState.pBlockIdxExp = &pIter->idx; pIter->fsState.pBlockIdxExp = &pIter->idx;
pIter->fsState.pTSchema = pTSchema; pIter->fsState.pTSchema = pTSchema;
pIter->fsState.suid = suid; pIter->fsState.suid = suid;
pIter->fsState.uid = uid; pIter->fsState.uid = uid;
pIter->fsState.pLoadInfo = pLoadInfo;
pIter->fsState.pDataFReader = pDataFReader;
pIter->input[0] = (TsdbNextRowState){&pIter->memRow, true, false, &pIter->memState, getNextRowFromMem, NULL}; pIter->input[0] = (TsdbNextRowState){&pIter->memRow, true, false, &pIter->memState, getNextRowFromMem, NULL};
pIter->input[1] = (TsdbNextRowState){&pIter->imemRow, true, false, &pIter->imemState, getNextRowFromMem, NULL}; pIter->input[1] = (TsdbNextRowState){&pIter->imemRow, true, false, &pIter->imemState, getNextRowFromMem, NULL};
...@@ -1040,8 +1057,6 @@ static int32_t nextRowIterClose(CacheNextRowIter *pIter) { ...@@ -1040,8 +1057,6 @@ static int32_t nextRowIterClose(CacheNextRowIter *pIter) {
taosArrayDestroy(pIter->pSkyline); taosArrayDestroy(pIter->pSkyline);
} }
tsdbUntakeReadSnap(pIter->pTsdb, pIter->pReadSnap, NULL);
_err: _err:
return code; return code;
} }
...@@ -1119,10 +1134,10 @@ _err: ...@@ -1119,10 +1134,10 @@ _err:
return code; return code;
} }
static int32_t mergeLastRow(tb_uid_t uid, STsdb *pTsdb, bool *dup, SArray **ppColArray) { static int32_t mergeLastRow(tb_uid_t uid, STsdb *pTsdb, bool *dup, SArray **ppColArray, SCacheRowsReader *pr) {
int32_t code = 0; int32_t code = 0;
STSchema *pTSchema = metaGetTbTSchema(pTsdb->pVnode->pMeta, uid, -1, 1); STSchema *pTSchema = pr->pSchema; // metaGetTbTSchema(pTsdb->pVnode->pMeta, uid, -1, 1);
int16_t nCol = pTSchema->numOfCols; int16_t nCol = pTSchema->numOfCols;
int16_t iCol = 0; int16_t iCol = 0;
int16_t noneCol = 0; int16_t noneCol = 0;
...@@ -1133,7 +1148,7 @@ static int32_t mergeLastRow(tb_uid_t uid, STsdb *pTsdb, bool *dup, SArray **ppCo ...@@ -1133,7 +1148,7 @@ static int32_t mergeLastRow(tb_uid_t uid, STsdb *pTsdb, bool *dup, SArray **ppCo
TSKEY lastRowTs = TSKEY_MAX; TSKEY lastRowTs = TSKEY_MAX;
CacheNextRowIter iter = {0}; CacheNextRowIter iter = {0};
nextRowIterOpen(&iter, uid, pTsdb, pTSchema); nextRowIterOpen(&iter, uid, pTsdb, pTSchema, pr->suid, pr->pLoadInfo, pr->pReadSnap, &pr->pDataFReader);
do { do {
TSDBROW *pRow = NULL; TSDBROW *pRow = NULL;
...@@ -1233,20 +1248,20 @@ static int32_t mergeLastRow(tb_uid_t uid, STsdb *pTsdb, bool *dup, SArray **ppCo ...@@ -1233,20 +1248,20 @@ static int32_t mergeLastRow(tb_uid_t uid, STsdb *pTsdb, bool *dup, SArray **ppCo
} }
nextRowIterClose(&iter); nextRowIterClose(&iter);
taosMemoryFreeClear(pTSchema); // taosMemoryFreeClear(pTSchema);
return code; return code;
_err: _err:
nextRowIterClose(&iter); nextRowIterClose(&iter);
taosArrayDestroy(pColArray); taosArrayDestroy(pColArray);
taosMemoryFreeClear(pTSchema); // taosMemoryFreeClear(pTSchema);
return code; return code;
} }
static int32_t mergeLast(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray) { static int32_t mergeLast(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray, SCacheRowsReader *pr) {
int32_t code = 0; int32_t code = 0;
STSchema *pTSchema = metaGetTbTSchema(pTsdb->pVnode->pMeta, uid, -1, 1); STSchema *pTSchema = pr->pSchema; // metaGetTbTSchema(pTsdb->pVnode->pMeta, uid, -1, 1);
int16_t nCol = pTSchema->numOfCols; int16_t nCol = pTSchema->numOfCols;
int16_t iCol = 0; int16_t iCol = 0;
int16_t noneCol = 0; int16_t noneCol = 0;
...@@ -1257,7 +1272,7 @@ static int32_t mergeLast(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray) { ...@@ -1257,7 +1272,7 @@ static int32_t mergeLast(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray) {
TSKEY lastRowTs = TSKEY_MAX; TSKEY lastRowTs = TSKEY_MAX;
CacheNextRowIter iter = {0}; CacheNextRowIter iter = {0};
nextRowIterOpen(&iter, uid, pTsdb, pTSchema); nextRowIterOpen(&iter, uid, pTsdb, pTSchema, pr->suid, pr->pLoadInfo, pr->pReadSnap, &pr->pDataFReader);
do { do {
TSDBROW *pRow = NULL; TSDBROW *pRow = NULL;
...@@ -1350,18 +1365,18 @@ static int32_t mergeLast(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray) { ...@@ -1350,18 +1365,18 @@ static int32_t mergeLast(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray) {
} }
nextRowIterClose(&iter); nextRowIterClose(&iter);
taosMemoryFreeClear(pTSchema); // taosMemoryFreeClear(pTSchema);
return code; return code;
_err: _err:
nextRowIterClose(&iter); nextRowIterClose(&iter);
taosMemoryFreeClear(pTSchema); // taosMemoryFreeClear(pTSchema);
*ppLastArray = NULL; *ppLastArray = NULL;
taosArrayDestroy(pColArray); taosArrayDestroy(pColArray);
return code; return code;
} }
int32_t tsdbCacheGetLastrowH(SLRUCache *pCache, tb_uid_t uid, STsdb *pTsdb, LRUHandle **handle) { int32_t tsdbCacheGetLastrowH(SLRUCache *pCache, tb_uid_t uid, SCacheRowsReader *pr, LRUHandle **handle) {
int32_t code = 0; int32_t code = 0;
char key[32] = {0}; char key[32] = {0};
int keyLen = 0; int keyLen = 0;
...@@ -1370,13 +1385,14 @@ int32_t tsdbCacheGetLastrowH(SLRUCache *pCache, tb_uid_t uid, STsdb *pTsdb, LRUH ...@@ -1370,13 +1385,14 @@ int32_t tsdbCacheGetLastrowH(SLRUCache *pCache, tb_uid_t uid, STsdb *pTsdb, LRUH
getTableCacheKey(uid, 0, key, &keyLen); getTableCacheKey(uid, 0, key, &keyLen);
LRUHandle *h = taosLRUCacheLookup(pCache, key, keyLen); LRUHandle *h = taosLRUCacheLookup(pCache, key, keyLen);
if (!h) { if (!h) {
STsdb *pTsdb = pr->pVnode->pTsdb;
taosThreadMutexLock(&pTsdb->lruMutex); taosThreadMutexLock(&pTsdb->lruMutex);
h = taosLRUCacheLookup(pCache, key, keyLen); h = taosLRUCacheLookup(pCache, key, keyLen);
if (!h) { if (!h) {
SArray *pArray = NULL; SArray *pArray = NULL;
bool dup = false; // which is always false for now bool dup = false; // which is always false for now
code = mergeLastRow(uid, pTsdb, &dup, &pArray); code = mergeLastRow(uid, pTsdb, &dup, &pArray, pr);
// if table's empty or error, return code of -1 // if table's empty or error, return code of -1
if (code < 0 || pArray == NULL) { if (code < 0 || pArray == NULL) {
if (!dup && pArray) { if (!dup && pArray) {
...@@ -1392,17 +1408,17 @@ int32_t tsdbCacheGetLastrowH(SLRUCache *pCache, tb_uid_t uid, STsdb *pTsdb, LRUH ...@@ -1392,17 +1408,17 @@ int32_t tsdbCacheGetLastrowH(SLRUCache *pCache, tb_uid_t uid, STsdb *pTsdb, LRUH
size_t charge = pArray->capacity * pArray->elemSize + sizeof(*pArray); size_t charge = pArray->capacity * pArray->elemSize + sizeof(*pArray);
_taos_lru_deleter_t deleter = deleteTableCacheLast; _taos_lru_deleter_t deleter = deleteTableCacheLast;
LRUStatus status = taosLRUCacheInsert(pCache, key, keyLen, pArray, charge, deleter, NULL, TAOS_LRU_PRIORITY_LOW); LRUStatus status = taosLRUCacheInsert(pCache, key, keyLen, pArray, charge, deleter, &h, TAOS_LRU_PRIORITY_LOW);
if (status != TAOS_LRU_STATUS_OK) { if (status != TAOS_LRU_STATUS_OK) {
code = -1; code = -1;
} }
taosThreadMutexUnlock(&pTsdb->lruMutex); // taosThreadMutexUnlock(&pTsdb->lruMutex);
h = taosLRUCacheLookup(pCache, key, keyLen); // h = taosLRUCacheLookup(pCache, key, keyLen);
} else { } // else {
taosThreadMutexUnlock(&pTsdb->lruMutex); taosThreadMutexUnlock(&pTsdb->lruMutex);
} //}
} }
*handle = h; *handle = h;
...@@ -1434,7 +1450,7 @@ _err: ...@@ -1434,7 +1450,7 @@ _err:
return code; return code;
} }
int32_t tsdbCacheGetLastH(SLRUCache *pCache, tb_uid_t uid, STsdb *pTsdb, LRUHandle **handle) { int32_t tsdbCacheGetLastH(SLRUCache *pCache, tb_uid_t uid, SCacheRowsReader *pr, LRUHandle **handle) {
int32_t code = 0; int32_t code = 0;
char key[32] = {0}; char key[32] = {0};
int keyLen = 0; int keyLen = 0;
...@@ -1443,12 +1459,13 @@ int32_t tsdbCacheGetLastH(SLRUCache *pCache, tb_uid_t uid, STsdb *pTsdb, LRUHand ...@@ -1443,12 +1459,13 @@ int32_t tsdbCacheGetLastH(SLRUCache *pCache, tb_uid_t uid, STsdb *pTsdb, LRUHand
getTableCacheKey(uid, 1, key, &keyLen); getTableCacheKey(uid, 1, key, &keyLen);
LRUHandle *h = taosLRUCacheLookup(pCache, key, keyLen); LRUHandle *h = taosLRUCacheLookup(pCache, key, keyLen);
if (!h) { if (!h) {
STsdb *pTsdb = pr->pVnode->pTsdb;
taosThreadMutexLock(&pTsdb->lruMutex); taosThreadMutexLock(&pTsdb->lruMutex);
h = taosLRUCacheLookup(pCache, key, keyLen); h = taosLRUCacheLookup(pCache, key, keyLen);
if (!h) { if (!h) {
SArray *pLastArray = NULL; SArray *pLastArray = NULL;
code = mergeLast(uid, pTsdb, &pLastArray); code = mergeLast(uid, pTsdb, &pLastArray, pr);
// if table's empty or error, return code of -1 // if table's empty or error, return code of -1
if (code < 0 || pLastArray == NULL) { if (code < 0 || pLastArray == NULL) {
taosThreadMutexUnlock(&pTsdb->lruMutex); taosThreadMutexUnlock(&pTsdb->lruMutex);
...@@ -1460,17 +1477,17 @@ int32_t tsdbCacheGetLastH(SLRUCache *pCache, tb_uid_t uid, STsdb *pTsdb, LRUHand ...@@ -1460,17 +1477,17 @@ int32_t tsdbCacheGetLastH(SLRUCache *pCache, tb_uid_t uid, STsdb *pTsdb, LRUHand
size_t charge = pLastArray->capacity * pLastArray->elemSize + sizeof(*pLastArray); size_t charge = pLastArray->capacity * pLastArray->elemSize + sizeof(*pLastArray);
_taos_lru_deleter_t deleter = deleteTableCacheLast; _taos_lru_deleter_t deleter = deleteTableCacheLast;
LRUStatus status = LRUStatus status =
taosLRUCacheInsert(pCache, key, keyLen, pLastArray, charge, deleter, NULL, TAOS_LRU_PRIORITY_LOW); taosLRUCacheInsert(pCache, key, keyLen, pLastArray, charge, deleter, &h, TAOS_LRU_PRIORITY_LOW);
if (status != TAOS_LRU_STATUS_OK) { if (status != TAOS_LRU_STATUS_OK) {
code = -1; code = -1;
} }
taosThreadMutexUnlock(&pTsdb->lruMutex); // taosThreadMutexUnlock(&pTsdb->lruMutex);
h = taosLRUCacheLookup(pCache, key, keyLen); // h = taosLRUCacheLookup(pCache, key, keyLen);
} else { } // else {
taosThreadMutexUnlock(&pTsdb->lruMutex); taosThreadMutexUnlock(&pTsdb->lruMutex);
} //}
} }
*handle = h; *handle = h;
......
...@@ -18,18 +18,7 @@ ...@@ -18,18 +18,7 @@
#include "tcommon.h" #include "tcommon.h"
#include "tsdb.h" #include "tsdb.h"
typedef struct SCacheRowsReader { #define HASTYPE(_type, _t) (((_type) & (_t)) == (_t))
SVnode* pVnode;
STSchema* pSchema;
uint64_t uid;
char** transferBuf; // todo remove it soon
int32_t numOfCols;
int32_t type;
int32_t tableIndex; // currently returned result tables
SArray* pTableList; // table id list
} SCacheRowsReader;
#define HASTYPE(_type, _t) (((_type) & (_t)) == (_t))
static void saveOneRow(SArray* pRow, SSDataBlock* pBlock, SCacheRowsReader* pReader, const int32_t* slotIds, static void saveOneRow(SArray* pRow, SSDataBlock* pBlock, SCacheRowsReader* pReader, const int32_t* slotIds,
void** pRes) { void** pRes) {
...@@ -56,7 +45,7 @@ static void saveOneRow(SArray* pRow, SSDataBlock* pBlock, SCacheRowsReader* pRea ...@@ -56,7 +45,7 @@ static void saveOneRow(SArray* pRow, SSDataBlock* pBlock, SCacheRowsReader* pRea
if (IS_VAR_DATA_TYPE(pColVal->colVal.type)) { if (IS_VAR_DATA_TYPE(pColVal->colVal.type)) {
varDataSetLen(p->buf, pColVal->colVal.value.nData); varDataSetLen(p->buf, pColVal->colVal.value.nData);
memcpy(varDataVal(p->buf), pColVal->colVal.value.pData, pColVal->colVal.value.nData); memcpy(varDataVal(p->buf), pColVal->colVal.value.pData, pColVal->colVal.value.nData);
p->bytes = pColVal->colVal.value.nData + VARSTR_HEADER_SIZE; // binary needs to plus the header size p->bytes = pColVal->colVal.value.nData + VARSTR_HEADER_SIZE; // binary needs to plus the header size
} else { } else {
memcpy(p->buf, &pColVal->colVal.value, pReader->pSchema->columns[slotId].bytes); memcpy(p->buf, &pColVal->colVal.value, pReader->pSchema->columns[slotId].bytes);
p->bytes = pReader->pSchema->columns[slotId].bytes; p->bytes = pReader->pSchema->columns[slotId].bytes;
...@@ -101,7 +90,8 @@ static void saveOneRow(SArray* pRow, SSDataBlock* pBlock, SCacheRowsReader* pRea ...@@ -101,7 +90,8 @@ static void saveOneRow(SArray* pRow, SSDataBlock* pBlock, SCacheRowsReader* pRea
pBlock->info.rows += 1; pBlock->info.rows += 1;
} }
int32_t tsdbCacherowsReaderOpen(void* pVnode, int32_t type, SArray* pTableIdList, int32_t numOfCols, void** pReader) { int32_t tsdbCacherowsReaderOpen(void* pVnode, int32_t type, SArray* pTableIdList, int32_t numOfCols, uint64_t suid,
void** pReader) {
*pReader = NULL; *pReader = NULL;
SCacheRowsReader* p = taosMemoryCalloc(1, sizeof(SCacheRowsReader)); SCacheRowsReader* p = taosMemoryCalloc(1, sizeof(SCacheRowsReader));
...@@ -112,6 +102,7 @@ int32_t tsdbCacherowsReaderOpen(void* pVnode, int32_t type, SArray* pTableIdList ...@@ -112,6 +102,7 @@ int32_t tsdbCacherowsReaderOpen(void* pVnode, int32_t type, SArray* pTableIdList
p->type = type; p->type = type;
p->pVnode = pVnode; p->pVnode = pVnode;
p->numOfCols = numOfCols; p->numOfCols = numOfCols;
p->suid = suid;
if (taosArrayGetSize(pTableIdList) == 0) { if (taosArrayGetSize(pTableIdList) == 0) {
*pReader = p; *pReader = p;
...@@ -138,6 +129,12 @@ int32_t tsdbCacherowsReaderOpen(void* pVnode, int32_t type, SArray* pTableIdList ...@@ -138,6 +129,12 @@ int32_t tsdbCacherowsReaderOpen(void* pVnode, int32_t type, SArray* pTableIdList
} }
} }
p->pLoadInfo = tCreateLastBlockLoadInfo(p->pSchema, NULL, 0);
if (p->pLoadInfo == NULL) {
tsdbCacherowsReaderClose(p);
return TSDB_CODE_OUT_OF_MEMORY;
}
*pReader = p; *pReader = p;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -154,6 +151,8 @@ void* tsdbCacherowsReaderClose(void* pReader) { ...@@ -154,6 +151,8 @@ void* tsdbCacherowsReaderClose(void* pReader) {
taosMemoryFree(p->pSchema); taosMemoryFree(p->pSchema);
} }
destroyLastBlockLoadInfo(p->pLoadInfo);
taosMemoryFree(pReader); taosMemoryFree(pReader);
return NULL; return NULL;
} }
...@@ -164,9 +163,9 @@ static int32_t doExtractCacheRow(SCacheRowsReader* pr, SLRUCache* lruCache, uint ...@@ -164,9 +163,9 @@ static int32_t doExtractCacheRow(SCacheRowsReader* pr, SLRUCache* lruCache, uint
*pRow = NULL; *pRow = NULL;
if (HASTYPE(pr->type, CACHESCAN_RETRIEVE_LAST_ROW)) { if (HASTYPE(pr->type, CACHESCAN_RETRIEVE_LAST_ROW)) {
code = tsdbCacheGetLastrowH(lruCache, uid, pr->pVnode->pTsdb, h); code = tsdbCacheGetLastrowH(lruCache, uid, pr, h);
} else { } else {
code = tsdbCacheGetLastH(lruCache, uid, pr->pVnode->pTsdb, h); code = tsdbCacheGetLastH(lruCache, uid, pr, h);
} }
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
...@@ -182,7 +181,7 @@ static int32_t doExtractCacheRow(SCacheRowsReader* pr, SLRUCache* lruCache, uint ...@@ -182,7 +181,7 @@ static int32_t doExtractCacheRow(SCacheRowsReader* pr, SLRUCache* lruCache, uint
} }
static void freeItem(void* pItem) { static void freeItem(void* pItem) {
SLastCol* pCol = (SLastCol*) pItem; SLastCol* pCol = (SLastCol*)pItem;
if (IS_VAR_DATA_TYPE(pCol->colVal.type)) { if (IS_VAR_DATA_TYPE(pCol->colVal.type)) {
taosMemoryFree(pCol->colVal.value.pData); taosMemoryFree(pCol->colVal.value.pData);
} }
...@@ -223,7 +222,7 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32 ...@@ -223,7 +222,7 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32
for (int32_t i = 0; i < pr->pSchema->numOfCols; ++i) { for (int32_t i = 0; i < pr->pSchema->numOfCols; ++i) {
struct STColumn* pCol = &pr->pSchema->columns[i]; struct STColumn* pCol = &pr->pSchema->columns[i];
SLastCol p = {.ts = INT64_MIN, .colVal.type = pCol->type}; SLastCol p = {.ts = INT64_MIN, .colVal.type = pCol->type};
if (IS_VAR_DATA_TYPE(pCol->type)) { if (IS_VAR_DATA_TYPE(pCol->type)) {
p.colVal.value.pData = taosMemoryCalloc(pCol->bytes, sizeof(char)); p.colVal.value.pData = taosMemoryCalloc(pCol->bytes, sizeof(char));
...@@ -231,6 +230,9 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32 ...@@ -231,6 +230,9 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32
taosArrayPush(pLastCols, &p); taosArrayPush(pLastCols, &p);
} }
tsdbTakeReadSnap(pr->pVnode->pTsdb, &pr->pReadSnap, "cache-l");
pr->pDataFReader = NULL;
// retrieve the only one last row of all tables in the uid list. // retrieve the only one last row of all tables in the uid list.
if (HASTYPE(pr->type, CACHESCAN_RETRIEVE_TYPE_SINGLE)) { if (HASTYPE(pr->type, CACHESCAN_RETRIEVE_TYPE_SINGLE)) {
for (int32_t i = 0; i < numOfTables; ++i) { for (int32_t i = 0; i < numOfTables; ++i) {
...@@ -299,7 +301,7 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32 ...@@ -299,7 +301,7 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32
} else if (HASTYPE(pr->type, CACHESCAN_RETRIEVE_TYPE_ALL)) { } else if (HASTYPE(pr->type, CACHESCAN_RETRIEVE_TYPE_ALL)) {
for (int32_t i = pr->tableIndex; i < numOfTables; ++i) { for (int32_t i = pr->tableIndex; i < numOfTables; ++i) {
STableKeyInfo* pKeyInfo = (STableKeyInfo*) taosArrayGet(pr->pTableList, i); STableKeyInfo* pKeyInfo = (STableKeyInfo*)taosArrayGet(pr->pTableList, i);
code = doExtractCacheRow(pr, lruCache, pKeyInfo->uid, &pRow, &h); code = doExtractCacheRow(pr, lruCache, pKeyInfo->uid, &pRow, &h);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
...@@ -324,7 +326,11 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32 ...@@ -324,7 +326,11 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32
code = TSDB_CODE_INVALID_PARA; code = TSDB_CODE_INVALID_PARA;
} }
_end: _end:
tsdbDataFReaderClose(&pr->pDataFReader);
tsdbUntakeReadSnap(pr->pVnode->pTsdb, pr->pReadSnap, "cache-l");
for (int32_t j = 0; j < pr->numOfCols; ++j) { for (int32_t j = 0; j < pr->numOfCols; ++j) {
taosMemoryFree(pRes[j]); taosMemoryFree(pRes[j]);
} }
......
...@@ -58,17 +58,19 @@ SOperatorInfo* createCacherowsScanOperator(SLastRowScanPhysiNode* pScanNode, SRe ...@@ -58,17 +58,19 @@ SOperatorInfo* createCacherowsScanOperator(SLastRowScanPhysiNode* pScanNode, SRe
// partition by tbname // partition by tbname
if (taosArrayGetSize(pTableList->pGroupList) == taosArrayGetSize(pTableList->pTableList)) { if (taosArrayGetSize(pTableList->pGroupList) == taosArrayGetSize(pTableList->pTableList)) {
pInfo->retrieveType = CACHESCAN_RETRIEVE_TYPE_ALL|(pScanNode->ignoreNull? CACHESCAN_RETRIEVE_LAST:CACHESCAN_RETRIEVE_LAST_ROW); pInfo->retrieveType =
CACHESCAN_RETRIEVE_TYPE_ALL | (pScanNode->ignoreNull ? CACHESCAN_RETRIEVE_LAST : CACHESCAN_RETRIEVE_LAST_ROW);
code = tsdbCacherowsReaderOpen(pInfo->readHandle.vnode, pInfo->retrieveType, pTableList->pTableList, code = tsdbCacherowsReaderOpen(pInfo->readHandle.vnode, pInfo->retrieveType, pTableList->pTableList,
taosArrayGetSize(pInfo->pColMatchInfo), &pInfo->pLastrowReader); taosArrayGetSize(pInfo->pColMatchInfo), pTableList->suid, &pInfo->pLastrowReader);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
goto _error; goto _error;
} }
pInfo->pBufferredRes = createOneDataBlock(pInfo->pRes, false); pInfo->pBufferredRes = createOneDataBlock(pInfo->pRes, false);
blockDataEnsureCapacity(pInfo->pBufferredRes, pOperator->resultInfo.capacity); blockDataEnsureCapacity(pInfo->pBufferredRes, pOperator->resultInfo.capacity);
} else { // by tags } else { // by tags
pInfo->retrieveType = CACHESCAN_RETRIEVE_TYPE_SINGLE|(pScanNode->ignoreNull? CACHESCAN_RETRIEVE_LAST:CACHESCAN_RETRIEVE_LAST_ROW); pInfo->retrieveType = CACHESCAN_RETRIEVE_TYPE_SINGLE |
(pScanNode->ignoreNull ? CACHESCAN_RETRIEVE_LAST : CACHESCAN_RETRIEVE_LAST_ROW);
} }
if (pScanNode->scan.pScanPseudoCols != NULL) { if (pScanNode->scan.pScanPseudoCols != NULL) {
...@@ -184,7 +186,7 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) { ...@@ -184,7 +186,7 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) {
SArray* pGroupTableList = taosArrayGetP(pTableList->pGroupList, pInfo->currentGroupIndex); SArray* pGroupTableList = taosArrayGetP(pTableList->pGroupList, pInfo->currentGroupIndex);
tsdbCacherowsReaderOpen(pInfo->readHandle.vnode, pInfo->retrieveType, pGroupTableList, tsdbCacherowsReaderOpen(pInfo->readHandle.vnode, pInfo->retrieveType, pGroupTableList,
taosArrayGetSize(pInfo->pColMatchInfo), &pInfo->pLastrowReader); taosArrayGetSize(pInfo->pColMatchInfo), pTableList->suid, &pInfo->pLastrowReader);
taosArrayClear(pInfo->pUidList); taosArrayClear(pInfo->pUidList);
int32_t code = tsdbRetrieveCacheRows(pInfo->pLastrowReader, pInfo->pRes, pInfo->pSlotIds, pInfo->pUidList); int32_t code = tsdbRetrieveCacheRows(pInfo->pLastrowReader, pInfo->pRes, pInfo->pSlotIds, pInfo->pUidList);
...@@ -265,4 +267,4 @@ int32_t extractTargetSlotId(const SArray* pColMatchInfo, SExecTaskInfo* pTaskInf ...@@ -265,4 +267,4 @@ int32_t extractTargetSlotId(const SArray* pColMatchInfo, SExecTaskInfo* pTaskInf
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册