提交 13491fab 编写于 作者: M Minglei Jin

tsdb/cache: new next stt row state for fs tranverse

上级 14418fab
...@@ -45,7 +45,7 @@ typedef void (*TArray2Cb)(void *); ...@@ -45,7 +45,7 @@ typedef void (*TArray2Cb)(void *);
#define TARRAY2_GET_PTR(a, i) ((a)->data + i) #define TARRAY2_GET_PTR(a, i) ((a)->data + i)
#define TARRAY2_FIRST(a) ((a)->data[0]) #define TARRAY2_FIRST(a) ((a)->data[0])
#define TARRAY2_LAST(a) ((a)->data[(a)->size - 1]) #define TARRAY2_LAST(a) ((a)->data[(a)->size - 1])
#define TARRAY2_DATA_LEN(a) ((a)->size * sizeof(typeof((a)->data[0]))) #define TARRAY2_DATA_LEN(a) ((a)->size * sizeof(((a)->data[0])))
static FORCE_INLINE int32_t tarray2_make_room(void *arr, int32_t expSize, int32_t eleSize) { static FORCE_INLINE int32_t tarray2_make_room(void *arr, int32_t expSize, int32_t eleSize) {
TARRAY2(void) *a = arr; TARRAY2(void) *a = arr;
...@@ -140,24 +140,23 @@ static FORCE_INLINE int32_t tarray2SortInsert(void *arr, const void *elePtr, int ...@@ -140,24 +140,23 @@ static FORCE_INLINE int32_t tarray2SortInsert(void *arr, const void *elePtr, int
// return (TYPE *) // return (TYPE *)
#define TARRAY2_SEARCH(a, ep, cmp, flag) tarray2Search(a, ep, sizeof(((a)->data[0])), (__compar_fn_t)cmp, flag) #define TARRAY2_SEARCH(a, ep, cmp, flag) tarray2Search(a, ep, sizeof(((a)->data[0])), (__compar_fn_t)cmp, flag)
#define TARRAY2_SEARCH_IDX(a, ep, cmp, flag) \ #define TARRAY2_SEARCH_IDX(a, ep, cmp, flag) tarray2SearchIdx(a, ep, sizeof(((a)->data[0])), (__compar_fn_t)cmp, flag)
tarray2SearchIdx(a, ep, sizeof(typeof((a)->data[0])), (__compar_fn_t)cmp, flag)
#define TARRAY2_SORT_INSERT(a, e, cmp) tarray2SortInsert(a, &(e), sizeof(((a)->data[0])), (__compar_fn_t)cmp) #define TARRAY2_SORT_INSERT(a, e, cmp) tarray2SortInsert(a, &(e), sizeof(((a)->data[0])), (__compar_fn_t)cmp)
#define TARRAY2_SORT_INSERT_P(a, ep, cmp) tarray2SortInsert(a, ep, sizeof(((a)->data[0])), (__compar_fn_t)cmp) #define TARRAY2_SORT_INSERT_P(a, ep, cmp) tarray2SortInsert(a, ep, sizeof(((a)->data[0])), (__compar_fn_t)cmp)
#define TARRAY2_REMOVE(a, idx, cb) \ #define TARRAY2_REMOVE(a, idx, cb) \
do { \ do { \
if ((idx) < (a)->size) { \ if ((idx) < (a)->size) { \
if (cb) { \ if (cb) { \
TArray2Cb cb_ = (TArray2Cb)(cb); \ TArray2Cb cb_ = (TArray2Cb)(cb); \
cb_((a)->data + (idx)); \ cb_((a)->data + (idx)); \
} \ } \
if ((idx) < (a)->size - 1) { \ if ((idx) < (a)->size - 1) { \
memmove((a)->data + (idx), (a)->data + (idx) + 1, sizeof(typeof(*(a)->data)) * ((a)->size - (idx)-1)); \ memmove((a)->data + (idx), (a)->data + (idx) + 1, sizeof((*(a)->data)) * ((a)->size - (idx)-1)); \
} \ } \
(a)->size--; \ (a)->size--; \
} \ } \
} while (0) } while (0)
#define TARRAY2_FOREACH(a, e) for (int32_t __i = 0; __i < (a)->size && ((e) = (a)->data[__i], 1); __i++) #define TARRAY2_FOREACH(a, e) for (int32_t __i = 0; __i < (a)->size && ((e) = (a)->data[__i], 1); __i++)
......
...@@ -1884,7 +1884,8 @@ typedef enum SFSNEXTROWSTATES { ...@@ -1884,7 +1884,8 @@ typedef enum SFSNEXTROWSTATES {
SFSNEXTROW_BRINBLOCK, SFSNEXTROW_BRINBLOCK,
SFSNEXTROW_BRINRECORD, SFSNEXTROW_BRINRECORD,
SFSNEXTROW_BLOCKDATA, SFSNEXTROW_BLOCKDATA,
SFSNEXTROW_BLOCKROW SFSNEXTROW_BLOCKROW,
SFSNEXTROW_NEXTSTTROW
} SFSNEXTROWSTATES; } SFSNEXTROWSTATES;
struct CacheNextRowIter; struct CacheNextRowIter;
...@@ -1913,6 +1914,7 @@ typedef struct SFSNextRowIter { ...@@ -1913,6 +1914,7 @@ typedef struct SFSNextRowIter {
int64_t lastTs; int64_t lastTs;
SFSLastIter lastIter; SFSLastIter lastIter;
SFSLastIter *pLastIter; SFSLastIter *pLastIter;
int8_t lastEmpty;
TSDBROW *pLastRow; TSDBROW *pLastRow;
SRow *pTSRow; SRow *pTSRow;
SRowMerger rowMerger; SRowMerger rowMerger;
...@@ -1974,14 +1976,6 @@ static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow, bool *pIgnoreEarlie ...@@ -1974,14 +1976,6 @@ static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow, bool *pIgnoreEarlie
goto _err; goto _err;
} }
code = lastIterOpen(&state->lastIter, state->pFileSet, state->pTsdb, state->pTSchema, state->suid, state->uid,
state->pr, state->lastTs, aCols, nCols);
if (code != TSDB_CODE_SUCCESS) {
goto _err;
}
state->pLastIter = &state->lastIter;
loadDataTomb(state->pr, state->pr->pFileReader); loadDataTomb(state->pr, state->pr->pFileReader);
if (!state->pIndexList) { if (!state->pIndexList) {
...@@ -2010,16 +2004,64 @@ static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow, bool *pIgnoreEarlie ...@@ -2010,16 +2004,64 @@ static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow, bool *pIgnoreEarlie
int indexSize = TARRAY_SIZE(state->pIndexList); int indexSize = TARRAY_SIZE(state->pIndexList);
if (indexSize <= 0) { if (indexSize <= 0) {
// goto next fileset
clearLastFileSet(state); clearLastFileSet(state);
state->state = SFSNEXTROW_FILESET;
goto _next_fileset; goto _next_fileset;
} }
state->state = SFSNEXTROW_INDEXLIST; state->state = SFSNEXTROW_INDEXLIST;
state->iBrinIndex = indexSize; state->iBrinIndex = indexSize;
} else { // empty fileset, goto next fileset }
// clearLastFileSet(state);
code = lastIterOpen(&state->lastIter, state->pFileSet, state->pTsdb, state->pTSchema, state->suid, state->uid,
state->pr, state->lastTs, aCols, nCols);
if (code != TSDB_CODE_SUCCESS) {
goto _err;
}
code = lastIterNext(&state->lastIter, &state->pLastRow);
if (code != TSDB_CODE_SUCCESS) {
goto _err;
}
if (!state->pLastRow) {
state->lastEmpty = 1;
if (SFSNEXTROW_INDEXLIST != state->state) {
clearLastFileSet(state);
goto _next_fileset;
}
} else {
state->lastEmpty = 0;
if (SFSNEXTROW_INDEXLIST != state->state) {
state->state = SFSNEXTROW_NEXTSTTROW;
*ppRow = state->pLastRow;
state->pLastRow = NULL;
return code;
}
}
state->pLastIter = &state->lastIter;
}
if (SFSNEXTROW_NEXTSTTROW == state->state) {
code = lastIterNext(&state->lastIter, &state->pLastRow);
if (code != TSDB_CODE_SUCCESS) {
goto _err;
}
if (!state->pLastRow) {
lastIterClose(&state->pLastIter);
clearLastFileSet(state);
state->state = SFSNEXTROW_FILESET;
goto _next_fileset; goto _next_fileset;
} else {
*ppRow = state->pLastRow;
state->pLastRow = NULL;
return code;
} }
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册