提交 a6cece10 编写于 作者: H Haojun Liao

fix(query): fix the invalid the check of last file blocks.

上级 e66583be
...@@ -633,7 +633,7 @@ typedef struct SMergeTree { ...@@ -633,7 +633,7 @@ typedef struct SMergeTree {
struct SLDataIter *pIter; struct SLDataIter *pIter;
} SMergeTree; } SMergeTree;
int32_t tMergeTreeOpen(SMergeTree *pMTree, int8_t backward, SDataFReader* pFReader, uint64_t uid, STimeWindow* pTimeWindow, SVersionRange* pVerRange); int32_t tMergeTreeOpen(SMergeTree *pMTree, int8_t backward, SDataFReader* pFReader, uint64_t suid, uint64_t uid, STimeWindow* pTimeWindow, SVersionRange* pVerRange);
void tMergeTreeAddIter(SMergeTree *pMTree, struct SLDataIter *pIter); void tMergeTreeAddIter(SMergeTree *pMTree, struct SLDataIter *pIter);
bool tMergeTreeNext(SMergeTree *pMTree); bool tMergeTreeNext(SMergeTree *pMTree);
TSDBROW tMergeTreeGetRow(SMergeTree *pMTree); TSDBROW tMergeTreeGetRow(SMergeTree *pMTree);
......
...@@ -420,6 +420,7 @@ typedef enum { ...@@ -420,6 +420,7 @@ typedef enum {
typedef struct { typedef struct {
SFSLASTNEXTROWSTATES state; // [input] SFSLASTNEXTROWSTATES state; // [input]
STsdb *pTsdb; // [input] STsdb *pTsdb; // [input]
tb_uid_t suid;
tb_uid_t uid; tb_uid_t uid;
int32_t nFileSet; int32_t nFileSet;
int32_t iFileSet; int32_t iFileSet;
...@@ -454,7 +455,7 @@ static int32_t getNextRowFromFSLast(void *iter, TSDBROW **ppRow) { ...@@ -454,7 +455,7 @@ static int32_t getNextRowFromFSLast(void *iter, TSDBROW **ppRow) {
code = tsdbDataFReaderOpen(&state->pDataFReader, state->pTsdb, pFileSet); code = tsdbDataFReaderOpen(&state->pDataFReader, state->pTsdb, pFileSet);
if (code) goto _err; if (code) goto _err;
tMergeTreeOpen(&state->mergeTree, 1, state->pDataFReader, state->uid, tMergeTreeOpen(&state->mergeTree, 1, state->pDataFReader, state->suid, state->uid,
&(STimeWindow){.skey = TSKEY_MIN, .ekey = TSKEY_MAX}, &(STimeWindow){.skey = TSKEY_MIN, .ekey = TSKEY_MAX},
&(SVersionRange){.minVer = 0, .maxVer = UINT64_MAX}); &(SVersionRange){.minVer = 0, .maxVer = UINT64_MAX});
bool hasVal = tMergeTreeNext(&state->mergeTree); bool hasVal = tMergeTreeNext(&state->mergeTree);
...@@ -796,7 +797,7 @@ static bool tsdbKeyDeleted(TSDBKEY *key, SArray *pSkyline, int64_t *iSkyline) { ...@@ -796,7 +797,7 @@ static bool tsdbKeyDeleted(TSDBKEY *key, SArray *pSkyline, int64_t *iSkyline) {
if (key->ts > pItemBack->ts) { if (key->ts > pItemBack->ts) {
return false; return false;
} else if (key->ts >= pItemFront->ts && key->ts <= pItemBack->ts) { } else if (key->ts >= pItemFront->ts && key->ts <= pItemBack->ts) {
if ((key->version <= pItemFront->version || key->ts == pItemBack->ts && key->version <= pItemBack->version)) { if (key->version <= pItemFront->version || (key->ts == pItemBack->ts && key->version <= pItemBack->version)) {
return true; return true;
} else { } else {
return false; return false;
...@@ -890,6 +891,7 @@ static int32_t nextRowIterOpen(CacheNextRowIter *pIter, tb_uid_t uid, STsdb *pTs ...@@ -890,6 +891,7 @@ static int32_t nextRowIterOpen(CacheNextRowIter *pIter, tb_uid_t uid, STsdb *pTs
pIter->fsLastState.state = (SFSLASTNEXTROWSTATES)SFSNEXTROW_FS; pIter->fsLastState.state = (SFSLASTNEXTROWSTATES)SFSNEXTROW_FS;
pIter->fsLastState.pTsdb = pTsdb; pIter->fsLastState.pTsdb = pTsdb;
pIter->fsLastState.aDFileSet = pIter->pReadSnap->fs.aDFileSet; pIter->fsLastState.aDFileSet = pIter->pReadSnap->fs.aDFileSet;
pIter->fsLastState.suid = suid;
pIter->fsLastState.uid = uid; pIter->fsLastState.uid = uid;
pIter->fsState.state = SFSNEXTROW_FS; pIter->fsState.state = SFSNEXTROW_FS;
......
...@@ -42,8 +42,8 @@ static SBlockData* getNextBlock(SLDataIter* pIter) { ...@@ -42,8 +42,8 @@ static SBlockData* getNextBlock(SLDataIter* pIter) {
return getCurrentBlock(pIter); return getCurrentBlock(pIter);
} }
int32_t tLDataIterOpen(struct SLDataIter **pIter, SDataFReader *pReader, int32_t iSst, int8_t backward, uint64_t uid, int32_t tLDataIterOpen(struct SLDataIter **pIter, SDataFReader *pReader, int32_t iSst, int8_t backward, uint64_t suid,
STimeWindow *pTimeWindow, SVersionRange *pRange) { uint64_t uid, STimeWindow *pTimeWindow, SVersionRange *pRange) {
int32_t code = 0; int32_t code = 0;
*pIter = taosMemoryCalloc(1, sizeof(SLDataIter)); *pIter = taosMemoryCalloc(1, sizeof(SLDataIter));
if (*pIter == NULL) { if (*pIter == NULL) {
...@@ -52,11 +52,11 @@ int32_t tLDataIterOpen(struct SLDataIter **pIter, SDataFReader *pReader, int32_t ...@@ -52,11 +52,11 @@ int32_t tLDataIterOpen(struct SLDataIter **pIter, SDataFReader *pReader, int32_t
} }
(*pIter)->uid = uid; (*pIter)->uid = uid;
(*pIter)->timeWindow = *pTimeWindow;
(*pIter)->verRange = *pRange;
(*pIter)->pReader = pReader;
(*pIter)->iSst = iSst; (*pIter)->iSst = iSst;
(*pIter)->pReader = pReader;
(*pIter)->verRange = *pRange;
(*pIter)->backward = backward; (*pIter)->backward = backward;
(*pIter)->timeWindow = *pTimeWindow;
(*pIter)->aSstBlk = taosArrayInit(0, sizeof(SSstBlk)); (*pIter)->aSstBlk = taosArrayInit(0, sizeof(SSstBlk));
if ((*pIter)->aSstBlk == NULL) { if ((*pIter)->aSstBlk == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
...@@ -85,6 +85,10 @@ int32_t tLDataIterOpen(struct SLDataIter **pIter, SDataFReader *pReader, int32_t ...@@ -85,6 +85,10 @@ int32_t tLDataIterOpen(struct SLDataIter **pIter, SDataFReader *pReader, int32_t
if (!backward) { // asc if (!backward) { // asc
for (int32_t i = 0; i < size; ++i) { for (int32_t i = 0; i < size; ++i) {
SSstBlk *p = taosArrayGet((*pIter)->aSstBlk, i); SSstBlk *p = taosArrayGet((*pIter)->aSstBlk, i);
if (p->suid != suid) {
continue;
}
if (p->minUid <= uid && p->maxUid >= uid) { if (p->minUid <= uid && p->maxUid >= uid) {
index = i; index = i;
break; break;
...@@ -93,6 +97,10 @@ int32_t tLDataIterOpen(struct SLDataIter **pIter, SDataFReader *pReader, int32_t ...@@ -93,6 +97,10 @@ int32_t tLDataIterOpen(struct SLDataIter **pIter, SDataFReader *pReader, int32_t
} else { // desc } else { // desc
for (int32_t i = size - 1; i >= 0; --i) { for (int32_t i = size - 1; i >= 0; --i) {
SSstBlk *p = taosArrayGet((*pIter)->aSstBlk, i); SSstBlk *p = taosArrayGet((*pIter)->aSstBlk, i);
if (p->suid != suid) {
continue;
}
if (p->minUid <= uid && p->maxUid >= uid) { if (p->minUid <= uid && p->maxUid >= uid) {
index = i; index = i;
break; break;
...@@ -134,9 +142,31 @@ void tLDataIterNextBlock(SLDataIter *pIter) { ...@@ -134,9 +142,31 @@ void tLDataIterNextBlock(SLDataIter *pIter) {
break; break;
} }
// check uid firstly
if (p->minUid <= pIter->uid && p->maxUid >= pIter->uid) { if (p->minUid <= pIter->uid && p->maxUid >= pIter->uid) {
index = i; if ((!pIter->backward) && p->minKey > pIter->timeWindow.ekey) {
break; break;
}
if (pIter->backward && p->maxKey < pIter->timeWindow.skey) {
break;
}
// check time range secondly
if (p->minKey <= pIter->timeWindow.ekey && p->maxKey >= pIter->timeWindow.skey) {
if ((!pIter->backward) && p->minVer > pIter->verRange.maxVer) {
break;
}
if (pIter->backward && p->maxVer < pIter->verRange.minVer) {
break;
}
if (p->minVer <= pIter->verRange.maxVer && p->maxVer >= pIter->verRange.minVer) {
index = i;
break;
}
}
} }
} }
...@@ -196,14 +226,6 @@ static void findNextValidRow(SLDataIter *pIter) { ...@@ -196,14 +226,6 @@ static void findNextValidRow(SLDataIter *pIter) {
continue; continue;
} }
// todo handle delete soon
#if 0
TSDBKEY k = {.ts = ts, .version = ver};
if (hasBeenDropped(pBlockScanInfo->delSkyline, &pBlockScanInfo->lastBlockDelIndex, &k, pLastBlockReader->order)) {
continue;
}
#endif
hasVal = true; hasVal = true;
break; break;
} }
...@@ -294,7 +316,7 @@ static FORCE_INLINE int32_t tLDataIterCmprFn(const void *p1, const void *p2) { ...@@ -294,7 +316,7 @@ static FORCE_INLINE int32_t tLDataIterCmprFn(const void *p1, const void *p2) {
} }
} }
int32_t tMergeTreeOpen(SMergeTree *pMTree, int8_t backward, SDataFReader *pFReader, uint64_t uid, int32_t tMergeTreeOpen(SMergeTree *pMTree, int8_t backward, SDataFReader *pFReader, uint64_t suid, uint64_t uid,
STimeWindow *pTimeWindow, SVersionRange *pVerRange) { STimeWindow *pTimeWindow, SVersionRange *pVerRange) {
pMTree->backward = backward; pMTree->backward = backward;
pMTree->pIter = NULL; pMTree->pIter = NULL;
...@@ -308,7 +330,7 @@ int32_t tMergeTreeOpen(SMergeTree *pMTree, int8_t backward, SDataFReader *pFRead ...@@ -308,7 +330,7 @@ int32_t tMergeTreeOpen(SMergeTree *pMTree, int8_t backward, SDataFReader *pFRead
struct SLDataIter *pIterList[TSDB_DEFAULT_LAST_FILE] = {0}; struct SLDataIter *pIterList[TSDB_DEFAULT_LAST_FILE] = {0};
for (int32_t i = 0; i < pFReader->pSet->nSstF; ++i) { // open all last file for (int32_t i = 0; i < pFReader->pSet->nSstF; ++i) { // open all last file
code = tLDataIterOpen(&pIterList[i], pFReader, i, pMTree->backward, uid, pTimeWindow, pVerRange); code = tLDataIterOpen(&pIterList[i], pFReader, i, pMTree->backward, suid, uid, pTimeWindow, pVerRange);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
goto _end; goto _end;
} }
......
...@@ -1934,7 +1934,7 @@ static bool initLastBlockReader(SLastBlockReader* pLastBlockReader, STableBlockS ...@@ -1934,7 +1934,7 @@ static bool initLastBlockReader(SLastBlockReader* pLastBlockReader, STableBlockS
pLastBlockReader->uid = pBlockScanInfo->uid; pLastBlockReader->uid = pBlockScanInfo->uid;
int32_t code = int32_t code =
tMergeTreeOpen(&pLastBlockReader->mergeTree, (pLastBlockReader->order == TSDB_ORDER_DESC), pReader->pFileReader, tMergeTreeOpen(&pLastBlockReader->mergeTree, (pLastBlockReader->order == TSDB_ORDER_DESC), pReader->pFileReader,
pBlockScanInfo->uid, &pLastBlockReader->window, &pLastBlockReader->verRange); pReader->suid, pBlockScanInfo->uid, &pLastBlockReader->window, &pLastBlockReader->verRange);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return false; return false;
} }
...@@ -2537,22 +2537,24 @@ static int32_t buildBlockFromFiles(STsdbReader* pReader) { ...@@ -2537,22 +2537,24 @@ static int32_t buildBlockFromFiles(STsdbReader* pReader) {
bool hasNext = blockIteratorNext(&pReader->status.blockIter); bool hasNext = blockIteratorNext(&pReader->status.blockIter);
if (hasNext) { // check for the next block in the block accessed order list if (hasNext) { // check for the next block in the block accessed order list
initBlockDumpInfo(pReader, pBlockIter); initBlockDumpInfo(pReader, pBlockIter);
} else if (hasDataInLastBlock(pReader->status.fileIter.pLastBlockReader)) {
// data blocks in current file are exhausted, let's try the next file now
tBlockDataReset(&pReader->status.fileBlockData);
resetDataBlockIterator(pBlockIter, pReader->order);
goto _begin;
} else { } else {
code = initForFirstBlockInFile(pReader, pBlockIter); if (pReader->status.pCurrentFileset->nSstF > 0) {
// data blocks in current file are exhausted, let's try the next file now
tBlockDataReset(&pReader->status.fileBlockData);
resetDataBlockIterator(pBlockIter, pReader->order);
goto _begin;
} else {
code = initForFirstBlockInFile(pReader, pBlockIter);
// error happens or all the data files are completely checked // error happens or all the data files are completely checked
if ((code != TSDB_CODE_SUCCESS) || (pReader->status.loadFromFile == false)) { if ((code != TSDB_CODE_SUCCESS) || (pReader->status.loadFromFile == false)) {
return code; return code;
} }
// this file does not have blocks, let's start check the last block file // this file does not have blocks, let's start check the last block file
if (pBlockIter->numOfBlocks == 0) { if (pBlockIter->numOfBlocks == 0) {
goto _begin; goto _begin;
}
} }
} }
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册