提交 f34e43d7 编写于 作者: M Minglei Jin

tsdbCache: new clearNextRowFS to cleanup fs iter

上级 a7cbb93a
...@@ -327,15 +327,16 @@ typedef struct SFSNextRowIter { ...@@ -327,15 +327,16 @@ typedef struct SFSNextRowIter {
SArray *aBlockIdx; SArray *aBlockIdx;
// SMapData blockIdxMap; // SMapData blockIdxMap;
// SBlockIdx blockIdx; // SBlockIdx blockIdx;
SBlockIdx *pBlockIdx; SBlockIdx *pBlockIdx;
SMapData blockMap; SMapData blockMap;
int32_t nBlock; int32_t nBlock;
int32_t iBlock; int32_t iBlock;
SBlock block; SBlock block;
SBlockData blockData; SBlockData blockData;
int32_t nRow; SBlockData *pBlockData;
int32_t iRow; int32_t nRow;
TSDBROW row; int32_t iRow;
TSDBROW row;
} SFSNextRowIter; } SFSNextRowIter;
static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow) { static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow) {
...@@ -346,14 +347,20 @@ static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow) { ...@@ -346,14 +347,20 @@ static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow) {
case SFSNEXTROW_FS: case SFSNEXTROW_FS:
state->aDFileSet = state->pTsdb->fs->cState->aDFileSet; state->aDFileSet = state->pTsdb->fs->cState->aDFileSet;
state->nFileSet = taosArrayGetSize(state->aDFileSet); state->nFileSet = taosArrayGetSize(state->aDFileSet);
state->iFileSet = state->nFileSet; state->iFileSet = state->nFileSet - 1;
state->pBlockData = NULL;
case SFSNEXTROW_FILESET: { case SFSNEXTROW_FILESET: {
SDFileSet *pFileSet = NULL; SDFileSet *pFileSet = NULL;
if (--state->iFileSet >= 0) { if (--state->iFileSet >= 0) {
pFileSet = (SDFileSet *)taosArrayGet(state->aDFileSet, state->iFileSet); pFileSet = (SDFileSet *)taosArrayGet(state->aDFileSet, state->iFileSet);
} else { } else {
tBlockDataClear(&state->blockData); // tBlockDataClear(&state->blockData);
if (state->pBlockData) {
tBlockDataClear(state->pBlockData);
state->pBlockData = NULL;
}
*ppRow = NULL; *ppRow = NULL;
return code; return code;
...@@ -390,18 +397,23 @@ static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow) { ...@@ -390,18 +397,23 @@ static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow) {
state->nBlock = state->blockMap.nItem; state->nBlock = state->blockMap.nItem;
state->iBlock = state->nBlock - 1; state->iBlock = state->nBlock - 1;
tBlockDataInit(&state->blockData); if (!state->pBlockData) {
state->pBlockData = &state->blockData;
tBlockDataInit(&state->blockData);
}
} }
case SFSNEXTROW_BLOCKDATA: case SFSNEXTROW_BLOCKDATA:
if (state->iBlock >= 0) { if (state->iBlock >= 0) {
SBlock block = {0}; SBlock block = {0};
tBlockReset(&block); tBlockReset(&block);
tBlockDataReset(&state->blockData); // tBlockDataReset(&state->blockData);
tBlockDataReset(state->pBlockData);
tMapDataGetItemByIdx(&state->blockMap, state->iBlock, &block, tGetBlock); tMapDataGetItemByIdx(&state->blockMap, state->iBlock, &block, tGetBlock);
/* code = tsdbReadBlockData(state->pDataFReader, &state->blockIdx, &block, &state->blockData, NULL, NULL); */ /* code = tsdbReadBlockData(state->pDataFReader, &state->blockIdx, &block, &state->blockData, NULL, NULL); */
code = tsdbReadBlockData(state->pDataFReader, state->pBlockIdx, &block, &state->blockData, NULL, NULL); code = tsdbReadBlockData(state->pDataFReader, state->pBlockIdx, &block, state->pBlockData, NULL, NULL);
if (code) goto _err; if (code) goto _err;
state->nRow = state->blockData.nRow; state->nRow = state->blockData.nRow;
...@@ -411,15 +423,18 @@ static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow) { ...@@ -411,15 +423,18 @@ static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow) {
} }
case SFSNEXTROW_BLOCKROW: case SFSNEXTROW_BLOCKROW:
if (state->iRow >= 0) { if (state->iRow >= 0) {
state->row = tsdbRowFromBlockData(&state->blockData, state->iRow); state->row = tsdbRowFromBlockData(state->pBlockData, state->iRow);
*ppRow = &state->row; *ppRow = &state->row;
if (--state->iRow < 0) { if (--state->iRow < 0) {
state->state = SFSNEXTROW_BLOCKDATA; state->state = SFSNEXTROW_BLOCKDATA;
if (--state->iBlock < 0) { if (--state->iBlock < 0) {
tsdbDataFReaderClose(&state->pDataFReader); tsdbDataFReaderClose(&state->pDataFReader);
state->pDataFReader = NULL;
if (state->aBlockIdx) { if (state->aBlockIdx) {
taosArrayDestroy(state->aBlockIdx); taosArrayDestroy(state->aBlockIdx);
state->aBlockIdx = NULL;
} }
state->state = SFSNEXTROW_FILESET; state->state = SFSNEXTROW_FILESET;
...@@ -436,17 +451,48 @@ static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow) { ...@@ -436,17 +451,48 @@ static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow) {
_err: _err:
if (state->pDataFReader) { if (state->pDataFReader) {
tsdbDataFReaderClose(&state->pDataFReader); tsdbDataFReaderClose(&state->pDataFReader);
state->pDataFReader = NULL;
} }
if (state->aBlockIdx) { if (state->aBlockIdx) {
taosArrayDestroy(state->aBlockIdx); taosArrayDestroy(state->aBlockIdx);
state->aBlockIdx = NULL;
}
if (state->pBlockData) {
// tBlockDataClear(&state->blockData);
tBlockDataClear(state->pBlockData);
state->pBlockData = NULL;
} }
tBlockDataClear(&state->blockData);
*ppRow = NULL; *ppRow = NULL;
return code; return code;
} }
int32_t clearNextRowFromFS(void *iter) {
int32_t code = 0;
SFSNextRowIter *state = (SFSNextRowIter *)iter;
if (!state) {
return code;
}
if (state->pDataFReader) {
tsdbDataFReaderClose(&state->pDataFReader);
state->pDataFReader = NULL;
}
if (state->aBlockIdx) {
taosArrayDestroy(state->aBlockIdx);
state->aBlockIdx = NULL;
}
if (state->pBlockData) {
// tBlockDataClear(&state->blockData);
tBlockDataClear(state->pBlockData);
state->pBlockData = NULL;
}
return code;
}
typedef enum SMEMNEXTROWSTATES { typedef enum SMEMNEXTROWSTATES {
SMEMNEXTROW_ENTER, SMEMNEXTROW_ENTER,
SMEMNEXTROW_NEXT, SMEMNEXTROW_NEXT,
...@@ -566,17 +612,20 @@ static bool tsdbKeyDeleted(TSDBKEY *key, SArray *pSkyline, int *iSkyline) { ...@@ -566,17 +612,20 @@ static bool tsdbKeyDeleted(TSDBKEY *key, SArray *pSkyline, int *iSkyline) {
} }
typedef int32_t (*_next_row_fn_t)(void *iter, TSDBROW **ppRow); typedef int32_t (*_next_row_fn_t)(void *iter, TSDBROW **ppRow);
typedef int32_t (*_next_row_clear_fn_t)(void *iter);
typedef struct TsdbNextRowState { typedef struct TsdbNextRowState {
TSDBROW *pRow; TSDBROW *pRow;
bool stop; bool stop;
bool next; bool next;
void *iter; void *iter;
_next_row_fn_t nextRowFn; _next_row_fn_t nextRowFn;
_next_row_clear_fn_t nextRowClearFn;
} TsdbNextRowState; } TsdbNextRowState;
static int32_t mergeLastRow(tb_uid_t uid, STsdb *pTsdb, bool *dup, STSRow **ppRow) { static int32_t mergeLastRow(tb_uid_t uid, STsdb *pTsdb, bool *dup, STSRow **ppRow) {
int32_t code = 0; int32_t code = 0;
SArray *pSkyline = NULL;
STSchema *pTSchema = metaGetTbTSchema(pTsdb->pVnode->pMeta, uid, -1); STSchema *pTSchema = metaGetTbTSchema(pTsdb->pVnode->pMeta, uid, -1);
int16_t nCol = pTSchema->numOfCols; int16_t nCol = pTSchema->numOfCols;
...@@ -596,7 +645,7 @@ static int32_t mergeLastRow(tb_uid_t uid, STsdb *pTsdb, bool *dup, STSRow **ppRo ...@@ -596,7 +645,7 @@ static int32_t mergeLastRow(tb_uid_t uid, STsdb *pTsdb, bool *dup, STSRow **ppRo
*ppRow = NULL; *ppRow = NULL;
SArray *pSkyline = taosArrayInit(32, sizeof(TSDBKEY)); pSkyline = taosArrayInit(32, sizeof(TSDBKEY));
SDelIdx delIdx; SDelIdx delIdx;
...@@ -632,9 +681,9 @@ static int32_t mergeLastRow(tb_uid_t uid, STsdb *pTsdb, bool *dup, STSRow **ppRo ...@@ -632,9 +681,9 @@ static int32_t mergeLastRow(tb_uid_t uid, STsdb *pTsdb, bool *dup, STSRow **ppRo
SMemNextRowIter imemState = {0}; SMemNextRowIter imemState = {0};
TSDBROW memRow, imemRow, fsRow; TSDBROW memRow, imemRow, fsRow;
TsdbNextRowState input[3] = {{&memRow, true, false, &memState, getNextRowFromMem}, TsdbNextRowState input[3] = {{&memRow, true, false, &memState, getNextRowFromMem, NULL},
{&imemRow, true, false, &imemState, getNextRowFromMem}, {&imemRow, true, false, &imemState, getNextRowFromMem, NULL},
{&fsRow, false, true, &fsState, getNextRowFromFS}}; {&fsRow, false, true, &fsState, getNextRowFromFS, clearNextRowFromFS}};
if (pMem) { if (pMem) {
memState.pMem = pMem; memState.pMem = pMem;
...@@ -732,10 +781,21 @@ static int32_t mergeLastRow(tb_uid_t uid, STsdb *pTsdb, bool *dup, STSRow **ppRo ...@@ -732,10 +781,21 @@ static int32_t mergeLastRow(tb_uid_t uid, STsdb *pTsdb, bool *dup, STSRow **ppRo
} while (*ppRow == NULL); } while (*ppRow == NULL);
for (int i = 0; i < 3; ++i) {
if (input[i].nextRowClearFn) {
input[i].nextRowClearFn(input[i].iter);
}
}
if (pSkyline) {
taosArrayDestroy(pSkyline);
}
taosMemoryFreeClear(pTSchema); taosMemoryFreeClear(pTSchema);
return code; return code;
_err: _err:
if (pSkyline) {
taosArrayDestroy(pSkyline);
}
taosMemoryFreeClear(pTSchema); taosMemoryFreeClear(pTSchema);
tsdbError("vgId:%d merge last_row failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code)); tsdbError("vgId:%d merge last_row failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
return code; return code;
...@@ -798,9 +858,9 @@ static int32_t mergeLast(tb_uid_t uid, STsdb *pTsdb, STSRow **ppRow) { ...@@ -798,9 +858,9 @@ static int32_t mergeLast(tb_uid_t uid, STsdb *pTsdb, STSRow **ppRow) {
SMemNextRowIter imemState = {0}; SMemNextRowIter imemState = {0};
TSDBROW memRow, imemRow, fsRow; TSDBROW memRow, imemRow, fsRow;
TsdbNextRowState input[3] = {{&memRow, true, false, &memState, getNextRowFromMem}, TsdbNextRowState input[3] = {{&memRow, true, false, &memState, getNextRowFromMem, NULL},
{&imemRow, true, false, &imemState, getNextRowFromMem}, {&imemRow, true, false, &imemState, getNextRowFromMem, NULL},
{&fsRow, false, true, &fsState, getNextRowFromFS}}; {&fsRow, false, true, &fsState, getNextRowFromFS, clearNextRowFromFS}};
if (pMem) { if (pMem) {
memState.pMem = pMem; memState.pMem = pMem;
...@@ -1008,7 +1068,7 @@ int32_t tsdbCacheGetLastrowH(SLRUCache *pCache, tb_uid_t uid, STsdb *pTsdb, LRUH ...@@ -1008,7 +1068,7 @@ int32_t tsdbCacheGetLastrowH(SLRUCache *pCache, tb_uid_t uid, STsdb *pTsdb, LRUH
code = mergeLastRow(uid, pTsdb, &dup, &pRow); code = mergeLastRow(uid, pTsdb, &dup, &pRow);
// if table's empty or error, return code of -1 // if table's empty or error, return code of -1
if (code < 0 || pRow == NULL) { if (code < 0 || pRow == NULL) {
if (!dup) { if (!dup && pRow) {
taosMemoryFree(pRow); taosMemoryFree(pRow);
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册