提交 169a6664 编写于 作者: H Hongze Cheng

Merge branch 'feat/tsdb_refact' of https://github.com/taosdata/TDengine into feat/tsdb_refact

......@@ -333,6 +333,7 @@ typedef struct SFSNextRowIter {
int32_t iBlock;
SBlock block;
SBlockData blockData;
SBlockData *pBlockData;
int32_t nRow;
int32_t iRow;
TSDBROW row;
......@@ -346,14 +347,20 @@ static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow) {
case SFSNEXTROW_FS:
state->aDFileSet = state->pTsdb->fs->cState->aDFileSet;
state->nFileSet = taosArrayGetSize(state->aDFileSet);
state->iFileSet = state->nFileSet;
state->iFileSet = state->nFileSet - 1;
state->pBlockData = NULL;
case SFSNEXTROW_FILESET: {
SDFileSet *pFileSet = NULL;
if (--state->iFileSet >= 0) {
pFileSet = (SDFileSet *)taosArrayGet(state->aDFileSet, state->iFileSet);
} else {
tBlockDataClear(&state->blockData);
// tBlockDataClear(&state->blockData);
if (state->pBlockData) {
tBlockDataClear(state->pBlockData);
state->pBlockData = NULL;
}
*ppRow = NULL;
return code;
......@@ -390,18 +397,23 @@ static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow) {
state->nBlock = state->blockMap.nItem;
state->iBlock = state->nBlock - 1;
if (!state->pBlockData) {
state->pBlockData = &state->blockData;
tBlockDataInit(&state->blockData);
}
}
case SFSNEXTROW_BLOCKDATA:
if (state->iBlock >= 0) {
SBlock block = {0};
tBlockReset(&block);
tBlockDataReset(&state->blockData);
// tBlockDataReset(&state->blockData);
tBlockDataReset(state->pBlockData);
tMapDataGetItemByIdx(&state->blockMap, state->iBlock, &block, tGetBlock);
/* code = tsdbReadBlockData(state->pDataFReader, &state->blockIdx, &block, &state->blockData, NULL, NULL); */
code = tsdbReadBlockData(state->pDataFReader, state->pBlockIdx, &block, &state->blockData, NULL, NULL);
code = tsdbReadBlockData(state->pDataFReader, state->pBlockIdx, &block, state->pBlockData, NULL, NULL);
if (code) goto _err;
state->nRow = state->blockData.nRow;
......@@ -411,15 +423,18 @@ static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow) {
}
case SFSNEXTROW_BLOCKROW:
if (state->iRow >= 0) {
state->row = tsdbRowFromBlockData(&state->blockData, state->iRow);
state->row = tsdbRowFromBlockData(state->pBlockData, state->iRow);
*ppRow = &state->row;
if (--state->iRow < 0) {
state->state = SFSNEXTROW_BLOCKDATA;
if (--state->iBlock < 0) {
tsdbDataFReaderClose(&state->pDataFReader);
state->pDataFReader = NULL;
if (state->aBlockIdx) {
taosArrayDestroy(state->aBlockIdx);
state->aBlockIdx = NULL;
}
state->state = SFSNEXTROW_FILESET;
......@@ -436,17 +451,48 @@ static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow) {
_err:
if (state->pDataFReader) {
tsdbDataFReaderClose(&state->pDataFReader);
state->pDataFReader = NULL;
}
if (state->aBlockIdx) {
taosArrayDestroy(state->aBlockIdx);
state->aBlockIdx = NULL;
}
if (state->pBlockData) {
// tBlockDataClear(&state->blockData);
tBlockDataClear(state->pBlockData);
state->pBlockData = NULL;
}
tBlockDataClear(&state->blockData);
*ppRow = NULL;
return code;
}
int32_t clearNextRowFromFS(void *iter) {
int32_t code = 0;
SFSNextRowIter *state = (SFSNextRowIter *)iter;
if (!state) {
return code;
}
if (state->pDataFReader) {
tsdbDataFReaderClose(&state->pDataFReader);
state->pDataFReader = NULL;
}
if (state->aBlockIdx) {
taosArrayDestroy(state->aBlockIdx);
state->aBlockIdx = NULL;
}
if (state->pBlockData) {
// tBlockDataClear(&state->blockData);
tBlockDataClear(state->pBlockData);
state->pBlockData = NULL;
}
return code;
}
typedef enum SMEMNEXTROWSTATES {
SMEMNEXTROW_ENTER,
SMEMNEXTROW_NEXT,
......@@ -566,6 +612,7 @@ static bool tsdbKeyDeleted(TSDBKEY *key, SArray *pSkyline, int *iSkyline) {
}
typedef int32_t (*_next_row_fn_t)(void *iter, TSDBROW **ppRow);
typedef int32_t (*_next_row_clear_fn_t)(void *iter);
typedef struct TsdbNextRowState {
TSDBROW *pRow;
......@@ -573,10 +620,12 @@ typedef struct TsdbNextRowState {
bool next;
void *iter;
_next_row_fn_t nextRowFn;
_next_row_clear_fn_t nextRowClearFn;
} TsdbNextRowState;
static int32_t mergeLastRow(tb_uid_t uid, STsdb *pTsdb, bool *dup, STSRow **ppRow) {
int32_t code = 0;
SArray *pSkyline = NULL;
STSchema *pTSchema = metaGetTbTSchema(pTsdb->pVnode->pMeta, uid, -1);
int16_t nCol = pTSchema->numOfCols;
......@@ -596,7 +645,7 @@ static int32_t mergeLastRow(tb_uid_t uid, STsdb *pTsdb, bool *dup, STSRow **ppRo
*ppRow = NULL;
SArray *pSkyline = taosArrayInit(32, sizeof(TSDBKEY));
pSkyline = taosArrayInit(32, sizeof(TSDBKEY));
SDelIdx delIdx;
......@@ -632,9 +681,9 @@ static int32_t mergeLastRow(tb_uid_t uid, STsdb *pTsdb, bool *dup, STSRow **ppRo
SMemNextRowIter imemState = {0};
TSDBROW memRow, imemRow, fsRow;
TsdbNextRowState input[3] = {{&memRow, true, false, &memState, getNextRowFromMem},
{&imemRow, true, false, &imemState, getNextRowFromMem},
{&fsRow, false, true, &fsState, getNextRowFromFS}};
TsdbNextRowState input[3] = {{&memRow, true, false, &memState, getNextRowFromMem, NULL},
{&imemRow, true, false, &imemState, getNextRowFromMem, NULL},
{&fsRow, false, true, &fsState, getNextRowFromFS, clearNextRowFromFS}};
if (pMem) {
memState.pMem = pMem;
......@@ -732,10 +781,21 @@ static int32_t mergeLastRow(tb_uid_t uid, STsdb *pTsdb, bool *dup, STSRow **ppRo
} while (*ppRow == NULL);
for (int i = 0; i < 3; ++i) {
if (input[i].nextRowClearFn) {
input[i].nextRowClearFn(input[i].iter);
}
}
if (pSkyline) {
taosArrayDestroy(pSkyline);
}
taosMemoryFreeClear(pTSchema);
return code;
_err:
if (pSkyline) {
taosArrayDestroy(pSkyline);
}
taosMemoryFreeClear(pTSchema);
tsdbError("vgId:%d merge last_row failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
return code;
......@@ -798,9 +858,9 @@ static int32_t mergeLast(tb_uid_t uid, STsdb *pTsdb, STSRow **ppRow) {
SMemNextRowIter imemState = {0};
TSDBROW memRow, imemRow, fsRow;
TsdbNextRowState input[3] = {{&memRow, true, false, &memState, getNextRowFromMem},
{&imemRow, true, false, &imemState, getNextRowFromMem},
{&fsRow, false, true, &fsState, getNextRowFromFS}};
TsdbNextRowState input[3] = {{&memRow, true, false, &memState, getNextRowFromMem, NULL},
{&imemRow, true, false, &imemState, getNextRowFromMem, NULL},
{&fsRow, false, true, &fsState, getNextRowFromFS, clearNextRowFromFS}};
if (pMem) {
memState.pMem = pMem;
......@@ -1008,7 +1068,7 @@ int32_t tsdbCacheGetLastrowH(SLRUCache *pCache, tb_uid_t uid, STsdb *pTsdb, LRUH
code = mergeLastRow(uid, pTsdb, &dup, &pRow);
// if table's empty or error, return code of -1
if (code < 0 || pRow == NULL) {
if (!dup) {
if (!dup && pRow) {
taosMemoryFree(pRow);
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册