提交 2b21c5e0 编写于 作者: M Minglei Jin

tsdb2: adapt cache with tsdb2 API

上级 145c43e3
...@@ -368,20 +368,20 @@ typedef struct { ...@@ -368,20 +368,20 @@ typedef struct {
} SCacheFlushState; } SCacheFlushState;
struct STsdb { struct STsdb {
char *path; char *path;
SVnode *pVnode; SVnode *pVnode;
STsdbKeepCfg keepCfg; STsdbKeepCfg keepCfg;
TdThreadRwlock rwLock; TdThreadRwlock rwLock;
SMemTable *mem; SMemTable *mem;
SMemTable *imem; SMemTable *imem;
STsdbFS fs; // old STsdbFS fs; // old
SLRUCache *lruCache; SLRUCache *lruCache;
SCacheFlushState flushState; SCacheFlushState flushState;
TdThreadMutex lruMutex; TdThreadMutex lruMutex;
SLRUCache *biCache; SLRUCache *biCache;
TdThreadMutex biMutex; TdThreadMutex biMutex;
struct STFileSystem *pFS; // new struct STFileSystem *pFS; // new
SRocksCache rCache; SRocksCache rCache;
}; };
struct TSDBKEY { struct TSDBKEY {
...@@ -659,12 +659,19 @@ struct SDelFWriter { ...@@ -659,12 +659,19 @@ struct SDelFWriter {
uint8_t *aBuf[1]; uint8_t *aBuf[1];
}; };
#include "tarray2.h"
//#include "tsdbFS2.h"
// struct STFileSet;
typedef struct STFileSet STFileSet;
typedef TARRAY2(STFileSet *) TFileSetArray;
struct STsdbReadSnap { struct STsdbReadSnap {
SMemTable *pMem; SMemTable *pMem;
SQueryNode *pNode; SQueryNode *pNode;
SMemTable *pIMem; SMemTable *pIMem;
SQueryNode *pINode; SQueryNode *pINode;
STsdbFS fs; TFileSetArray *pfSetArray;
STsdbFS fs;
}; };
struct SDataFWriter { struct SDataFWriter {
...@@ -777,7 +784,7 @@ struct SDiskDataBuilder { ...@@ -777,7 +784,7 @@ struct SDiskDataBuilder {
typedef struct SLDataIter { typedef struct SLDataIter {
SRBTreeNode node; SRBTreeNode node;
SSttBlk *pSttBlk; SSttBlk *pSttBlk;
int32_t iStt; // for debug purpose int32_t iStt; // for debug purpose
int8_t backward; int8_t backward;
int32_t iSttBlk; int32_t iSttBlk;
int32_t iRow; int32_t iRow;
...@@ -796,22 +803,22 @@ int32_t tMergeTreeOpen(SMergeTree *pMTree, int8_t backward, SDataFReader *pFRead ...@@ -796,22 +803,22 @@ int32_t tMergeTreeOpen(SMergeTree *pMTree, int8_t backward, SDataFReader *pFRead
bool destroyLoadInfo, const char *idStr, bool strictTimeRange, SLDataIter *pLDataIter); bool destroyLoadInfo, const char *idStr, bool strictTimeRange, SLDataIter *pLDataIter);
typedef struct { typedef struct {
int8_t backward; int8_t backward;
STsdb *pTsdb; STsdb *pTsdb;
uint64_t suid; uint64_t suid;
uint64_t uid; uint64_t uid;
STimeWindow timewindow; STimeWindow timewindow;
SVersionRange verRange; SVersionRange verRange;
bool strictTimeRange; bool strictTimeRange;
SArray *pSttFileBlockIterArray; SArray *pSttFileBlockIterArray;
void *pCurrentFileset; void *pCurrentFileset;
STSchema *pSchema; STSchema *pSchema;
int16_t *pCols; int16_t *pCols;
int32_t numOfCols; int32_t numOfCols;
void *pReader; void *pReader;
void *idstr; void *idstr;
} SMergeTreeConf; } SMergeTreeConf;
int32_t tMergeTreeOpen2(SMergeTree *pMTree, SMergeTreeConf* pConf); int32_t tMergeTreeOpen2(SMergeTree *pMTree, SMergeTreeConf *pConf);
void tMergeTreeAddIter(SMergeTree *pMTree, SLDataIter *pIter); void tMergeTreeAddIter(SMergeTree *pMTree, SLDataIter *pIter);
bool tMergeTreeNext(SMergeTree *pMTree); bool tMergeTreeNext(SMergeTree *pMTree);
...@@ -827,29 +834,28 @@ void *destroySttBlockReader(SArray *pLDataIterArray, int64_t *block ...@@ -827,29 +834,28 @@ void *destroySttBlockReader(SArray *pLDataIterArray, int64_t *block
// tsdbCache ============================================================================================== // tsdbCache ==============================================================================================
typedef struct SCacheRowsReader { typedef struct SCacheRowsReader {
STsdb *pTsdb; STsdb *pTsdb;
SVersionRange verRange; SVersionRange verRange;
TdThreadMutex readerMutex; TdThreadMutex readerMutex;
SVnode *pVnode; SVnode *pVnode;
STSchema *pSchema; STSchema *pSchema;
STSchema *pCurrSchema; STSchema *pCurrSchema;
uint64_t uid; uint64_t uid;
uint64_t suid; uint64_t suid;
char **transferBuf; // todo remove it soon char **transferBuf; // todo remove it soon
int32_t numOfCols; int32_t numOfCols;
SArray *pCidList; SArray *pCidList;
int32_t *pSlotIds; int32_t *pSlotIds;
int32_t type; int32_t type;
int32_t tableIndex; // currently returned result tables int32_t tableIndex; // currently returned result tables
STableKeyInfo *pTableList; // table id list STableKeyInfo *pTableList; // table id list
int32_t numOfTables; int32_t numOfTables;
SSttBlockLoadInfo *pLoadInfo; SArray *pLDataIterArray;
SLDataIter *pDataIter; STsdbReadSnap *pReadSnap;
STsdbReadSnap *pReadSnap; SDataFReader *pDataFReader;
SDataFReader *pDataFReader; SDataFReader *pDataFReaderLast;
SDataFReader *pDataFReaderLast; char *idstr;
const char *idstr; int64_t lastTs;
int64_t lastTs;
} SCacheRowsReader; } SCacheRowsReader;
typedef struct { typedef struct {
......
...@@ -13,6 +13,7 @@ ...@@ -13,6 +13,7 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#include "tsdb.h" #include "tsdb.h"
#include "tsdbDataFileRW.h"
#include "vnd.h" #include "vnd.h"
#define ROCKS_BATCH_SIZE (4096) #define ROCKS_BATCH_SIZE (4096)
...@@ -1714,201 +1715,327 @@ _err: ...@@ -1714,201 +1715,327 @@ _err:
return code; return code;
} }
*/ */
typedef enum {
SFSLASTNEXTROW_FS,
SFSLASTNEXTROW_FILESET,
SFSLASTNEXTROW_BLOCKDATA,
SFSLASTNEXTROW_BLOCKROW
} SFSLASTNEXTROWSTATES;
typedef struct { typedef struct {
SFSLASTNEXTROWSTATES state; // [input] SMergeTree mergeTree;
STsdb *pTsdb; // [input] SMergeTree *pMergeTree;
STSchema *pTSchema; // [input] } SFSLastIter;
tb_uid_t suid;
tb_uid_t uid;
int32_t nFileSet;
int32_t iFileSet;
SArray *aDFileSet;
SDataFReader **pDataFReader;
TSDBROW row;
bool checkRemainingRow;
SMergeTree mergeTree;
SMergeTree *pMergeTree;
SSttBlockLoadInfo *pLoadInfo;
SLDataIter *pDataIter;
int64_t lastTs;
} SFSLastNextRowIter;
static int32_t getNextRowFromFSLast(void *iter, TSDBROW **ppRow, bool *pIgnoreEarlierTs, bool isLast, int16_t *aCols,
int nCols) {
SFSLastNextRowIter *state = (SFSLastNextRowIter *)iter;
int32_t code = 0;
bool checkRemainingRow = true;
switch (state->state) { static int32_t lastIterClose(SFSLastIter **iter) {
case SFSLASTNEXTROW_FS: int32_t code = 0;
state->nFileSet = taosArrayGetSize(state->aDFileSet);
state->iFileSet = state->nFileSet;
case SFSLASTNEXTROW_FILESET: { if (iter->pMergeTree) {
SDFileSet *pFileSet = NULL; tMergeTreeClose(iter->pMergeTree);
_next_fileset: iter->pMergeTree = NULL;
if (state->pMergeTree != NULL) { }
tMergeTreeClose(state->pMergeTree);
state->pMergeTree = NULL;
}
if (--state->iFileSet >= 0) { *iter = NULL;
pFileSet = (SDFileSet *)taosArrayGet(state->aDFileSet, state->iFileSet);
} else {
*ppRow = NULL;
return code;
}
if (*state->pDataFReader == NULL || (*state->pDataFReader)->pSet->fid != pFileSet->fid) { return code;
if (*state->pDataFReader != NULL) { }
tsdbDataFReaderClose(state->pDataFReader);
resetLastBlockLoadInfo(state->pLoadInfo); static int32_t lastIterNext(SFSLastIter *iter, TSDBROW **ppRow, STsdb *pTsdb, STSchema *pTSchema, tb_uid_t suid,
} tb_uid_t uid, SCacheRowsReader *pr, int64_t lastTs, STFileSet *pFileSet, int16_t *aCols,
int nCols) {
int32_t code = 0;
code = tsdbDataFReaderOpen(state->pDataFReader, state->pTsdb, pFileSet); if (!iter->pMergeTree) {
if (code) goto _err; int64_t loadBlocks = 0;
} double elapse = 0;
pr->pLDataIterArray = destroySttBlockReader(pr->pLDataIterArray, &loadBlocks, &elapse);
pr->pLDataIterArray = taosArrayInit(4, POINTER_BYTES);
SMergeTreeConf conf = {
.uid = uid,
.suid = suid,
.pTsdb = pTsdb,
.timewindow = (STimeWindow){.skey = lastTs, .ekey = TSKEY_MAX},
.verRange = (SVersionRange){.minVer = 0, .maxVer = UINT64_MAX},
.strictTimeRange = false,
.pSchema = pTSchema,
.pCurrentFileset = pFileSet,
.backward = 1,
.pSttFileBlockIterArray = pr->pLDataIterArray,
.pCols = aCols,
.numOfCols = nCols,
.pReader = pr,
.idstr = pr->idstr,
};
code = tMergeTreeOpen2(&iter->mergeTree, &conf);
if (code != TSDB_CODE_SUCCESS) {
return -1;
}
iter->pMergeTree = &iter->mergeTree;
}
// retrieve next row
bool hasVal = tMergeTreeNext(iter->pMergeTree);
if (!hasVal) {
*ppRow = NULL;
return code;
}
*ppRow = tMergeTreeGetRow(iter->pMergeTree);
return code;
}
typedef enum SFSNEXTROWSTATES {
SFSNEXTROW_FS,
SFSNEXTROW_FILESET,
SFSNEXTROW_INDEXLIST,
SFSNEXTROW_BRINBLOCK,
SFSNEXTROW_BRINRECORD,
SFSNEXTROW_BLOCKDATA,
SFSNEXTROW_BLOCKROW
} SFSNEXTROWSTATES;
struct CacheNextRowIter;
typedef struct SFSNextRowIter {
SFSNEXTROWSTATES state; // [input]
STsdb *pTsdb; // [input]
SBlockIdx *pBlockIdxExp; // [input]
STSchema *pTSchema; // [input]
tb_uid_t suid;
tb_uid_t uid;
int32_t nFileSet;
int32_t iFileSet;
STFileSet *pFileSet;
TFileSetArray *aDFileSet;
SDataFileReader *pFileReader;
SArray *pIndexList;
int32_t iBrinIndex;
SBrinBlock brinBlock;
int32_t iBrinRecord;
SBrinRecord brinRecord;
SBlockData blockData;
SBlockData *pBlockData;
int32_t nRow;
int32_t iRow;
TSDBROW row;
int64_t lastTs;
SFSLastIter lastIter;
SFSLastIter *pLastIter;
TSDBROW *pLastRow;
struct CacheNextRowIter *pRowIter;
} SFSNextRowIter;
static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow, bool *pIgnoreEarlierTs, bool isLast, int16_t *aCols,
int nCols) {
SFSNextRowIter *state = (SFSNextRowIter *)iter;
int32_t code = 0;
if (SFSNEXTROW_FS == state->state) {
state->nFileSet = TARRAY2_SIZE(state->aDFileSet);
state->iFileSet = state->nFileSet;
state->state = SFSNEXTROW_FILESET;
}
int nTmpCols = nCols; if (SFSNEXTROW_FILESET == state->state) {
bool hasTs = false; _next_fileset:
if (aCols[0] == PRIMARYKEY_TIMESTAMP_COL_ID) { if (--state->iFileSet < 0) { // no fileset left, cleanup and return NULL row
--nTmpCols;
hasTs = true; } else {
state->pFileSet = TARRAY2_GET(state->aDFileSet, state->iFileSet);
}
STFileObj **pFileObj = pFileSet->farr;
if (pFileObj[0] != NULL || pFileObj[3] != NULL) {
SDataFileReaderConfig conf = {.tsdb = state->pTsdb, .szPage = state->pTsdb->pVnode->config.szPage};
const char *filesName[4] = {0};
if (pFileObj[0] != NULL) {
conf.files[0].file = *pFileObj[0]->f;
conf.files[0].exist = true;
filesName[0] = pFileObj[0]->fname;
conf.files[1].file = *pFileObj[1]->f;
conf.files[1].exist = true;
filesName[1] = pFileObj[1]->fname;
conf.files[2].file = *pFileObj[2]->f;
conf.files[2].exist = true;
filesName[2] = pFileObj[2]->fname;
} }
for (int i = 0; i < state->pLoadInfo->numOfStt; ++i) {
state->pLoadInfo[i].colIds = hasTs ? aCols + 1 : aCols; if (pFileObj[3] != NULL) {
state->pLoadInfo[i].numOfCols = nTmpCols; conf.files[3].exist = true;
state->pLoadInfo[i].isLast = isLast; conf.files[3].file = *pFileObj[3]->f;
filesName[3] = pFileObj[3]->fname;
} }
tMergeTreeOpen(&state->mergeTree, 1, *state->pDataFReader, state->suid, state->uid,
&(STimeWindow){.skey = state->lastTs, .ekey = TSKEY_MAX},
&(SVersionRange){.minVer = 0, .maxVer = UINT64_MAX}, state->pLoadInfo, false, NULL, true,
state->pDataIter);
state->pMergeTree = &state->mergeTree;
state->state = SFSLASTNEXTROW_BLOCKROW;
}
case SFSLASTNEXTROW_BLOCKROW: {
if (nCols != state->pLoadInfo->numOfCols) {
for (int i = 0; i < state->pLoadInfo->numOfStt; ++i) {
state->pLoadInfo[i].numOfCols = nCols;
state->pLoadInfo[i].checkRemainingRow = state->checkRemainingRow; code = tsdbDataFileReaderOpen(filesName, &conf, &state->pFileReader);
} if (code != TSDB_CODE_SUCCESS) {
goto _err;
} }
bool hasVal = tMergeTreeNext(&state->mergeTree);
if (!hasVal) { // TODO: load tomb data from data and sttt
if (tMergeTreeIgnoreEarlierTs(&state->mergeTree)) { state->pLastIter = &state->lastIter;
*pIgnoreEarlierTs = true;
*ppRow = NULL; if (!state->pIndexList) {
return code; state->pIndexList = taosArrayInit(1, sizeof(SBrinBlk));
} } else {
state->state = SFSLASTNEXTROW_FILESET; taosArrayClear(state->pIndexList);
goto _next_fileset;
} }
state->row = *tMergeTreeGetRow(&state->mergeTree); const TBrinBlkArray *pBlkArray = NULL;
*ppRow = &state->row;
if (TSDBROW_TS(&state->row) <= state->lastTs) { int32_t code = tsdbDataFileReadBrinBlk(state->pFileReader, &pBlkArray);
*pIgnoreEarlierTs = true; if (code != TSDB_CODE_SUCCESS) {
*ppRow = NULL; goto _err;
return code;
} }
*pIgnoreEarlierTs = false; for (int i = TARRAY2_SIZE(pBlkArray); i >= 0; --i) {
/* SBrinBlk *pBrinBlk = &pBlkArray->data[i];
if (!hasVal) { if (state->suid >= pBrinBlk->minTbid.suid && state->suid <= pBrinBlk->maxTbid.suid) {
state->state = SFSLASTNEXTROW_FILESET; if (state->uid >= pBrinBlk->minTbid.uid && state->uid <= pBrinBlk->maxTbid.uid) {
taosArrayPush(state->pIndexList, pBrinBlk);
}
} else if (state->suid > pBrinBlk->maxTbid.suid ||
(state->suid == pBrinBlk->maxTbid.suid && state->uid > pBrinBlk->maxTbid.uid)) {
break;
}
} }
*/
if (!state->checkRemainingRow) { int indexSize = TARRAY_SIZE(state->pIndexList);
state->checkRemainingRow = true; if (indexSize <= 0) {
// goto next fileset
} }
return code;
state->state = SFSNEXTROW_INDEXLIST;
state->iBrinIndex = indexSize;
} else { // empty fileset, goto next fileset
} }
default:
ASSERT(0);
break;
} }
_err: if (SFSNEXTROW_INDEXLIST == state->state) {
/*if (state->pDataFReader) { SBrinBlk *pBrinBlk = NULL;
tsdbDataFReaderClose(&state->pDataFReader); _next_brinindex:
state->pDataFReader = NULL; if (--state->iBrinIndex < 0) { // no index left, goto next fileset
}*/ } else {
if (state->pMergeTree != NULL) { pBrinBlk = taosArrayGet(state->pIndexList, state->iBrinIndex);
tMergeTreeClose(state->pMergeTree); }
state->pMergeTree = NULL;
code = tsdbDataFileReadBrinBlock(state->pFileReader, pBrinBlk, &state->brinBlock);
if (code != TSDB_CODE_SUCCESS) {
goto _err;
}
state->iBrinRecord = BRIN_BLOCK_SIZE(&state->brinBlock) - 1;
state->state = SFSNEXTROW_BRINBLOCK;
} }
*ppRow = NULL; if (SFSNEXTROW_BRINBLOCK == state->state) {
_next_brinrecord:
if (state->iBrinRecord < 0) { // empty brin block, goto _next_brinindex
tBrinBlockClear(&state->brinBlock);
goto _next_brinindex;
}
code = tBrinBlockGet(&state->brinBlock, state->iBrinRecord, &state->brinRecord);
if (code != TSDB_CODE_SUCCESS) {
goto _err;
}
return code; SBrinRecord *pRecord = &state->brinRecord;
} if (pRecord->uid != state->uid) {
// TODO: goto next brin block early
--state->iBrinRecord;
goto _next_brinrecord;
}
int32_t clearNextRowFromFSLast(void *iter) { state->state = SFSNEXTROW_BRINRECORD;
SFSLastNextRowIter *state = (SFSLastNextRowIter *)iter; }
int32_t code = 0;
if (!state) { if (SFSNEXTROW_BRINRECORD == state->state) {
return code; SBrinRecord *pRecord = &state->brinRecord;
if (!state->pBlockData) {
state->pBlockData = &state->blockData;
code = tBlockDataCreate(&state->blockData);
if (code) goto _err;
} else {
tBlockDataReset(state->pBlockData);
}
if (aCols[0] == PRIMARYKEY_TIMESTAMP_COL_ID) {
--nCols;
++aCols;
}
code = tsdbDataFileReadBlockDataByColumn(state->pFileReader, pRecord, state->pBlockData, state->pTSchema, aCols,
nCols);
if (code != TSDB_CODE_SUCCESS) {
goto _err;
}
state->nRow = state->blockData.nRow;
state->iRow = state->nRow - 1;
state->state = SFSNEXTROW_BLOCKROW;
} }
/*
if (state->pDataFReader) { if (SFSNEXTROW_BLOCKROW == state->state) {
tsdbDataFReaderClose(&state->pDataFReader); if (state->iRow < 0) {
state->pDataFReader = NULL; --state->iBrinRecord;
goto _next_brinrecord;
}
state->row = tsdbRowFromBlockData(state->pBlockData, state->iRow);
if (!state->pLastIter) {
*ppRow = &state->row;
--state->iRow;
return code;
}
if (!state->pLastRow) {
// get next row from fslast and process with fs row, --state->Row if select fs row
code = lastIterNext(&state->lastIter, &state->pLastRow, state->pTsdb, state->pTSchema, state->suid, state->uid,
state->pr, state->lastTs, state->pFileSet, aCols, nCols);
if (code != TSDB_CODE_SUCCESS) {
goto _err;
}
}
if (!state->pLastRow) {
lastIterClose(&state->pLastIter);
*ppRow = &state->row;
--state->iRow;
return code;
}
// process state->pLastRow & state->row
TSKEY rowTs = TSDBROW_TS(&state->row);
TSKEY lastRowTs = TSDBROW_TS(state->pLastRow);
if (lastRowTs > rowTs) {
*ppRow = state->pLastRow;
state->pLastRow = NULL;
return code;
} else if (lastRowTs < rowTs) {
*ppRow = &state->row;
--state->iRow;
return code;
} else {
// TODO: merge rows and *ppRow = mergedRow
}
} }
*/
if (state->pMergeTree != NULL) { _err:
tMergeTreeClose(state->pMergeTree); // TODO: cleanup when error occurs
state->pMergeTree = NULL; if (state->pLastIter) {
lastIterClose(&state->pLastIter);
} }
return code; if (state->pBlockData) {
} tBlockDataDestroy(state->pBlockData);
}
typedef enum SFSNEXTROWSTATES { *ppRow = NULL;
SFSNEXTROW_FS,
SFSNEXTROW_FILESET,
SFSNEXTROW_BLOCKDATA,
SFSNEXTROW_BLOCKROW
} SFSNEXTROWSTATES;
typedef struct SFSNextRowIter { return code;
SFSNEXTROWSTATES state; // [input] }
STsdb *pTsdb; // [input]
SBlockIdx *pBlockIdxExp; // [input]
STSchema *pTSchema; // [input]
tb_uid_t suid;
tb_uid_t uid;
int32_t nFileSet;
int32_t iFileSet;
SArray *aDFileSet;
SDataFReader **pDataFReader;
SArray *aBlockIdx;
LRUHandle *aBlockIdxHandle;
SBlockIdx *pBlockIdx;
SMapData blockMap;
int32_t nBlock;
int32_t iBlock;
SDataBlk block;
SBlockData blockData;
SBlockData *pBlockData;
int32_t nRow;
int32_t iRow;
TSDBROW row;
SSttBlockLoadInfo *pLoadInfo;
int64_t lastTs;
} SFSNextRowIter;
#if 0
static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow, bool *pIgnoreEarlierTs, bool isLast, int16_t *aCols, static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow, bool *pIgnoreEarlierTs, bool isLast, int16_t *aCols,
int nCols) { int nCols) {
SFSNextRowIter *state = (SFSNextRowIter *)iter; SFSNextRowIter *state = (SFSNextRowIter *)iter;
...@@ -1917,17 +2044,16 @@ static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow, bool *pIgnoreEarlie ...@@ -1917,17 +2044,16 @@ static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow, bool *pIgnoreEarlie
switch (state->state) { switch (state->state) {
case SFSNEXTROW_FS: case SFSNEXTROW_FS:
// state->aDFileSet = state->pTsdb->pFS->cState->aDFileSet; state->nFileSet = TARRAY2_SIZE(state->aDFileSet);
state->nFileSet = taosArrayGetSize(state->aDFileSet);
state->iFileSet = state->nFileSet; state->iFileSet = state->nFileSet;
state->pBlockData = NULL; state->pBlockData = NULL;
case SFSNEXTROW_FILESET: { case SFSNEXTROW_FILESET: {
SDFileSet *pFileSet = NULL; STFileSet *pFileSet = NULL;
_next_fileset: _next_fileset:
if (--state->iFileSet >= 0) { if (--state->iFileSet >= 0) {
pFileSet = (SDFileSet *)taosArrayGet(state->aDFileSet, state->iFileSet); pFileSet = TARRAY2_GET(state->aDFileSet, state->iFileSet);
} else { } else {
// tBlockDataDestroy(&state->blockData, 1); // tBlockDataDestroy(&state->blockData, 1);
if (state->pBlockData) { if (state->pBlockData) {
...@@ -2187,6 +2313,8 @@ _err: ...@@ -2187,6 +2313,8 @@ _err:
return code; return code;
} }
#endif
int32_t clearNextRowFromFS(void *iter) { int32_t clearNextRowFromFS(void *iter) {
int32_t code = 0; int32_t code = 0;
...@@ -2365,24 +2493,23 @@ typedef struct { ...@@ -2365,24 +2493,23 @@ typedef struct {
_next_row_clear_fn_t nextRowClearFn; _next_row_clear_fn_t nextRowClearFn;
} TsdbNextRowState; } TsdbNextRowState;
typedef struct { typedef struct CacheNextRowIter {
SArray *pSkyline; SArray *pSkyline;
int64_t iSkyline; int64_t iSkyline;
SBlockIdx idx; SBlockIdx idx;
SMemNextRowIter memState; SMemNextRowIter memState;
SMemNextRowIter imemState; SMemNextRowIter imemState;
SFSLastNextRowIter fsLastState; SFSNextRowIter fsState;
SFSNextRowIter fsState; TSDBROW memRow, imemRow, fsLastRow, fsRow;
TSDBROW memRow, imemRow, fsLastRow, fsRow;
TsdbNextRowState input[4]; TsdbNextRowState input[4];
STsdb *pTsdb; STsdb *pTsdb;
} CacheNextRowIter; } CacheNextRowIter;
static int32_t nextRowIterOpen(CacheNextRowIter *pIter, tb_uid_t uid, STsdb *pTsdb, STSchema *pTSchema, tb_uid_t suid, static int32_t nextRowIterOpen(CacheNextRowIter *pIter, tb_uid_t uid, STsdb *pTsdb, STSchema *pTSchema, tb_uid_t suid,
SSttBlockLoadInfo *pLoadInfo, SLDataIter *pLDataIter, STsdbReadSnap *pReadSnap, SArray *pLDataIterArray, STsdbReadSnap *pReadSnap, SDataFReader **pDataFReader,
SDataFReader **pDataFReader, SDataFReader **pDataFReaderLast, int64_t lastTs) { SDataFReader **pDataFReaderLast, int64_t lastTs) {
int code = 0; int code = 0;
STbData *pMem = NULL; STbData *pMem = NULL;
...@@ -2398,7 +2525,7 @@ static int32_t nextRowIterOpen(CacheNextRowIter *pIter, tb_uid_t uid, STsdb *pTs ...@@ -2398,7 +2525,7 @@ static int32_t nextRowIterOpen(CacheNextRowIter *pIter, tb_uid_t uid, STsdb *pTs
pIter->pTsdb = pTsdb; pIter->pTsdb = pTsdb;
pIter->pSkyline = taosArrayInit(32, sizeof(TSDBKEY)); pIter->pSkyline = taosArrayInit(32, sizeof(TSDBKEY));
#if 0
SDelFile *pDelFile = pReadSnap->fs.pDelFile; SDelFile *pDelFile = pReadSnap->fs.pDelFile;
if (pDelFile) { if (pDelFile) {
SDelFReader *pDelFReader; SDelFReader *pDelFReader;
...@@ -2424,20 +2551,20 @@ static int32_t nextRowIterOpen(CacheNextRowIter *pIter, tb_uid_t uid, STsdb *pTs ...@@ -2424,20 +2551,20 @@ static int32_t nextRowIterOpen(CacheNextRowIter *pIter, tb_uid_t uid, STsdb *pTs
goto _err; goto _err;
} }
taosArrayDestroy(pDelIdxArray); taosArrayDestroy(pDelIdxArray);
tsdbDelFReaderClose(&pDelFReader); tsdbDelFReaderClose(&pDelFReader);
} else { } else {
code = getTableDelSkyline(pMem, pIMem, NULL, NULL, pIter->pSkyline); code = getTableDelSkyline(pMem, pIMem, NULL, NULL, pIter->pSkyline);
if (code) goto _err; if (code) goto _err;
} }
#endif
pIter->iSkyline = taosArrayGetSize(pIter->pSkyline) - 1; pIter->iSkyline = taosArrayGetSize(pIter->pSkyline) - 1;
pIter->idx = (SBlockIdx){.suid = suid, .uid = uid}; pIter->idx = (SBlockIdx){.suid = suid, .uid = uid};
/*
pIter->fsLastState.state = (SFSLASTNEXTROWSTATES)SFSNEXTROW_FS; pIter->fsLastState.state = (SFSLASTNEXTROWSTATES)SFSNEXTROW_FS;
pIter->fsLastState.pTsdb = pTsdb; pIter->fsLastState.pTsdb = pTsdb;
pIter->fsLastState.aDFileSet = pReadSnap->fs.aDFileSet; pIter->fsLastState.aDFileSet = pReadSnap->pfSetArray;
pIter->fsLastState.pTSchema = pTSchema; pIter->fsLastState.pTSchema = pTSchema;
pIter->fsLastState.suid = suid; pIter->fsLastState.suid = suid;
pIter->fsLastState.uid = uid; pIter->fsLastState.uid = uid;
...@@ -2445,22 +2572,24 @@ static int32_t nextRowIterOpen(CacheNextRowIter *pIter, tb_uid_t uid, STsdb *pTs ...@@ -2445,22 +2572,24 @@ static int32_t nextRowIterOpen(CacheNextRowIter *pIter, tb_uid_t uid, STsdb *pTs
pIter->fsLastState.pDataFReader = pDataFReaderLast; pIter->fsLastState.pDataFReader = pDataFReaderLast;
pIter->fsLastState.lastTs = lastTs; pIter->fsLastState.lastTs = lastTs;
pIter->fsLastState.pDataIter = pLDataIter; pIter->fsLastState.pDataIter = pLDataIter;
*/
pIter->fsState.pRowIter = pIter;
pIter->fsState.state = SFSNEXTROW_FS; pIter->fsState.state = SFSNEXTROW_FS;
pIter->fsState.pTsdb = pTsdb; pIter->fsState.pTsdb = pTsdb;
pIter->fsState.aDFileSet = pReadSnap->fs.aDFileSet; pIter->fsState.aDFileSet = pReadSnap->pfSetArray;
pIter->fsState.pBlockIdxExp = &pIter->idx; pIter->fsState.pBlockIdxExp = &pIter->idx;
pIter->fsState.pTSchema = pTSchema; pIter->fsState.pTSchema = pTSchema;
pIter->fsState.suid = suid; pIter->fsState.suid = suid;
pIter->fsState.uid = uid; pIter->fsState.uid = uid;
pIter->fsState.pLoadInfo = pLoadInfo;
pIter->fsState.pDataFReader = pDataFReader; pIter->fsState.pDataFReader = pDataFReader;
pIter->fsState.lastTs = lastTs; pIter->fsState.lastTs = lastTs;
pIter->input[0] = (TsdbNextRowState){&pIter->memRow, true, false, false, &pIter->memState, getNextRowFromMem, NULL}; pIter->input[0] = (TsdbNextRowState){&pIter->memRow, true, false, false, &pIter->memState, getNextRowFromMem, NULL};
pIter->input[1] = (TsdbNextRowState){&pIter->imemRow, true, false, false, &pIter->imemState, getNextRowFromMem, NULL}; pIter->input[1] = (TsdbNextRowState){&pIter->imemRow, true, false, false, &pIter->imemState, getNextRowFromMem, NULL};
pIter->input[2] = (TsdbNextRowState){ /*
pIter->input[2] = (TsdbNextRowState){
&pIter->fsLastRow, false, true, false, &pIter->fsLastState, getNextRowFromFSLast, clearNextRowFromFSLast}; &pIter->fsLastRow, false, true, false, &pIter->fsLastState, getNextRowFromFSLast, clearNextRowFromFSLast};
*/
pIter->input[3] = pIter->input[3] =
(TsdbNextRowState){&pIter->fsRow, false, true, false, &pIter->fsState, getNextRowFromFS, clearNextRowFromFS}; (TsdbNextRowState){&pIter->fsRow, false, true, false, &pIter->fsState, getNextRowFromFS, clearNextRowFromFS};
...@@ -2632,321 +2761,6 @@ static int32_t updateTSchema(int32_t sversion, SCacheRowsReader *pReader, uint64 ...@@ -2632,321 +2761,6 @@ static int32_t updateTSchema(int32_t sversion, SCacheRowsReader *pReader, uint64
return metaGetTbTSchemaEx(pReader->pTsdb->pVnode->pMeta, pReader->suid, uid, sversion, &pReader->pCurrSchema); return metaGetTbTSchemaEx(pReader->pTsdb->pVnode->pMeta, pReader->suid, uid, sversion, &pReader->pCurrSchema);
} }
static int32_t mergeLastRow(tb_uid_t uid, STsdb *pTsdb, bool *dup, SArray **ppColArray, SCacheRowsReader *pr) {
STSchema *pTSchema = pr->pSchema; // metaGetTbTSchema(pTsdb->pVnode->pMeta, uid, -1, 1);
int16_t nLastCol = pTSchema->numOfCols;
int16_t iCol = 0;
int16_t noneCol = 0;
bool setNoneCol = false;
bool hasRow = false;
bool ignoreEarlierTs = false;
SArray *pColArray = NULL;
SColVal *pColVal = &(SColVal){0};
int32_t code = initLastColArray(pTSchema, &pColArray);
if (TSDB_CODE_SUCCESS != code) {
return code;
}
TSKEY lastRowTs = TSKEY_MAX;
CacheNextRowIter iter = {0};
nextRowIterOpen(&iter, uid, pTsdb, pTSchema, pr->suid, pr->pLoadInfo, pr->pDataIter, pr->pReadSnap, &pr->pDataFReader,
&pr->pDataFReaderLast, pr->lastTs);
do {
TSDBROW *pRow = NULL;
nextRowIterGet(&iter, &pRow, &ignoreEarlierTs, false, NULL, 0);
if (!pRow) {
break;
}
hasRow = true;
int32_t sversion = TSDBROW_SVERSION(pRow);
if (sversion != -1) {
code = updateTSchema(sversion, pr, uid);
if (TSDB_CODE_SUCCESS != code) {
goto _err;
}
pTSchema = pr->pCurrSchema;
}
int16_t nCol = pTSchema->numOfCols;
TSKEY rowTs = TSDBROW_TS(pRow);
if (lastRowTs == TSKEY_MAX) {
lastRowTs = rowTs;
STColumn *pTColumn = &pTSchema->columns[0];
*pColVal = COL_VAL_VALUE(pTColumn->colId, pTColumn->type, (SValue){.val = lastRowTs});
taosArraySet(pColArray, 0, &(SLastCol){.ts = lastRowTs, .colVal = *pColVal});
for (iCol = 1; iCol < nCol; ++iCol) {
if (iCol >= nLastCol) {
break;
}
SLastCol *pCol = taosArrayGet(pColArray, iCol);
if (pCol->colVal.cid != pTSchema->columns[iCol].colId) {
continue;
}
tsdbRowGetColVal(pRow, pTSchema, iCol, pColVal);
*pCol = (SLastCol){.ts = lastRowTs, .colVal = *pColVal};
if (IS_VAR_DATA_TYPE(pColVal->type) && pColVal->value.nData > 0) {
pCol->colVal.value.pData = taosMemoryMalloc(pCol->colVal.value.nData);
if (pCol->colVal.value.pData == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
memcpy(pCol->colVal.value.pData, pColVal->value.pData, pColVal->value.nData);
}
if (COL_VAL_IS_NONE(pColVal) && !setNoneCol) {
noneCol = iCol;
setNoneCol = true;
}
}
if (!setNoneCol) {
// done, goto return pColArray
break;
} else {
continue;
}
}
if ((rowTs < lastRowTs)) {
// done, goto return pColArray
break;
}
// merge into pColArray
setNoneCol = false;
for (iCol = noneCol; iCol < nCol; ++iCol) {
// high version's column value
SColVal *tColVal = (SColVal *)taosArrayGet(pColArray, iCol);
tsdbRowGetColVal(pRow, pTSchema, iCol, pColVal);
if (COL_VAL_IS_NONE(tColVal) && !COL_VAL_IS_NONE(pColVal)) {
SLastCol lastCol = {.ts = rowTs, .colVal = *pColVal};
if (IS_VAR_DATA_TYPE(pColVal->type) && pColVal->value.nData > 0) {
SLastCol *pLastCol = (SLastCol *)taosArrayGet(pColArray, iCol);
taosMemoryFree(pLastCol->colVal.value.pData);
lastCol.colVal.value.pData = taosMemoryMalloc(lastCol.colVal.value.nData);
if (lastCol.colVal.value.pData == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
memcpy(lastCol.colVal.value.pData, pColVal->value.pData, pColVal->value.nData);
}
taosArraySet(pColArray, iCol, &lastCol);
} else if (COL_VAL_IS_NONE(tColVal) && COL_VAL_IS_NONE(pColVal) && !setNoneCol) {
noneCol = iCol;
setNoneCol = true;
}
}
} while (setNoneCol);
// build the result ts row here
*dup = false;
// if (taosArrayGetSize(pColArray) != nCol) {
//*ppColArray = NULL;
// taosArrayDestroy(pColArray);
//} else {
if (!hasRow) {
if (ignoreEarlierTs) {
taosArrayDestroy(pColArray);
pColArray = NULL;
} else {
taosArrayClear(pColArray);
}
}
*ppColArray = pColArray;
//}
nextRowIterClose(&iter);
// taosMemoryFreeClear(pTSchema);
return code;
_err:
nextRowIterClose(&iter);
taosArrayDestroy(pColArray);
// taosMemoryFreeClear(pTSchema);
return code;
}
static int32_t mergeLast(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray, SCacheRowsReader *pr) {
STSchema *pTSchema = pr->pSchema; // metaGetTbTSchema(pTsdb->pVnode->pMeta, uid, -1, 1);
int16_t nLastCol = pTSchema->numOfCols;
int16_t noneCol = 0;
bool setNoneCol = false;
bool hasRow = false;
bool ignoreEarlierTs = false;
SArray *pColArray = NULL;
SColVal *pColVal = &(SColVal){0};
int16_t nCols = nLastCol;
int32_t code = initLastColArray(pTSchema, &pColArray);
if (TSDB_CODE_SUCCESS != code) {
return code;
}
SArray *aColArray = taosArrayInit(nCols, sizeof(int16_t));
if (NULL == aColArray) {
taosArrayDestroy(pColArray);
return TSDB_CODE_OUT_OF_MEMORY;
}
for (int i = 1; i < pTSchema->numOfCols; ++i) {
taosArrayPush(aColArray, &pTSchema->columns[i].colId);
}
TSKEY lastRowTs = TSKEY_MAX;
CacheNextRowIter iter = {0};
nextRowIterOpen(&iter, uid, pTsdb, pTSchema, pr->suid, pr->pLoadInfo, pr->pDataIter, pr->pReadSnap, &pr->pDataFReader,
&pr->pDataFReaderLast, pr->lastTs);
do {
TSDBROW *pRow = NULL;
nextRowIterGet(&iter, &pRow, &ignoreEarlierTs, true, TARRAY_DATA(aColArray), TARRAY_SIZE(aColArray));
if (!pRow) {
break;
}
hasRow = true;
int32_t sversion = TSDBROW_SVERSION(pRow);
if (sversion != -1) {
code = updateTSchema(sversion, pr, uid);
if (TSDB_CODE_SUCCESS != code) {
goto _err;
}
pTSchema = pr->pCurrSchema;
}
int16_t nCol = pTSchema->numOfCols;
TSKEY rowTs = TSDBROW_TS(pRow);
if (lastRowTs == TSKEY_MAX) {
lastRowTs = rowTs;
STColumn *pTColumn = &pTSchema->columns[0];
*pColVal = COL_VAL_VALUE(pTColumn->colId, pTColumn->type, (SValue){.val = lastRowTs});
taosArraySet(pColArray, 0, &(SLastCol){.ts = lastRowTs, .colVal = *pColVal});
for (int16_t iCol = 1; iCol < nCol; ++iCol) {
if (iCol >= nLastCol) {
break;
}
SLastCol *pCol = taosArrayGet(pColArray, iCol);
if (pCol->colVal.cid != pTSchema->columns[iCol].colId) {
continue;
}
tsdbRowGetColVal(pRow, pTSchema, iCol, pColVal);
*pCol = (SLastCol){.ts = lastRowTs, .colVal = *pColVal};
if (IS_VAR_DATA_TYPE(pColVal->type) && pColVal->value.nData > 0) {
pCol->colVal.value.pData = taosMemoryMalloc(pCol->colVal.value.nData);
if (pCol->colVal.value.pData == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
memcpy(pCol->colVal.value.pData, pColVal->value.pData, pColVal->value.nData);
}
if (!COL_VAL_IS_VALUE(pColVal)) {
if (!setNoneCol) {
noneCol = iCol;
setNoneCol = true;
}
} else {
int32_t aColIndex = taosArraySearchIdx(aColArray, &pColVal->cid, compareInt16Val, TD_EQ);
taosArrayRemove(aColArray, aColIndex);
}
}
if (!setNoneCol) {
// done, goto return pColArray
break;
} else {
continue;
}
}
// merge into pColArray
setNoneCol = false;
for (int16_t iCol = noneCol; iCol < nCol; ++iCol) {
if (iCol >= nLastCol) {
break;
}
// high version's column value
SLastCol *lastColVal = (SLastCol *)taosArrayGet(pColArray, iCol);
if (lastColVal->colVal.cid != pTSchema->columns[iCol].colId) {
continue;
}
SColVal *tColVal = &lastColVal->colVal;
tsdbRowGetColVal(pRow, pTSchema, iCol, pColVal);
if (!COL_VAL_IS_VALUE(tColVal) && COL_VAL_IS_VALUE(pColVal)) {
SLastCol lastCol = {.ts = rowTs, .colVal = *pColVal};
if (IS_VAR_DATA_TYPE(pColVal->type) && pColVal->value.nData > 0) {
SLastCol *pLastCol = (SLastCol *)taosArrayGet(pColArray, iCol);
taosMemoryFree(pLastCol->colVal.value.pData);
lastCol.colVal.value.pData = taosMemoryMalloc(lastCol.colVal.value.nData);
if (lastCol.colVal.value.pData == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
memcpy(lastCol.colVal.value.pData, pColVal->value.pData, pColVal->value.nData);
}
taosArraySet(pColArray, iCol, &lastCol);
int32_t aColIndex = taosArraySearchIdx(aColArray, &lastCol.colVal.cid, compareInt16Val, TD_EQ);
taosArrayRemove(aColArray, aColIndex);
} else if (!COL_VAL_IS_VALUE(tColVal) && !COL_VAL_IS_VALUE(pColVal) && !setNoneCol) {
noneCol = iCol;
setNoneCol = true;
}
}
} while (setNoneCol);
// if (taosArrayGetSize(pColArray) <= 0) {
//*ppLastArray = NULL;
// taosArrayDestroy(pColArray);
//} else {
if (!hasRow) {
if (ignoreEarlierTs) {
taosArrayDestroy(pColArray);
pColArray = NULL;
} else {
taosArrayClear(pColArray);
}
}
*ppLastArray = pColArray;
//}
nextRowIterClose(&iter);
taosArrayDestroy(aColArray);
// taosMemoryFreeClear(pTSchema);
return code;
_err:
nextRowIterClose(&iter);
// taosMemoryFreeClear(pTSchema);
*ppLastArray = NULL;
taosArrayDestroy(pColArray);
taosArrayDestroy(aColArray);
return code;
}
static int32_t mergeLastCid(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray, SCacheRowsReader *pr, int16_t *aCols, static int32_t mergeLastCid(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray, SCacheRowsReader *pr, int16_t *aCols,
int nCols, int16_t *slotIds) { int nCols, int16_t *slotIds) {
STSchema *pTSchema = pr->pSchema; // metaGetTbTSchema(pTsdb->pVnode->pMeta, uid, -1, 1); STSchema *pTSchema = pr->pSchema; // metaGetTbTSchema(pTsdb->pVnode->pMeta, uid, -1, 1);
...@@ -2976,7 +2790,7 @@ static int32_t mergeLastCid(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray, SC ...@@ -2976,7 +2790,7 @@ static int32_t mergeLastCid(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray, SC
TSKEY lastRowTs = TSKEY_MAX; TSKEY lastRowTs = TSKEY_MAX;
CacheNextRowIter iter = {0}; CacheNextRowIter iter = {0};
nextRowIterOpen(&iter, uid, pTsdb, pTSchema, pr->suid, pr->pLoadInfo, pr->pDataIter, pr->pReadSnap, &pr->pDataFReader, nextRowIterOpen(&iter, uid, pTsdb, pTSchema, pr->suid, pr->pLDataIterArray, pr->pReadSnap, &pr->pDataFReader,
&pr->pDataFReaderLast, pr->lastTs); &pr->pDataFReaderLast, pr->lastTs);
do { do {
...@@ -3146,7 +2960,7 @@ static int32_t mergeLastRowCid(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray, ...@@ -3146,7 +2960,7 @@ static int32_t mergeLastRowCid(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray,
TSKEY lastRowTs = TSKEY_MAX; TSKEY lastRowTs = TSKEY_MAX;
CacheNextRowIter iter = {0}; CacheNextRowIter iter = {0};
nextRowIterOpen(&iter, uid, pTsdb, pTSchema, pr->suid, pr->pLoadInfo, pr->pDataIter, pr->pReadSnap, &pr->pDataFReader, nextRowIterOpen(&iter, uid, pTsdb, pTSchema, pr->suid, pr->pLDataIterArray, pr->pReadSnap, &pr->pDataFReader,
&pr->pDataFReaderLast, pr->lastTs); &pr->pDataFReaderLast, pr->lastTs);
do { do {
...@@ -3236,92 +3050,6 @@ _err: ...@@ -3236,92 +3050,6 @@ _err:
return code; return code;
} }
int32_t tsdbCacheGetLastrowH(SLRUCache *pCache, tb_uid_t uid, SCacheRowsReader *pr, LRUHandle **handle) {
int32_t code = 0;
char key[32] = {0};
int keyLen = 0;
// getTableCacheKeyS(uid, "lr", key, &keyLen);
getTableCacheKey(uid, 0, key, &keyLen);
LRUHandle *h = taosLRUCacheLookup(pCache, key, keyLen);
if (!h) {
STsdb *pTsdb = pr->pVnode->pTsdb;
taosThreadMutexLock(&pTsdb->lruMutex);
h = taosLRUCacheLookup(pCache, key, keyLen);
if (!h) {
SArray *pArray = NULL;
bool dup = false; // which is always false for now
code = mergeLastRow(uid, pTsdb, &dup, &pArray, pr);
// if table's empty or error or ignore ignore earlier ts, set handle NULL and return
if (code < 0 || pArray == NULL) {
if (!dup && pArray) {
taosArrayDestroy(pArray);
}
taosThreadMutexUnlock(&pTsdb->lruMutex);
*handle = NULL;
return 0;
}
size_t charge = pArray->capacity * pArray->elemSize + sizeof(*pArray);
_taos_lru_deleter_t deleter = deleteTableCacheLast;
LRUStatus status =
taosLRUCacheInsert(pCache, key, keyLen, pArray, charge, deleter, &h, TAOS_LRU_PRIORITY_LOW, NULL);
if (status != TAOS_LRU_STATUS_OK) {
code = -1;
}
}
taosThreadMutexUnlock(&pTsdb->lruMutex);
}
*handle = h;
return code;
}
int32_t tsdbCacheGetLastH(SLRUCache *pCache, tb_uid_t uid, SCacheRowsReader *pr, LRUHandle **handle) {
int32_t code = 0;
char key[32] = {0};
int keyLen = 0;
// getTableCacheKeyS(uid, "l", key, &keyLen);
getTableCacheKey(uid, 1, key, &keyLen);
LRUHandle *h = taosLRUCacheLookup(pCache, key, keyLen);
if (!h) {
STsdb *pTsdb = pr->pVnode->pTsdb;
taosThreadMutexLock(&pTsdb->lruMutex);
h = taosLRUCacheLookup(pCache, key, keyLen);
if (!h) {
SArray *pLastArray = NULL;
code = mergeLast(uid, pTsdb, &pLastArray, pr);
// if table's empty or error or ignore ignore earlier ts, set handle NULL and return
if (code < 0 || pLastArray == NULL) {
taosThreadMutexUnlock(&pTsdb->lruMutex);
*handle = NULL;
return 0;
}
size_t charge = pLastArray->capacity * pLastArray->elemSize + sizeof(*pLastArray);
_taos_lru_deleter_t deleter = deleteTableCacheLast;
LRUStatus status =
taosLRUCacheInsert(pCache, key, keyLen, pLastArray, charge, deleter, &h, TAOS_LRU_PRIORITY_LOW, NULL);
if (status != TAOS_LRU_STATUS_OK) {
code = -1;
}
}
taosThreadMutexUnlock(&pTsdb->lruMutex);
}
*handle = h;
return code;
}
int32_t tsdbCacheRelease(SLRUCache *pCache, LRUHandle *h) { int32_t tsdbCacheRelease(SLRUCache *pCache, LRUHandle *h) {
int32_t code = 0; int32_t code = 0;
......
...@@ -124,7 +124,10 @@ int32_t tsdbReuseCacherowsReader(void* reader, void* pTableIdList, int32_t numOf ...@@ -124,7 +124,10 @@ int32_t tsdbReuseCacherowsReader(void* reader, void* pTableIdList, int32_t numOf
pReader->numOfTables = numOfTables; pReader->numOfTables = numOfTables;
pReader->lastTs = INT64_MIN; pReader->lastTs = INT64_MIN;
resetLastBlockLoadInfo(pReader->pLoadInfo); int64_t blocks;
double elapse;
pReader->pLDataIterArray = destroySttBlockReader(pReader->pLDataIterArray, &blocks, &elapse);
pReader->pLDataIterArray = taosArrayInit(4, POINTER_BYTES);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -178,14 +181,8 @@ int32_t tsdbCacherowsReaderOpen(void* pVnode, int32_t type, void* pTableIdList, ...@@ -178,14 +181,8 @@ int32_t tsdbCacherowsReaderOpen(void* pVnode, int32_t type, void* pTableIdList,
SVnodeCfg* pCfg = &((SVnode*)pVnode)->config; SVnodeCfg* pCfg = &((SVnode*)pVnode)->config;
int32_t numOfStt = pCfg->sttTrigger; int32_t numOfStt = pCfg->sttTrigger;
p->pLoadInfo = tCreateLastBlockLoadInfo(p->pSchema, NULL, 0, numOfStt); p->pLDataIterArray = taosArrayInit(4, POINTER_BYTES);
if (p->pLoadInfo == NULL) { if (p->pLDataIterArray == NULL) {
tsdbCacherowsReaderClose(p);
return TSDB_CODE_OUT_OF_MEMORY;
}
p->pDataIter = taosMemoryCalloc(pCfg->sttTrigger, sizeof(SLDataIter));
if (p->pDataIter == NULL) {
tsdbCacherowsReaderClose(p); tsdbCacherowsReaderClose(p);
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
} }
...@@ -214,10 +211,11 @@ void* tsdbCacherowsReaderClose(void* pReader) { ...@@ -214,10 +211,11 @@ void* tsdbCacherowsReaderClose(void* pReader) {
taosMemoryFree(p->pSchema); taosMemoryFree(p->pSchema);
} }
taosMemoryFree(p->pDataIter);
taosMemoryFree(p->pCurrSchema); taosMemoryFree(p->pCurrSchema);
destroyLastBlockLoadInfo(p->pLoadInfo); int64_t loadBlocks = 0;
double elapse = 0;
destroySttBlockReader(p->pLDataIterArray, &loadBlocks, &elapse);
taosMemoryFree((void*)p->idstr); taosMemoryFree((void*)p->idstr);
taosThreadMutexDestroy(&p->readerMutex); taosThreadMutexDestroy(&p->readerMutex);
...@@ -298,7 +296,7 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32 ...@@ -298,7 +296,7 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32
} }
taosThreadMutexLock(&pr->readerMutex); taosThreadMutexLock(&pr->readerMutex);
code = tsdbTakeReadSnap((STsdbReader*)pr, tsdbCacheQueryReseek, &pr->pReadSnap); code = tsdbTakeReadSnap2((STsdbReader*)pr, tsdbCacheQueryReseek, &pr->pReadSnap);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
goto _end; goto _end;
} }
...@@ -427,8 +425,12 @@ _end: ...@@ -427,8 +425,12 @@ _end:
tsdbDataFReaderClose(&pr->pDataFReaderLast); tsdbDataFReaderClose(&pr->pDataFReaderLast);
tsdbDataFReaderClose(&pr->pDataFReader); tsdbDataFReaderClose(&pr->pDataFReader);
resetLastBlockLoadInfo(pr->pLoadInfo); int64_t loadBlocks = 0;
tsdbUntakeReadSnap((STsdbReader*)pr, pr->pReadSnap, true); double elapse = 0;
pr->pLDataIterArray = destroySttBlockReader(pr->pLDataIterArray, &loadBlocks, &elapse);
pr->pLDataIterArray = taosArrayInit(4, POINTER_BYTES);
tsdbUntakeReadSnap2((STsdbReader*)pr, pr->pReadSnap, true);
taosThreadMutexUnlock(&pr->readerMutex); taosThreadMutexUnlock(&pr->readerMutex);
if (pRes != NULL) { if (pRes != NULL) {
......
...@@ -25,7 +25,7 @@ extern "C" { ...@@ -25,7 +25,7 @@ extern "C" {
/* Exposed Handle */ /* Exposed Handle */
typedef struct STFileSystem STFileSystem; typedef struct STFileSystem STFileSystem;
typedef struct STFSBgTask STFSBgTask; typedef struct STFSBgTask STFSBgTask;
typedef TARRAY2(STFileSet *) TFileSetArray; // typedef TARRAY2(STFileSet *) TFileSetArray;
typedef enum { typedef enum {
TSDB_FEDIT_COMMIT = 1, // TSDB_FEDIT_COMMIT = 1, //
...@@ -107,4 +107,4 @@ struct STFileSystem { ...@@ -107,4 +107,4 @@ struct STFileSystem {
} }
#endif #endif
#endif /*_TSDB_FILE_SYSTEM_H*/ #endif /*_TSDB_FILE_SYSTEM_H*/
\ No newline at end of file
...@@ -18,9 +18,9 @@ ...@@ -18,9 +18,9 @@
#include "tsdbDataFileRW.h" #include "tsdbDataFileRW.h"
#include "tsdbFS2.h" #include "tsdbFS2.h"
#include "tsdbMerge.h" #include "tsdbMerge.h"
#include "tsdbReadUtil.h"
#include "tsdbUtil2.h" #include "tsdbUtil2.h"
#include "tsimplehash.h" #include "tsimplehash.h"
#include "tsdbReadUtil.h"
#define ASCENDING_TRAVERSE(o) (o == TSDB_ORDER_ASC) #define ASCENDING_TRAVERSE(o) (o == TSDB_ORDER_ASC)
#define getCurrentKeyInLastBlock(_r) ((_r)->currentKey) #define getCurrentKeyInLastBlock(_r) ((_r)->currentKey)
...@@ -100,7 +100,7 @@ static int32_t updateBlockSMAInfo(STSchema* pSchema, SBlockLoadSuppInfo* pSupInf ...@@ -100,7 +100,7 @@ static int32_t updateBlockSMAInfo(STSchema* pSchema, SBlockLoadSuppInfo* pSupInf
i += 1; i += 1;
j += 1; j += 1;
} else if (pTCol->colId < pSupInfo->colId[j]) { // do nothing } else if (pTCol->colId < pSupInfo->colId[j]) { // do nothing
i += 1; i += 1;
} else { } else {
return TSDB_CODE_INVALID_PARA; return TSDB_CODE_INVALID_PARA;
...@@ -3815,7 +3815,7 @@ static int32_t doOpenReaderImpl(STsdbReader* pReader) { ...@@ -3815,7 +3815,7 @@ static int32_t doOpenReaderImpl(STsdbReader* pReader) {
SReaderStatus* pStatus = &pReader->status; SReaderStatus* pStatus = &pReader->status;
SDataBlockIter* pBlockIter = &pStatus->blockIter; SDataBlockIter* pBlockIter = &pStatus->blockIter;
initFilesetIterator(&pStatus->fileIter, pReader->status.pfSetArray, pReader); initFilesetIterator(&pStatus->fileIter, pReader->pReadSnap->pfSetArray, pReader);
resetDataBlockIterator(&pStatus->blockIter, pReader->info.order); resetDataBlockIterator(&pStatus->blockIter, pReader->info.order);
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
...@@ -3842,7 +3842,6 @@ static void freeSchemaFunc(void* param) { ...@@ -3842,7 +3842,6 @@ static void freeSchemaFunc(void* param) {
static void clearSharedPtr(STsdbReader* p) { static void clearSharedPtr(STsdbReader* p) {
p->status.pTableMap = NULL; p->status.pTableMap = NULL;
p->status.uidList.tableUidList = NULL; p->status.uidList.tableUidList = NULL;
p->status.pfSetArray = NULL;
p->info.pSchema = NULL; p->info.pSchema = NULL;
p->pReadSnap = NULL; p->pReadSnap = NULL;
p->pSchemaMap = NULL; p->pSchemaMap = NULL;
...@@ -3851,7 +3850,8 @@ static void clearSharedPtr(STsdbReader* p) { ...@@ -3851,7 +3850,8 @@ static void clearSharedPtr(STsdbReader* p) {
static void setSharedPtr(STsdbReader* pDst, const STsdbReader* pSrc) { static void setSharedPtr(STsdbReader* pDst, const STsdbReader* pSrc) {
pDst->status.pTableMap = pSrc->status.pTableMap; pDst->status.pTableMap = pSrc->status.pTableMap;
pDst->status.uidList = pSrc->status.uidList; pDst->status.uidList = pSrc->status.uidList;
pDst->status.pfSetArray = pSrc->status.pfSetArray; // pDst->status.pfSetArray = pSrc->status.pfSetArray;
pDst->pReadSnap->pfSetArray = pSrc->pReadSnap->pfSetArray;
pDst->info.pSchema = pSrc->info.pSchema; pDst->info.pSchema = pSrc->info.pSchema;
pDst->pSchemaMap = pSrc->pSchemaMap; pDst->pSchemaMap = pSrc->pSchemaMap;
pDst->pReadSnap = pSrc->pReadSnap; pDst->pReadSnap = pSrc->pReadSnap;
...@@ -4633,7 +4633,7 @@ int32_t tsdbReaderReset2(STsdbReader* pReader, SQueryTableDataCond* pCond) { ...@@ -4633,7 +4633,7 @@ int32_t tsdbReaderReset2(STsdbReader* pReader, SQueryTableDataCond* pCond) {
int32_t numOfTables = tSimpleHashGetSize(pStatus->pTableMap); int32_t numOfTables = tSimpleHashGetSize(pStatus->pTableMap);
initFilesetIterator(&pStatus->fileIter, pReader->status.pfSetArray, pReader); initFilesetIterator(&pStatus->fileIter, pReader->pReadSnap->pfSetArray, pReader);
resetDataBlockIterator(pBlockIter, pReader->info.order); resetDataBlockIterator(pBlockIter, pReader->info.order);
resetTableListIndex(&pReader->status); resetTableListIndex(&pReader->status);
...@@ -4886,7 +4886,7 @@ int32_t tsdbTakeReadSnap2(STsdbReader* pReader, _query_reseek_func_t reseek, STs ...@@ -4886,7 +4886,7 @@ int32_t tsdbTakeReadSnap2(STsdbReader* pReader, _query_reseek_func_t reseek, STs
} }
// fs // fs
code = tsdbFSCreateRefSnapshot(pTsdb->pFS, &pReader->status.pfSetArray); code = tsdbFSCreateRefSnapshot(pTsdb->pFS, &pSnap->pfSetArray);
if (code) { if (code) {
taosThreadRwlockUnlock(&pTsdb->rwLock); taosThreadRwlockUnlock(&pTsdb->rwLock);
goto _exit; goto _exit;
...@@ -4929,7 +4929,7 @@ void tsdbUntakeReadSnap2(STsdbReader* pReader, STsdbReadSnap* pSnap, bool proact ...@@ -4929,7 +4929,7 @@ void tsdbUntakeReadSnap2(STsdbReader* pReader, STsdbReadSnap* pSnap, bool proact
if (pSnap->pINode) taosMemoryFree(pSnap->pINode); if (pSnap->pINode) taosMemoryFree(pSnap->pINode);
taosMemoryFree(pSnap); taosMemoryFree(pSnap);
tsdbFSDestroyRefSnapshot(&pReader->status.pfSetArray); tsdbFSDestroyRefSnapshot(&pSnap->pfSetArray);
} }
tsdbTrace("vgId:%d, untake read snapshot", TD_VID(pTsdb->pVnode)); tsdbTrace("vgId:%d, untake read snapshot", TD_VID(pTsdb->pVnode));
} }
......
...@@ -20,8 +20,8 @@ ...@@ -20,8 +20,8 @@
extern "C" { extern "C" {
#endif #endif
#include "tsdbUtil2.h"
#include "tsdbDataFileRW.h" #include "tsdbDataFileRW.h"
#include "tsdbUtil2.h"
#define ASCENDING_TRAVERSE(o) (o == TSDB_ORDER_ASC) #define ASCENDING_TRAVERSE(o) (o == TSDB_ORDER_ASC)
...@@ -196,7 +196,6 @@ typedef struct SReaderStatus { ...@@ -196,7 +196,6 @@ typedef struct SReaderStatus {
SArray* pLDataIterArray; SArray* pLDataIterArray;
SRowMerger merger; SRowMerger merger;
SColumnInfoData* pPrimaryTsCol; // primary time stamp output col info data SColumnInfoData* pPrimaryTsCol; // primary time stamp output col info data
TFileSetArray* pfSetArray;
} SReaderStatus; } SReaderStatus;
struct STsdbReader { struct STsdbReader {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册