提交 4b47476d 编写于 作者: M Minglei Jin

fix: upgrade last files reading for last/last_row caching

上级 c6106be5
...@@ -123,6 +123,7 @@ int32_t tGetBlockL(uint8_t *p, void *ph); ...@@ -123,6 +123,7 @@ int32_t tGetBlockL(uint8_t *p, void *ph);
int32_t tPutBlockIdx(uint8_t *p, void *ph); int32_t tPutBlockIdx(uint8_t *p, void *ph);
int32_t tGetBlockIdx(uint8_t *p, void *ph); int32_t tGetBlockIdx(uint8_t *p, void *ph);
int32_t tCmprBlockIdx(void const *lhs, void const *rhs); int32_t tCmprBlockIdx(void const *lhs, void const *rhs);
int32_t tCmprBlockL(void const *lhs, void const *rhs);
// SColdata // SColdata
void tColDataInit(SColData *pColData, int16_t cid, int8_t type, int8_t smaOn); void tColDataInit(SColData *pColData, int16_t cid, int8_t type, int8_t smaOn);
void tColDataReset(SColData *pColData); void tColDataReset(SColData *pColData);
......
...@@ -279,6 +279,7 @@ int32_t tsdbCacheInsertLast(SLRUCache *pCache, tb_uid_t uid, STSRow *row, STsdb ...@@ -279,6 +279,7 @@ int32_t tsdbCacheInsertLast(SLRUCache *pCache, tb_uid_t uid, STSRow *row, STsdb
} }
} }
_invalidate:
taosMemoryFreeClear(pTSchema); taosMemoryFreeClear(pTSchema);
taosLRUCacheRelease(pCache, h, invalidate); taosLRUCacheRelease(pCache, h, invalidate);
...@@ -404,6 +405,178 @@ _err: ...@@ -404,6 +405,178 @@ _err:
return code; return code;
} }
typedef enum {
SFSLASTNEXTROW_FS,
SFSLASTNEXTROW_FILESET,
SFSLASTNEXTROW_BLOCKDATA,
SFSLASTNEXTROW_BLOCKROW
} SFSLASTNEXTROWSTATES;
typedef struct {
SFSLASTNEXTROWSTATES state; // [input]
STsdb *pTsdb; // [input]
SBlockIdx *pBlockIdxExp; // [input]
STSchema *pTSchema; // [input]
int32_t nFileSet;
int32_t iFileSet;
SArray *aDFileSet;
SDataFReader *pDataFReader;
SArray *aBlockL;
SBlockL *pBlockL;
SBlockData *pBlockDataL;
SBlockData blockDataL;
int32_t nRow;
int32_t iRow;
TSDBROW row;
/*
SArray *aBlockIdx;
SBlockIdx *pBlockIdx;
SMapData blockMap;
int32_t nBlock;
int32_t iBlock;
SBlock block;
*/
} SFSLastNextRowIter;
static int32_t getNextRowFromFSLast(void *iter, TSDBROW **ppRow) {
SFSLastNextRowIter *state = (SFSLastNextRowIter *)iter;
int32_t code = 0;
switch (state->state) {
case SFSLASTNEXTROW_FS:
// state->aDFileSet = state->pTsdb->pFS->cState->aDFileSet;
state->nFileSet = taosArrayGetSize(state->aDFileSet);
state->iFileSet = state->nFileSet;
state->pBlockDataL = NULL;
case SFSLASTNEXTROW_FILESET: {
SDFileSet *pFileSet = NULL;
_next_fileset:
if (--state->iFileSet >= 0) {
pFileSet = (SDFileSet *)taosArrayGet(state->aDFileSet, state->iFileSet);
} else {
if (state->pBlockDataL) {
tBlockDataDestroy(state->pBlockDataL, 1);
state->pBlockDataL = NULL;
}
*ppRow = NULL;
return code;
}
code = tsdbDataFReaderOpen(&state->pDataFReader, state->pTsdb, pFileSet);
if (code) goto _err;
if (!state->aBlockL) {
state->aBlockL = taosArrayInit(0, sizeof(SBlockIdx));
} else {
taosArrayClear(state->aBlockL);
}
code = tsdbReadBlockL(state->pDataFReader, state->aBlockL);
if (code) goto _err;
// SBlockL *pBlockL = (SBlockL *)taosArrayGet(state->aBlockL, state->iBlockL);
state->pBlockL = taosArraySearch(state->aBlockL, state->pBlockIdxExp, tCmprBlockL, TD_EQ);
if (!state->pBlockL) {
goto _next_fileset;
}
int64_t suid = state->pBlockL->suid;
int64_t uid = state->pBlockL->maxUid;
if (!state->pBlockDataL) {
state->pBlockDataL = &state->blockDataL;
}
code = tBlockDataInit(state->pBlockDataL, suid, suid ? 0 : uid, state->pTSchema);
if (code) goto _err;
}
case SFSLASTNEXTROW_BLOCKDATA:
code = tsdbReadLastBlock(state->pDataFReader, state->pBlockL, state->pBlockDataL);
if (code) goto _err;
state->nRow = state->blockDataL.nRow;
state->iRow = state->nRow - 1;
if (!state->pBlockDataL->uid) {
while (state->pBlockIdxExp->uid != state->pBlockDataL->aUid[state->iRow]) {
--state->iRow;
}
}
state->state = SFSLASTNEXTROW_BLOCKROW;
case SFSLASTNEXTROW_BLOCKROW:
if (state->pBlockDataL->uid) {
if (state->iRow >= 0) {
state->row = tsdbRowFromBlockData(state->pBlockDataL, state->iRow);
*ppRow = &state->row;
if (--state->iRow < 0) {
state->state = SFSLASTNEXTROW_FILESET;
}
}
} else {
if (state->iRow >= 0 && state->pBlockIdxExp->uid == state->pBlockDataL->aUid[state->iRow]) {
state->row = tsdbRowFromBlockData(state->pBlockDataL, state->iRow);
*ppRow = &state->row;
if (--state->iRow < 0 || state->pBlockIdxExp->uid != state->pBlockDataL->aUid[state->iRow]) {
state->state = SFSLASTNEXTROW_FILESET;
}
}
}
return code;
default:
ASSERT(0);
break;
}
_err:
if (state->pDataFReader) {
tsdbDataFReaderClose(&state->pDataFReader);
state->pDataFReader = NULL;
}
if (state->aBlockL) {
taosArrayDestroy(state->aBlockL);
state->aBlockL = NULL;
}
if (state->pBlockDataL) {
tBlockDataDestroy(state->pBlockDataL, 1);
state->pBlockDataL = NULL;
}
*ppRow = NULL;
return code;
}
int32_t clearNextRowFromFSLast(void *iter) {
SFSLastNextRowIter *state = (SFSLastNextRowIter *)iter;
int32_t code = 0;
if (!state) {
return code;
}
if (state->pDataFReader) {
tsdbDataFReaderClose(&state->pDataFReader);
state->pDataFReader = NULL;
}
if (state->aBlockL) {
taosArrayDestroy(state->aBlockL);
state->aBlockL = NULL;
}
if (state->pBlockDataL) {
tBlockDataDestroy(state->pBlockDataL, 1);
state->pBlockDataL = NULL;
}
return code;
}
typedef enum SFSNEXTROWSTATES { typedef enum SFSNEXTROWSTATES {
SFSNEXTROW_FS, SFSNEXTROW_FS,
SFSNEXTROW_FILESET, SFSNEXTROW_FILESET,
...@@ -722,18 +895,19 @@ typedef struct { ...@@ -722,18 +895,19 @@ typedef struct {
SArray *pSkyline; SArray *pSkyline;
int64_t iSkyline; int64_t iSkyline;
SBlockIdx idx; SBlockIdx idx;
SMemNextRowIter memState; SMemNextRowIter memState;
SMemNextRowIter imemState; SMemNextRowIter imemState;
SFSNextRowIter fsState; SFSLastNextRowIter fsLastState;
TSDBROW memRow, imemRow, fsRow; SFSNextRowIter fsState;
TSDBROW memRow, imemRow, fsLastRow, fsRow;
TsdbNextRowState input[3]; TsdbNextRowState input[4];
STsdbReadSnap *pReadSnap; STsdbReadSnap *pReadSnap;
STsdb *pTsdb; STsdb *pTsdb;
} CacheNextRowIter; } CacheNextRowIter;
static int32_t nextRowIterOpen(CacheNextRowIter *pIter, tb_uid_t uid, STsdb *pTsdb) { static int32_t nextRowIterOpen(CacheNextRowIter *pIter, tb_uid_t uid, STsdb *pTsdb, STSchema *pTSchema) {
int code = 0; int code = 0;
tb_uid_t suid = getTableSuidByUid(uid, pTsdb); tb_uid_t suid = getTableSuidByUid(uid, pTsdb);
...@@ -779,6 +953,12 @@ static int32_t nextRowIterOpen(CacheNextRowIter *pIter, tb_uid_t uid, STsdb *pTs ...@@ -779,6 +953,12 @@ static int32_t nextRowIterOpen(CacheNextRowIter *pIter, tb_uid_t uid, STsdb *pTs
pIter->idx = (SBlockIdx){.suid = suid, .uid = uid}; pIter->idx = (SBlockIdx){.suid = suid, .uid = uid};
pIter->fsLastState.state = SFSNEXTROW_FS;
pIter->fsLastState.pTsdb = pTsdb;
pIter->fsLastState.aDFileSet = pIter->pReadSnap->fs.aDFileSet;
pIter->fsLastState.pBlockIdxExp = &pIter->idx;
pIter->fsLastState.pTSchema = pTSchema;
pIter->fsState.state = SFSNEXTROW_FS; pIter->fsState.state = SFSNEXTROW_FS;
pIter->fsState.pTsdb = pTsdb; pIter->fsState.pTsdb = pTsdb;
pIter->fsState.aDFileSet = pIter->pReadSnap->fs.aDFileSet; pIter->fsState.aDFileSet = pIter->pReadSnap->fs.aDFileSet;
...@@ -786,7 +966,9 @@ static int32_t nextRowIterOpen(CacheNextRowIter *pIter, tb_uid_t uid, STsdb *pTs ...@@ -786,7 +966,9 @@ static int32_t nextRowIterOpen(CacheNextRowIter *pIter, tb_uid_t uid, STsdb *pTs
pIter->input[0] = (TsdbNextRowState){&pIter->memRow, true, false, &pIter->memState, getNextRowFromMem, NULL}; pIter->input[0] = (TsdbNextRowState){&pIter->memRow, true, false, &pIter->memState, getNextRowFromMem, NULL};
pIter->input[1] = (TsdbNextRowState){&pIter->imemRow, true, false, &pIter->imemState, getNextRowFromMem, NULL}; pIter->input[1] = (TsdbNextRowState){&pIter->imemRow, true, false, &pIter->imemState, getNextRowFromMem, NULL};
pIter->input[2] = pIter->input[2] = (TsdbNextRowState){&pIter->fsLastRow, false, true, &pIter->fsLastState, getNextRowFromFSLast,
clearNextRowFromFSLast};
pIter->input[3] =
(TsdbNextRowState){&pIter->fsRow, false, true, &pIter->fsState, getNextRowFromFS, clearNextRowFromFS}; (TsdbNextRowState){&pIter->fsRow, false, true, &pIter->fsState, getNextRowFromFS, clearNextRowFromFS};
if (pMem) { if (pMem) {
...@@ -811,7 +993,7 @@ _err: ...@@ -811,7 +993,7 @@ _err:
static int32_t nextRowIterClose(CacheNextRowIter *pIter) { static int32_t nextRowIterClose(CacheNextRowIter *pIter) {
int code = 0; int code = 0;
for (int i = 0; i < 3; ++i) { for (int i = 0; i < 4; ++i) {
if (pIter->input[i].nextRowClearFn) { if (pIter->input[i].nextRowClearFn) {
pIter->input[i].nextRowClearFn(pIter->input[i].iter); pIter->input[i].nextRowClearFn(pIter->input[i].iter);
} }
...@@ -823,7 +1005,6 @@ static int32_t nextRowIterClose(CacheNextRowIter *pIter) { ...@@ -823,7 +1005,6 @@ static int32_t nextRowIterClose(CacheNextRowIter *pIter) {
tsdbUntakeReadSnap(pIter->pTsdb, pIter->pReadSnap); tsdbUntakeReadSnap(pIter->pTsdb, pIter->pReadSnap);
return code;
_err: _err:
return code; return code;
} }
...@@ -832,7 +1013,7 @@ _err: ...@@ -832,7 +1013,7 @@ _err:
static int32_t nextRowIterGet(CacheNextRowIter *pIter, TSDBROW **ppRow) { static int32_t nextRowIterGet(CacheNextRowIter *pIter, TSDBROW **ppRow) {
int code = 0; int code = 0;
for (int i = 0; i < 3; ++i) { for (int i = 0; i < 4; ++i) {
if (pIter->input[i].next && !pIter->input[i].stop) { if (pIter->input[i].next && !pIter->input[i].stop) {
code = pIter->input[i].nextRowFn(pIter->input[i].iter, &pIter->input[i].pRow); code = pIter->input[i].nextRowFn(pIter->input[i].iter, &pIter->input[i].pRow);
if (code) goto _err; if (code) goto _err;
...@@ -844,18 +1025,18 @@ static int32_t nextRowIterGet(CacheNextRowIter *pIter, TSDBROW **ppRow) { ...@@ -844,18 +1025,18 @@ static int32_t nextRowIterGet(CacheNextRowIter *pIter, TSDBROW **ppRow) {
} }
} }
if (pIter->input[0].stop && pIter->input[1].stop && pIter->input[2].stop) { if (pIter->input[0].stop && pIter->input[1].stop && pIter->input[2].stop && pIter->input[3].stop) {
*ppRow = NULL; *ppRow = NULL;
return code; return code;
} }
// select maxpoint(s) from mem, imem, fs // select maxpoint(s) from mem, imem, fs and last
TSDBROW *max[3] = {0}; TSDBROW *max[4] = {0};
int iMax[3] = {-1, -1, -1}; int iMax[4] = {-1, -1, -1, -1};
int nMax = 0; int nMax = 0;
TSKEY maxKey = TSKEY_MIN; TSKEY maxKey = TSKEY_MIN;
for (int i = 0; i < 3; ++i) { for (int i = 0; i < 4; ++i) {
if (!pIter->input[i].stop && pIter->input[i].pRow != NULL) { if (!pIter->input[i].stop && pIter->input[i].pRow != NULL) {
TSDBKEY key = TSDBROW_KEY(pIter->input[i].pRow); TSDBKEY key = TSDBROW_KEY(pIter->input[i].pRow);
...@@ -873,8 +1054,8 @@ static int32_t nextRowIterGet(CacheNextRowIter *pIter, TSDBROW **ppRow) { ...@@ -873,8 +1054,8 @@ static int32_t nextRowIterGet(CacheNextRowIter *pIter, TSDBROW **ppRow) {
} }
// delete detection // delete detection
TSDBROW *merge[3] = {0}; TSDBROW *merge[4] = {0};
int iMerge[3] = {-1, -1, -1}; int iMerge[4] = {-1, -1, -1, -1};
int nMerge = 0; int nMerge = 0;
for (int i = 0; i < nMax; ++i) { for (int i = 0; i < nMax; ++i) {
TSDBKEY maxKey = TSDBROW_KEY(max[i]); TSDBKEY maxKey = TSDBROW_KEY(max[i]);
...@@ -915,7 +1096,7 @@ static int32_t mergeLastRow(tb_uid_t uid, STsdb *pTsdb, bool *dup, STSRow **ppRo ...@@ -915,7 +1096,7 @@ static int32_t mergeLastRow(tb_uid_t uid, STsdb *pTsdb, bool *dup, STSRow **ppRo
TSKEY lastRowTs = TSKEY_MAX; TSKEY lastRowTs = TSKEY_MAX;
CacheNextRowIter iter = {0}; CacheNextRowIter iter = {0};
nextRowIterOpen(&iter, uid, pTsdb); nextRowIterOpen(&iter, uid, pTsdb, pTSchema);
do { do {
TSDBROW *pRow = NULL; TSDBROW *pRow = NULL;
...@@ -1012,7 +1193,7 @@ static int32_t mergeLast(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray) { ...@@ -1012,7 +1193,7 @@ static int32_t mergeLast(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray) {
TSKEY lastRowTs = TSKEY_MAX; TSKEY lastRowTs = TSKEY_MAX;
CacheNextRowIter iter = {0}; CacheNextRowIter iter = {0};
nextRowIterOpen(&iter, uid, pTsdb); nextRowIterOpen(&iter, uid, pTsdb, pTSchema);
do { do {
TSDBROW *pRow = NULL; TSDBROW *pRow = NULL;
......
...@@ -196,6 +196,25 @@ int32_t tCmprBlockIdx(void const *lhs, void const *rhs) { ...@@ -196,6 +196,25 @@ int32_t tCmprBlockIdx(void const *lhs, void const *rhs) {
return 0; return 0;
} }
int32_t tCmprBlockL(void const *lhs, void const *rhs) {
SBlockIdx *lBlockIdx = (SBlockIdx *)lhs;
SBlockL *rBlockL = (SBlockL *)rhs;
if (lBlockIdx->suid < rBlockL->suid) {
return -1;
} else if (lBlockIdx->suid > rBlockL->suid) {
return 1;
}
if (lBlockIdx->uid < rBlockL->minUid) {
return -1;
} else if (lBlockIdx->uid > rBlockL->maxUid) {
return 1;
}
return 0;
}
// SBlock ====================================================== // SBlock ======================================================
void tBlockReset(SBlock *pBlock) { void tBlockReset(SBlock *pBlock) {
*pBlock = (SBlock){.minKey = TSDBKEY_MAX, .maxKey = TSDBKEY_MIN, .minVer = VERSION_MAX, .maxVer = VERSION_MIN}; *pBlock = (SBlock){.minKey = TSDBKEY_MAX, .maxKey = TSDBKEY_MIN, .minVer = VERSION_MAX, .maxVer = VERSION_MIN};
...@@ -1944,4 +1963,4 @@ int32_t tsdbReadAndCheck(TdFilePtr pFD, int64_t offset, uint8_t **ppOut, int32_t ...@@ -1944,4 +1963,4 @@ int32_t tsdbReadAndCheck(TdFilePtr pFD, int64_t offset, uint8_t **ppOut, int32_t
_exit: _exit:
return code; return code;
} }
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册