提交 524ef6ed 编写于 作者: H Haojun Liao

fix(tsdb): read data from new storage format.

上级 d97f3493
......@@ -16,6 +16,9 @@
#ifndef _TD_VNODE_TSDB_H_
#define _TD_VNODE_TSDB_H_
//#include "../tsdb/tsdbFile2.h"
//#include "../tsdb/tsdbMerge.h"
//#include "../tsdb/tsdbSttFileRW.h"
#include "tsimplehash.h"
#include "vnodeInt.h"
......@@ -303,6 +306,9 @@ int32_t tsdbReadDelIdx(SDelFReader *pReader, SArray *aDelIdx);
// tsdbRead.c ==============================================================================================
int32_t tsdbTakeReadSnap(STsdbReader *pReader, _query_reseek_func_t reseek, STsdbReadSnap **ppSnap);
void tsdbUntakeReadSnap(STsdbReader *pReader, STsdbReadSnap *pSnap, bool proactive);
int32_t tsdbTakeReadSnap2(STsdbReader *pReader, _query_reseek_func_t reseek, STsdbReadSnap **ppSnap);
void tsdbUntakeReadSnap2(STsdbReader *pReader, STsdbReadSnap *pSnap, bool proactive);
// tsdbMerge.c ==============================================================================================
int32_t tsdbMerge(void *arg);
......@@ -697,6 +703,8 @@ typedef struct {
typedef struct SSttBlockLoadInfo {
SBlockData blockData[2];
void* pBlockArray;
SArray *aSttBlk;
int32_t blockIndex[2]; // to denote the loaded block in the corresponding position.
int32_t currentLoadBlockIndex;
......@@ -769,7 +777,6 @@ struct SDiskDataBuilder {
typedef struct SLDataIter {
SRBTreeNode node;
SSttBlk *pSttBlk;
SDataFReader *pReader;
int32_t iStt;
int8_t backward;
int32_t iSttBlk;
......@@ -780,13 +787,20 @@ typedef struct SLDataIter {
SVersionRange verRange;
SSttBlockLoadInfo *pBlockLoadInfo;
bool ignoreEarlierTs;
struct SSttFileReader* pReader;
} SLDataIter;
#define tMergeTreeGetRow(_t) (&((_t)->pIter->rInfo.row))
int32_t tMergeTreeOpen(SMergeTree *pMTree, int8_t backward, SDataFReader *pFReader, uint64_t suid, uint64_t uid,
STimeWindow *pTimeWindow, SVersionRange *pVerRange, SSttBlockLoadInfo *pBlockLoadInfo,
bool destroyLoadInfo, const char *idStr, bool strictTimeRange, SLDataIter *pLDataIter);
void tMergeTreeAddIter(SMergeTree *pMTree, SLDataIter *pIter);
int32_t tMergeTreeOpen2(SMergeTree *pMTree, int8_t backward, STsdb* pTsdb, uint64_t suid, uint64_t uid,
STimeWindow *pTimeWindow, SVersionRange *pVerRange, SSttBlockLoadInfo *pBlockLoadInfo,
bool destroyLoadInfo, const char *idStr, bool strictTimeRange, SLDataIter* pLDataIter, void* pCurrentFileSet);
void tMergeTreeAddIter(SMergeTree *pMTree, SLDataIter *pIter);
bool tMergeTreeNext(SMergeTree *pMTree);
bool tMergeTreeIgnoreEarlierTs(SMergeTree *pMTree);
void tMergeTreeClose(SMergeTree *pMTree);
......
......@@ -680,7 +680,7 @@ int32_t tsdbFSCreateRefSnapshot(STFileSystem *fs, TFileSetArray **fsetArr) {
int32_t code = 0;
STFileSet *fset, *fset1;
fsetArr[0] = taosMemoryMalloc(sizeof(*fsetArr[0]));
fsetArr[0] = taosMemoryCalloc(1, sizeof(*fsetArr[0]));
if (fsetArr[0] == NULL) return TSDB_CODE_OUT_OF_MEMORY;
taosThreadRwlockRdlock(&fs->tsdb->rwLock);
......
......@@ -15,8 +15,8 @@
#include "tsdbFile2.h"
#ifndef _TSDB_FILE_SET_H
#define _TSDB_FILE_SET_H
#ifndef _TSDB_FILE_SET2_H
#define _TSDB_FILE_SET2_H
#ifdef __cplusplus
extern "C" {
......@@ -82,4 +82,4 @@ struct STFileSet {
}
#endif
#endif /*_TSDB_FILE_SET_H*/
\ No newline at end of file
#endif /*_TSDB_FILE_SET2_H*/
\ No newline at end of file
......@@ -14,6 +14,8 @@
*/
#include "tsdb.h"
#include "tsdbFSet2.h"
#include "tsdbSttFileRW.h"
// SLDataIter =================================================
SSttBlockLoadInfo *tCreateLastBlockLoadInfo(STSchema *pSchema, int16_t *colList, int32_t numOfCols,
......@@ -135,7 +137,8 @@ static SBlockData *loadLastBlock(SLDataIter *pIter, const char *idStr) {
goto _exit;
}
code = tsdbReadSttBlock(pIter->pReader, pIter->iStt, pIter->pSttBlk, pBlock);
code = tsdbSttFileReadBlockData(pIter->pReader, pIter->pSttBlk, pBlock);
// code = tsdbReadSttBlock(pIter->pReader, pIter->iStt, pIter->pSttBlk, pBlock);
if (code != TSDB_CODE_SUCCESS) {
goto _exit;
}
......@@ -253,18 +256,24 @@ static int32_t binarySearchForStartRowIndex(uint64_t *uidList, int32_t num, uint
}
int32_t tLDataIterOpen(struct SLDataIter *pIter, SDataFReader *pReader, int32_t iStt, int8_t backward, uint64_t suid,
uint64_t uid, STimeWindow *pTimeWindow, SVersionRange *pRange, SSttBlockLoadInfo *pBlockLoadInfo,
const char *idStr, bool strictTimeRange) {
return 0;
}
int32_t tLDataIterOpen2(struct SLDataIter *pIter, SSttFileReader *pReader, int32_t iStt, int8_t backward, uint64_t suid,
uint64_t uid, STimeWindow *pTimeWindow, SVersionRange *pRange, SSttBlockLoadInfo *pBlockLoadInfo,
const char *idStr, bool strictTimeRange) {
int32_t code = TSDB_CODE_SUCCESS;
pIter->uid = uid;
pIter->pReader = pReader;
pIter->iStt = iStt;
pIter->backward = backward;
pIter->verRange.minVer = pRange->minVer;
pIter->verRange.maxVer = pRange->maxVer;
pIter->timeWindow.skey = pTimeWindow->skey;
pIter->timeWindow.ekey = pTimeWindow->ekey;
pIter->pReader = pReader;
pIter->pBlockLoadInfo = pBlockLoadInfo;
......@@ -272,17 +281,18 @@ int32_t tLDataIterOpen(struct SLDataIter *pIter, SDataFReader *pReader, int32_t
int64_t st = taosGetTimestampUs();
pBlockLoadInfo->sttBlockLoaded = true;
code = tsdbReadSttBlk(pReader, iStt, pBlockLoadInfo->aSttBlk);
code = tsdbSttFileReadSttBlk(pIter->pReader, (const TSttBlkArray **)&pBlockLoadInfo->pBlockArray);
if (code) {
return code;
}
// only apply to the child tables, ordinary tables will not incur this filter procedure.
size_t size = taosArrayGetSize(pBlockLoadInfo->aSttBlk);
TSttBlkArray* pArray = pBlockLoadInfo->pBlockArray;
size_t size = pArray->size;
if (size >= 1) {
SSttBlk *pStart = taosArrayGet(pBlockLoadInfo->aSttBlk, 0);
SSttBlk *pEnd = taosArrayGet(pBlockLoadInfo->aSttBlk, size - 1);
SSttBlk *pStart = &pArray->data[0];
SSttBlk *pEnd = &pArray->data[size - 1];
// all identical
if (pStart->suid == pEnd->suid) {
......@@ -320,12 +330,12 @@ int32_t tLDataIterOpen(struct SLDataIter *pIter, SDataFReader *pReader, int32_t
tsdbDebug("load the last file info completed, elapsed time:%.2fms, %s", el, idStr);
}
size_t size = taosArrayGetSize(pBlockLoadInfo->aSttBlk);
TSttBlkArray* pArray = pBlockLoadInfo->pBlockArray;
// find the start block
pIter->iSttBlk = binarySearchForStartBlock(pBlockLoadInfo->aSttBlk->pData, size, uid, backward);
pIter->iSttBlk = binarySearchForStartBlock(pArray->data, pArray->size, uid, backward);
if (pIter->iSttBlk != -1) {
pIter->pSttBlk = taosArrayGet(pBlockLoadInfo->aSttBlk, pIter->iSttBlk);
pIter->pSttBlk = &pArray->data[pIter->iSttBlk];
pIter->iRow = (pIter->backward) ? pIter->pSttBlk->nRow : -1;
if ((!backward) && ((strictTimeRange && pIter->pSttBlk->minKey >= pIter->timeWindow.ekey) ||
......@@ -403,17 +413,15 @@ void tLDataIterNextBlock(SLDataIter *pIter, const char *idStr) {
}
static void findNextValidRow(SLDataIter *pIter, const char *idStr) {
int32_t step = pIter->backward ? -1 : 1;
bool hasVal = false;
int32_t step = pIter->backward ? -1 : 1;
int32_t i = pIter->iRow;
SBlockData *pBlockData = loadLastBlock(pIter, idStr);
SBlockData *pData = loadLastBlock(pIter, idStr);
// mostly we only need to find the start position for a given table
if ((((i == 0) && (!pIter->backward)) || (i == pBlockData->nRow - 1 && pIter->backward)) &&
pBlockData->aUid != NULL) {
i = binarySearchForStartRowIndex((uint64_t *)pBlockData->aUid, pBlockData->nRow, pIter->uid, pIter->backward);
if ((((i == 0) && (!pIter->backward)) || (i == pData->nRow - 1 && pIter->backward)) && pData->aUid != NULL) {
i = binarySearchForStartRowIndex((uint64_t *)pData->aUid, pData->nRow, pIter->uid, pIter->backward);
if (i == -1) {
tsdbDebug("failed to find the data in pBlockData, uid:%" PRIu64 " , %s", pIter->uid, idStr);
pIter->iRow = -1;
......@@ -421,20 +429,20 @@ static void findNextValidRow(SLDataIter *pIter, const char *idStr) {
}
}
for (; i < pBlockData->nRow && i >= 0; i += step) {
if (pBlockData->aUid != NULL) {
for (; i < pData->nRow && i >= 0; i += step) {
if (pData->aUid != NULL) {
if (!pIter->backward) {
if (pBlockData->aUid[i] > pIter->uid) {
if (pData->aUid[i] > pIter->uid) {
break;
}
} else {
if (pBlockData->aUid[i] < pIter->uid) {
if (pData->aUid[i] < pIter->uid) {
break;
}
}
}
int64_t ts = pBlockData->aTSKEY[i];
int64_t ts = pData->aTSKEY[i];
if (!pIter->backward) { // asc
if (ts > pIter->timeWindow.ekey) { // no more data
break;
......@@ -449,7 +457,7 @@ static void findNextValidRow(SLDataIter *pIter, const char *idStr) {
}
}
int64_t ver = pBlockData->aVersion[i];
int64_t ver = pData->aVersion[i];
if (ver < pIter->verRange.minVer) {
continue;
}
......@@ -485,7 +493,6 @@ bool tLDataIterNextRow(SLDataIter *pIter, const char *idStr) {
while (1) {
bool skipBlock = false;
findNextValidRow(pIter, idStr);
if (pIter->pBlockLoadInfo->checkRemainingRow) {
......@@ -612,6 +619,62 @@ _end:
return code;
}
int32_t tMergeTreeOpen2(SMergeTree *pMTree, int8_t backward, STsdb* pTsdb, uint64_t suid, uint64_t uid,
STimeWindow *pTimeWindow, SVersionRange *pVerRange, SSttBlockLoadInfo *pBlockLoadInfo,
bool destroyLoadInfo, const char *idStr, bool strictTimeRange, SLDataIter* pLDataIter, void* pCurrentFileSet) {
int32_t code = TSDB_CODE_SUCCESS;
pMTree->backward = backward;
pMTree->pIter = NULL;
pMTree->idStr = idStr;
if (!pMTree->backward) { // asc
tRBTreeCreate(&pMTree->rbt, tLDataIterCmprFn);
} else { // desc
tRBTreeCreate(&pMTree->rbt, tLDataIterDescCmprFn);
}
pMTree->pLoadInfo = pBlockLoadInfo;
pMTree->destroyLoadInfo = destroyLoadInfo;
pMTree->ignoreEarlierTs = false;
// todo handle other level of stt files, here only deal with the first level stt
SSttLvl* pSttLevel = ((STFileSet*)pCurrentFileSet)->lvlArr[0].data[0];
ASSERT(pSttLevel->level == 0);
for (int32_t i = 0; i < pSttLevel->fobjArr[0].size; ++i) { // open all last file
memset(&pLDataIter[i], 0, sizeof(SLDataIter));
SSttFileReader* pReader = NULL;
SSttFileReaderConfig conf = {0};
conf.tsdb = pTsdb;
conf.szPage = pTsdb->pVnode->config.szPage;
conf.file[0] = *pSttLevel->fobjArr[0].data[i]->f;
code = tsdbSttFileReaderOpen(pSttLevel->fobjArr[0].data[i]->fname, &conf, &pReader);
code = tLDataIterOpen2(&pLDataIter[i], pReader, i, pMTree->backward, suid, uid, pTimeWindow, pVerRange,
&pMTree->pLoadInfo[i], pMTree->idStr, strictTimeRange);
if (code != TSDB_CODE_SUCCESS) {
goto _end;
}
bool hasVal = tLDataIterNextRow(&pLDataIter[i], pMTree->idStr);
if (hasVal) {
tMergeTreeAddIter(pMTree, &pLDataIter[i]);
} else {
if (!pMTree->ignoreEarlierTs) {
pMTree->ignoreEarlierTs = pLDataIter[i].ignoreEarlierTs;
}
}
}
return code;
_end:
tMergeTreeClose(pMTree);
return code;
}
void tMergeTreeAddIter(SMergeTree *pMTree, SLDataIter *pIter) { tRBTreePut(&pMTree->rbt, (SRBTreeNode *)pIter); }
bool tMergeTreeIgnoreEarlierTs(SMergeTree *pMTree) { return pMTree->ignoreEarlierTs; }
......
......@@ -42,15 +42,15 @@ void initStorageAPI(SStorageAPI* pAPI) {
void initTsdbReaderAPI(TsdReader* pReader) {
pReader->tsdReaderOpen = (int32_t(*)(void*, SQueryTableDataCond*, void*, int32_t, SSDataBlock*, void**, const char*,
bool, SHashObj**))tsdbReaderOpen;
pReader->tsdReaderClose = tsdbReaderClose;
bool, SHashObj**))tsdbReaderOpen2;
pReader->tsdReaderClose = tsdbReaderClose2;
pReader->tsdNextDataBlock = tsdbNextDataBlock;
pReader->tsdNextDataBlock = tsdbNextDataBlock2;
pReader->tsdReaderRetrieveDataBlock = tsdbRetrieveDataBlock;
pReader->tsdReaderReleaseDataBlock = tsdbReleaseDataBlock;
pReader->tsdReaderRetrieveDataBlock = tsdbRetrieveDataBlock2;
pReader->tsdReaderReleaseDataBlock = tsdbReleaseDataBlock2;
pReader->tsdReaderRetrieveBlockSMAInfo = tsdbRetrieveDatablockSMA;
pReader->tsdReaderRetrieveBlockSMAInfo = tsdbRetrieveDatablockSMA2;
pReader->tsdReaderNotifyClosing = tsdbReaderSetCloseFlag;
pReader->tsdReaderResetStatus = tsdbReaderReset;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册