From 3e55e72ab549b011e1a944f81a675c19a6ba230f Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Sun, 4 Sep 2022 14:31:32 +0800 Subject: [PATCH] more code --- source/dnode/vnode/src/inc/tsdb.h | 17 +- source/dnode/vnode/src/tsdb/tsdbMergeTree.c | 18 +- .../dnode/vnode/src/tsdb/tsdbReaderWriter.c | 466 +++++++----------- source/dnode/vnode/src/tsdb/tsdbUtil.c | 34 -- 4 files changed, 201 insertions(+), 334 deletions(-) diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index 30d1d103c1..7fc00420b2 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -196,7 +196,6 @@ int32_t tsdbCmprColData(SColData *pColData, int8_t cmprAlg, SBlockCol *pBlockCol uint8_t **ppBuf); int32_t tsdbDecmprColData(uint8_t *pIn, SBlockCol *pBlockCol, int8_t cmprAlg, int32_t nVal, SColData *pColData, uint8_t **ppBuf); -int32_t tsdbReadAndCheck(TdFilePtr pFD, int64_t offset, uint8_t **ppOut, int32_t size, int8_t toCheck); // tsdbMemTable ============================================================================================== // SMemTable int32_t tsdbMemTableCreate(STsdb *pTsdb, SMemTable **ppMemTable); @@ -605,10 +604,10 @@ struct SDataFWriter { STsdb *pTsdb; SDFileSet wSet; - TdFilePtr pHeadFD; - TdFilePtr pDataFD; - TdFilePtr pSmaFD; - TdFilePtr pLastFD; + STsdbFD *pHeadFD; + STsdbFD *pDataFD; + STsdbFD *pSmaFD; + STsdbFD *pLastFD; SHeadFile fHead; SDataFile fData; @@ -621,10 +620,10 @@ struct SDataFWriter { struct SDataFReader { STsdb *pTsdb; SDFileSet *pSet; - TdFilePtr pHeadFD; - TdFilePtr pDataFD; - TdFilePtr pSmaFD; - TdFilePtr aLastFD[TSDB_MAX_SST_FILE]; + STsdbFD *pHeadFD; + STsdbFD *pDataFD; + STsdbFD *pSmaFD; + STsdbFD *aLastFD[TSDB_MAX_SST_FILE]; uint8_t *aBuf[3]; }; diff --git a/source/dnode/vnode/src/tsdb/tsdbMergeTree.c b/source/dnode/vnode/src/tsdb/tsdbMergeTree.c index b01d1b80d4..96901dc0ea 100644 --- a/source/dnode/vnode/src/tsdb/tsdbMergeTree.c +++ b/source/dnode/vnode/src/tsdb/tsdbMergeTree.c @@ -33,11 +33,9 @@ struct SLDataIter { SVersionRange verRange; }; -static SBlockData* getCurrentBlock(SLDataIter* pIter) { - return &pIter->bData[pIter->loadIndex]; -} +static SBlockData *getCurrentBlock(SLDataIter *pIter) { return &pIter->bData[pIter->loadIndex]; } -static SBlockData* getNextBlock(SLDataIter* pIter) { +static SBlockData *getNextBlock(SLDataIter *pIter) { pIter->loadIndex ^= 1; return getCurrentBlock(pIter); } @@ -150,9 +148,9 @@ void tLDataIterNextBlock(SLDataIter *pIter) { static void findNextValidRow(SLDataIter *pIter) { int32_t step = pIter->backward ? -1 : 1; - bool hasVal = false; - int32_t i = pIter->iRow; - SBlockData* pBlockData = getCurrentBlock(pIter); + bool hasVal = false; + int32_t i = pIter->iRow; + SBlockData *pBlockData = getCurrentBlock(pIter); for (; i < pBlockData->nRow && i >= 0; i += step) { if (pBlockData->aUid != NULL) { @@ -220,8 +218,8 @@ bool tLDataIterNextRow(SLDataIter *pIter) { return false; } - int32_t iBlockL = pIter->iSstBlk; - SBlockData* pBlockData = getCurrentBlock(pIter); + int32_t iBlockL = pIter->iSstBlk; + SBlockData *pBlockData = getCurrentBlock(pIter); if (pBlockData->nRow == 0 && pIter->pSstBlk != NULL) { // current block not loaded yet pBlockData = getNextBlock(pIter); @@ -306,7 +304,7 @@ int32_t tMergeTreeOpen(SMergeTree *pMTree, int8_t backward, SDataFReader *pFRead tRBTreeCreate(&pMTree->rbt, tLDataIterCmprFn); int32_t code = TSDB_CODE_OUT_OF_MEMORY; - struct SLDataIter *pIterList[TSDB_DEFAULT_LAST_FILE] = {0}; + struct SLDataIter *pIterList[TSDB_DEFAULT_SST_FILE] = {0}; for (int32_t i = 0; i < pFReader->pSet->nSstF; ++i) { // open all last file code = tLDataIterOpen(&pIterList[i], pFReader, i, pMTree->backward, uid, pTimeWindow, pVerRange); if (code != TSDB_CODE_SUCCESS) { diff --git a/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c b/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c index 21c9bae868..51461cc74f 100644 --- a/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c +++ b/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c @@ -15,9 +15,17 @@ #include "tsdb.h" // =============== PAGE-WISE FILE =============== +static int32_t tsdbOpenFile(const char *path, int32_t szPage, int32_t opt, STsdbFD **ppFD) { + int32_t code = 0; + STsdbFD *pFD; -static int32_t tsdbOpenFile(const char *path, int32_t opt, STsdbFD *pFD) { - int32_t code = 0; + *ppFD = NULL; + + pFD = (STsdbFD *)taosMemoryCalloc(1, sizeof(*pFD)); + if (pFD == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _exit; + } pFD->pFD = taosOpenFile(path, opt); if (pFD->pFD == NULL) { @@ -25,7 +33,7 @@ static int32_t tsdbOpenFile(const char *path, int32_t opt, STsdbFD *pFD) { goto _exit; } - pFD->szPage = 4096; + pFD->szPage = szPage; pFD->pgno = 0; pFD->nBuf = 0; pFD->pBuf = taosMemoryMalloc(pFD->szPage); @@ -33,17 +41,21 @@ static int32_t tsdbOpenFile(const char *path, int32_t opt, STsdbFD *pFD) { code = TSDB_CODE_OUT_OF_MEMORY; goto _exit; } + *ppFD = pFD; _exit: return code; } -static void tsdbCloseFile(STsdbFD *pFD) { +static void tsdbCloseFile(STsdbFD **ppFD) { + STsdbFD *pFD = *ppFD; taosMemoryFree(pFD->pBuf); taosCloseFile(&pFD->pFD); + taosMemoryFree(pFD); + *ppFD = NULL; } -static int32_t tsdbSyncFile(STsdbFD *pFD) { +static int32_t tsdbFsyncFile(STsdbFD *pFD) { int32_t code = 0; if (taosFsyncFile(pFD->pFD) < 0) { @@ -140,11 +152,18 @@ _exit: return code; } +static int32_t tsdbLSeekFile(STsdbFD *pFD, int64_t offset) { + int32_t code = 0; + ASSERT(0); + return code; +} + // SDataFWriter ==================================================== int32_t tsdbDataFWriterOpen(SDataFWriter **ppWriter, STsdb *pTsdb, SDFileSet *pSet) { int32_t code = 0; int32_t flag; int64_t n; + int32_t szPage = 4096; SDataFWriter *pWriter = NULL; char fname[TSDB_FILENAME_LEN]; char hdr[TSDB_FHDR_SIZE] = {0}; @@ -156,14 +175,12 @@ int32_t tsdbDataFWriterOpen(SDataFWriter **ppWriter, STsdb *pTsdb, SDFileSet *pS goto _err; } pWriter->pTsdb = pTsdb; - pWriter->wSet = (SDFileSet){ - .diskId = pSet->diskId, - .fid = pSet->fid, - .pHeadF = &pWriter->fHead, - .pDataF = &pWriter->fData, - .pSmaF = &pWriter->fSma, - .nSstF = pSet->nSstF // - }; + pWriter->wSet = (SDFileSet){.diskId = pSet->diskId, + .fid = pSet->fid, + .pHeadF = &pWriter->fHead, + .pDataF = &pWriter->fData, + .pSmaF = &pWriter->fSma, + .nSstF = pSet->nSstF}; pWriter->fHead = *pSet->pHeadF; pWriter->fData = *pSet->pDataF; pWriter->fSma = *pSet->pSmaF; @@ -173,19 +190,13 @@ int32_t tsdbDataFWriterOpen(SDataFWriter **ppWriter, STsdb *pTsdb, SDFileSet *pS } // head - flag = TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC; + flag = TD_FILE_READ | TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC; tsdbHeadFileName(pTsdb, pWriter->wSet.diskId, pWriter->wSet.fid, &pWriter->fHead, fname); - pWriter->pHeadFD = taosOpenFile(fname, flag); - if (pWriter->pHeadFD == NULL) { - code = TAOS_SYSTEM_ERROR(errno); - goto _err; - } + code = tsdbOpenFile(fname, szPage, flag, &pWriter->pHeadFD); + if (code) goto _err; - n = taosWriteFile(pWriter->pHeadFD, hdr, TSDB_FHDR_SIZE); - if (n < 0) { - code = TAOS_SYSTEM_ERROR(errno); - goto _err; - } + code = tsdbWriteFile(pWriter->pHeadFD, hdr, TSDB_FHDR_SIZE, NULL); + if (code) goto _err; ASSERT(n == TSDB_FHDR_SIZE); @@ -193,78 +204,49 @@ int32_t tsdbDataFWriterOpen(SDataFWriter **ppWriter, STsdb *pTsdb, SDFileSet *pS // data if (pWriter->fData.size == 0) { - flag = TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC; + flag = TD_FILE_READ | TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC; } else { - flag = TD_FILE_WRITE; + flag = TD_FILE_READ | TD_FILE_WRITE; } tsdbDataFileName(pTsdb, pWriter->wSet.diskId, pWriter->wSet.fid, &pWriter->fData, fname); - pWriter->pDataFD = taosOpenFile(fname, flag); - if (pWriter->pDataFD == NULL) { - code = TAOS_SYSTEM_ERROR(errno); - goto _err; - } + code = tsdbOpenFile(fname, szPage, flag, &pWriter->pDataFD); + if (code) goto _err; if (pWriter->fData.size == 0) { - n = taosWriteFile(pWriter->pDataFD, hdr, TSDB_FHDR_SIZE); - if (n < 0) { - code = TAOS_SYSTEM_ERROR(errno); - goto _err; - } - + code = tsdbWriteFile(pWriter->pDataFD, hdr, TSDB_FHDR_SIZE, NULL); + if (code) goto _err; pWriter->fData.size += TSDB_FHDR_SIZE; } else { - n = taosLSeekFile(pWriter->pDataFD, 0, SEEK_END); - if (n < 0) { - code = TAOS_SYSTEM_ERROR(errno); - goto _err; - } - - ASSERT(n == pWriter->fData.size); + // code = tsdbLSeekFile(pWriter->pDataFD, 0, SEEK_END); + // if (code) goto _err; } // sma if (pWriter->fSma.size == 0) { - flag = TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC; + flag = TD_FILE_READ | TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC; } else { - flag = TD_FILE_WRITE; + flag = TD_FILE_READ | TD_FILE_WRITE; } tsdbSmaFileName(pTsdb, pWriter->wSet.diskId, pWriter->wSet.fid, &pWriter->fSma, fname); - pWriter->pSmaFD = taosOpenFile(fname, flag); - if (pWriter->pSmaFD == NULL) { - code = TAOS_SYSTEM_ERROR(errno); - goto _err; - } + code = tsdbOpenFile(fname, szPage, flag, &pWriter->pSmaFD); + if (code) goto _err; if (pWriter->fSma.size == 0) { - n = taosWriteFile(pWriter->pSmaFD, hdr, TSDB_FHDR_SIZE); - if (n < 0) { - code = TAOS_SYSTEM_ERROR(errno); - goto _err; - } + code = tsdbWriteFile(pWriter->pSmaFD, hdr, TSDB_FHDR_SIZE, NULL); + if (code) goto _err; pWriter->fSma.size += TSDB_FHDR_SIZE; } else { - n = taosLSeekFile(pWriter->pSmaFD, 0, SEEK_END); - if (n < 0) { - code = TAOS_SYSTEM_ERROR(errno); - goto _err; - } - - ASSERT(n == pWriter->fSma.size); + code = tsdbLSeekFile(pWriter->pSmaFD, 0); + if (code) goto _err; } // sst ASSERT(pWriter->fSst[pSet->nSstF - 1].size == 0); - flag = TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC; + flag = TD_FILE_READ | TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC; tsdbSstFileName(pTsdb, pWriter->wSet.diskId, pWriter->wSet.fid, &pWriter->fSst[pSet->nSstF - 1], fname); - pWriter->pLastFD = taosOpenFile(fname, flag); - if (pWriter->pLastFD == NULL) { - code = TAOS_SYSTEM_ERROR(errno); - goto _err; - } - n = taosWriteFile(pWriter->pLastFD, hdr, TSDB_FHDR_SIZE); - if (n < 0) { - code = TAOS_SYSTEM_ERROR(errno); - goto _err; - } + code = tsdbOpenFile(fname, szPage, flag, &pWriter->pLastFD); + if (code) goto _err; + code = tsdbWriteFile(pWriter->pLastFD, hdr, TSDB_FHDR_SIZE, NULL); + if (code) goto _err; pWriter->fSst[pWriter->wSet.nSstF - 1].size += TSDB_FHDR_SIZE; *ppWriter = pWriter; @@ -284,46 +266,31 @@ int32_t tsdbDataFWriterClose(SDataFWriter **ppWriter, int8_t sync) { pTsdb = (*ppWriter)->pTsdb; if (sync) { - if (taosFsyncFile((*ppWriter)->pHeadFD) < 0) { + if (tsdbFsyncFile((*ppWriter)->pHeadFD) < 0) { code = TAOS_SYSTEM_ERROR(errno); goto _err; } - if (taosFsyncFile((*ppWriter)->pDataFD) < 0) { + if (tsdbFsyncFile((*ppWriter)->pDataFD) < 0) { code = TAOS_SYSTEM_ERROR(errno); goto _err; } - if (taosFsyncFile((*ppWriter)->pSmaFD) < 0) { + if (tsdbFsyncFile((*ppWriter)->pSmaFD) < 0) { code = TAOS_SYSTEM_ERROR(errno); goto _err; } - if (taosFsyncFile((*ppWriter)->pLastFD) < 0) { + if (tsdbFsyncFile((*ppWriter)->pLastFD) < 0) { code = TAOS_SYSTEM_ERROR(errno); goto _err; } } - if (taosCloseFile(&(*ppWriter)->pHeadFD) < 0) { - code = TAOS_SYSTEM_ERROR(errno); - goto _err; - } - - if (taosCloseFile(&(*ppWriter)->pDataFD) < 0) { - code = TAOS_SYSTEM_ERROR(errno); - goto _err; - } - - if (taosCloseFile(&(*ppWriter)->pSmaFD) < 0) { - code = TAOS_SYSTEM_ERROR(errno); - goto _err; - } - - if (taosCloseFile(&(*ppWriter)->pLastFD) < 0) { - code = TAOS_SYSTEM_ERROR(errno); - goto _err; - } + tsdbCloseFile(&(*ppWriter)->pHeadFD); + tsdbCloseFile(&(*ppWriter)->pDataFD); + tsdbCloseFile(&(*ppWriter)->pSmaFD); + tsdbCloseFile(&(*ppWriter)->pLastFD); for (int32_t iBuf = 0; iBuf < sizeof((*ppWriter)->aBuf) / sizeof(uint8_t *); iBuf++) { tFree((*ppWriter)->aBuf[iBuf]); @@ -346,70 +313,42 @@ int32_t tsdbUpdateDFileSetHeader(SDataFWriter *pWriter) { // head ============== memset(hdr, 0, TSDB_FHDR_SIZE); tPutHeadFile(hdr, &pWriter->fHead); - taosCalcChecksumAppend(0, hdr, TSDB_FHDR_SIZE); - n = taosLSeekFile(pWriter->pHeadFD, 0, SEEK_SET); - if (n < 0) { - code = TAOS_SYSTEM_ERROR(errno); - goto _err; - } + code = tsdbLSeekFile(pWriter->pHeadFD, 0); + if (code) goto _err; - n = taosWriteFile(pWriter->pHeadFD, hdr, TSDB_FHDR_SIZE); - if (n < 0) { - code = TAOS_SYSTEM_ERROR(errno); - goto _err; - } + code = tsdbWriteFile(pWriter->pHeadFD, hdr, TSDB_FHDR_SIZE, NULL); + if (code) goto _err; // data ============== memset(hdr, 0, TSDB_FHDR_SIZE); tPutDataFile(hdr, &pWriter->fData); - taosCalcChecksumAppend(0, hdr, TSDB_FHDR_SIZE); - n = taosLSeekFile(pWriter->pDataFD, 0, SEEK_SET); - if (n < 0) { - code = TAOS_SYSTEM_ERROR(errno); - goto _err; - } + code = tsdbLSeekFile(pWriter->pDataFD, 0); + if (code) goto _err; - n = taosWriteFile(pWriter->pDataFD, hdr, TSDB_FHDR_SIZE); - if (n < 0) { - code = TAOS_SYSTEM_ERROR(errno); - goto _err; - } + code = tsdbWriteFile(pWriter->pDataFD, hdr, TSDB_FHDR_SIZE, NULL); + if (code) goto _err; // sma ============== memset(hdr, 0, TSDB_FHDR_SIZE); tPutSmaFile(hdr, &pWriter->fSma); - taosCalcChecksumAppend(0, hdr, TSDB_FHDR_SIZE); - n = taosLSeekFile(pWriter->pSmaFD, 0, SEEK_SET); - if (n < 0) { - code = TAOS_SYSTEM_ERROR(errno); - goto _err; - } + code = tsdbLSeekFile(pWriter->pSmaFD, 0); + if (code) goto _err; - n = taosWriteFile(pWriter->pSmaFD, hdr, TSDB_FHDR_SIZE); - if (n < 0) { - code = TAOS_SYSTEM_ERROR(errno); - goto _err; - } + code = tsdbWriteFile(pWriter->pSmaFD, hdr, TSDB_FHDR_SIZE, NULL); + if (code) goto _err; // sst ============== memset(hdr, 0, TSDB_FHDR_SIZE); tPutSstFile(hdr, &pWriter->fSst[pWriter->wSet.nSstF - 1]); - taosCalcChecksumAppend(0, hdr, TSDB_FHDR_SIZE); - n = taosLSeekFile(pWriter->pLastFD, 0, SEEK_SET); - if (n < 0) { - code = TAOS_SYSTEM_ERROR(errno); - goto _err; - } + code = tsdbLSeekFile(pWriter->pLastFD, 0); + if (code) goto _err; - n = taosWriteFile(pWriter->pLastFD, hdr, TSDB_FHDR_SIZE); - if (n < 0) { - code = TAOS_SYSTEM_ERROR(errno); - goto _err; - } + code = tsdbWriteFile(pWriter->pLastFD, hdr, TSDB_FHDR_SIZE, NULL); + if (code) goto _err; return code; @@ -431,11 +370,9 @@ int32_t tsdbWriteBlockIdx(SDataFWriter *pWriter, SArray *aBlockIdx) { } // prepare - size = sizeof(uint32_t); for (int32_t iBlockIdx = 0; iBlockIdx < taosArrayGetSize(aBlockIdx); iBlockIdx++) { size += tPutBlockIdx(NULL, taosArrayGet(aBlockIdx, iBlockIdx)); } - size += sizeof(TSCKSUM); // alloc code = tRealloc(&pWriter->aBuf[0], size); @@ -443,20 +380,14 @@ int32_t tsdbWriteBlockIdx(SDataFWriter *pWriter, SArray *aBlockIdx) { // build n = 0; - n = tPutU32(pWriter->aBuf[0] + n, TSDB_FILE_DLMT); for (int32_t iBlockIdx = 0; iBlockIdx < taosArrayGetSize(aBlockIdx); iBlockIdx++) { n += tPutBlockIdx(pWriter->aBuf[0] + n, taosArrayGet(aBlockIdx, iBlockIdx)); } - taosCalcChecksumAppend(0, pWriter->aBuf[0], size); - - ASSERT(n + sizeof(TSCKSUM) == size); + ASSERT(n == size); // write - n = taosWriteFile(pWriter->pHeadFD, pWriter->aBuf[0], size); - if (n < 0) { - code = TAOS_SYSTEM_ERROR(errno); - goto _err; - } + code = tsdbWriteFile(pWriter->pHeadFD, pWriter->aBuf[0], size, NULL); + if (code) goto _err; // update pHeadFile->offset = pHeadFile->size; @@ -481,24 +412,16 @@ int32_t tsdbWriteBlock(SDataFWriter *pWriter, SMapData *mBlock, SBlockIdx *pBloc ASSERT(mBlock->nItem > 0); // alloc - size = sizeof(uint32_t) + tPutMapData(NULL, mBlock) + sizeof(TSCKSUM); + size = tPutMapData(NULL, mBlock); code = tRealloc(&pWriter->aBuf[0], size); if (code) goto _err; // build - n = 0; - n += tPutU32(pWriter->aBuf[0] + n, TSDB_FILE_DLMT); - n += tPutMapData(pWriter->aBuf[0] + n, mBlock); - taosCalcChecksumAppend(0, pWriter->aBuf[0], size); - - ASSERT(n + sizeof(TSCKSUM) == size); + n = tPutMapData(pWriter->aBuf[0] + n, mBlock); // write - n = taosWriteFile(pWriter->pHeadFD, pWriter->aBuf[0], size); - if (n < 0) { - code = TAOS_SYSTEM_ERROR(errno); - goto _err; - } + code = tsdbWriteFile(pWriter->pHeadFD, pWriter->aBuf[0], size, NULL); + if (code) goto _err; // update pBlockIdx->offset = pHeadFile->size; @@ -519,7 +442,7 @@ _err: int32_t tsdbWriteSstBlk(SDataFWriter *pWriter, SArray *aSstBlk) { int32_t code = 0; SSstFile *pSstFile = &pWriter->fSst[pWriter->wSet.nSstF - 1]; - int64_t size; + int64_t size = 0; int64_t n; // check @@ -529,11 +452,9 @@ int32_t tsdbWriteSstBlk(SDataFWriter *pWriter, SArray *aSstBlk) { } // size - size = sizeof(uint32_t); // TSDB_FILE_DLMT for (int32_t iBlockL = 0; iBlockL < taosArrayGetSize(aSstBlk); iBlockL++) { size += tPutSstBlk(NULL, taosArrayGet(aSstBlk, iBlockL)); } - size += sizeof(TSCKSUM); // alloc code = tRealloc(&pWriter->aBuf[0], size); @@ -541,20 +462,13 @@ int32_t tsdbWriteSstBlk(SDataFWriter *pWriter, SArray *aSstBlk) { // encode n = 0; - n += tPutU32(pWriter->aBuf[0] + n, TSDB_FILE_DLMT); for (int32_t iBlockL = 0; iBlockL < taosArrayGetSize(aSstBlk); iBlockL++) { n += tPutSstBlk(pWriter->aBuf[0] + n, taosArrayGet(aSstBlk, iBlockL)); } - taosCalcChecksumAppend(0, pWriter->aBuf[0], size); - - ASSERT(n + sizeof(TSCKSUM) == size); // write - n = taosWriteFile(pWriter->pLastFD, pWriter->aBuf[0], size); - if (n < 0) { - code = TAOS_SYSTEM_ERROR(errno); - goto _err; - } + code = tsdbWriteFile(pWriter->pLastFD, pWriter->aBuf[0], size, NULL); + if (code) goto _err; // update pSstFile->offset = pSstFile->size; @@ -592,21 +506,14 @@ static int32_t tsdbWriteBlockSma(SDataFWriter *pWriter, SBlockData *pBlockData, // write if (pSmaInfo->size) { - int32_t size = pSmaInfo->size + sizeof(TSCKSUM); - - code = tRealloc(&pWriter->aBuf[0], size); + code = tRealloc(&pWriter->aBuf[0], pSmaInfo->size); if (code) goto _err; - taosCalcChecksumAppend(0, pWriter->aBuf[0], size); - - int64_t n = taosWriteFile(pWriter->pSmaFD, pWriter->aBuf[0], size); - if (n < 0) { - code = TAOS_SYSTEM_ERROR(errno); - goto _err; - } + code = tsdbWriteFile(pWriter->pSmaFD, pWriter->aBuf[0], pSmaInfo->size, NULL); + if (code) goto _err; pSmaInfo->offset = pWriter->fSma.size; - pWriter->fSma.size += size; + // pWriter->fSma.size += size; } return code; @@ -631,37 +538,25 @@ int32_t tsdbWriteBlockData(SDataFWriter *pWriter, SBlockData *pBlockData, SBlock if (code) goto _err; // write ================= - TdFilePtr pFD = toLast ? pWriter->pLastFD : pWriter->pDataFD; + STsdbFD *pFD = toLast ? pWriter->pLastFD : pWriter->pDataFD; pBlkInfo->szKey = aBufN[3] + aBufN[2]; pBlkInfo->szBlock = aBufN[0] + aBufN[1] + aBufN[2] + aBufN[3]; - int64_t n = taosWriteFile(pFD, pWriter->aBuf[3], aBufN[3]); - if (n < 0) { - code = TAOS_SYSTEM_ERROR(errno); - goto _err; - } + code = tsdbWriteFile(pFD, pWriter->aBuf[3], aBufN[3], NULL); + if (code) goto _err; - n = taosWriteFile(pFD, pWriter->aBuf[2], aBufN[2]); - if (n < 0) { - code = TAOS_SYSTEM_ERROR(errno); - goto _err; - } + code = tsdbWriteFile(pFD, pWriter->aBuf[2], aBufN[2], NULL); + if (code) goto _err; if (aBufN[1]) { - n = taosWriteFile(pFD, pWriter->aBuf[1], aBufN[1]); - if (n < 0) { - code = TAOS_SYSTEM_ERROR(errno); - goto _err; - } + code = tsdbWriteFile(pFD, pWriter->aBuf[1], aBufN[1], NULL); + if (code) goto _err; } if (aBufN[0]) { - n = taosWriteFile(pFD, pWriter->aBuf[0], aBufN[0]); - if (n < 0) { - code = TAOS_SYSTEM_ERROR(errno); - goto _err; - } + code = tsdbWriteFile(pFD, pWriter->aBuf[0], aBufN[0], NULL); + if (code) goto _err; } // update info @@ -804,6 +699,7 @@ _err: int32_t tsdbDataFReaderOpen(SDataFReader **ppReader, STsdb *pTsdb, SDFileSet *pSet) { int32_t code = 0; SDataFReader *pReader; + int32_t szPage = 4096; char fname[TSDB_FILENAME_LEN]; // alloc @@ -818,36 +714,24 @@ int32_t tsdbDataFReaderOpen(SDataFReader **ppReader, STsdb *pTsdb, SDFileSet *pS // open impl // head tsdbHeadFileName(pTsdb, pSet->diskId, pSet->fid, pSet->pHeadF, fname); - pReader->pHeadFD = taosOpenFile(fname, TD_FILE_READ); - if (pReader->pHeadFD == NULL) { - code = TAOS_SYSTEM_ERROR(errno); - goto _err; - } + code = tsdbOpenFile(fname, szPage, TD_FILE_READ, &pReader->pHeadFD); + if (code) goto _err; // data tsdbDataFileName(pTsdb, pSet->diskId, pSet->fid, pSet->pDataF, fname); - pReader->pDataFD = taosOpenFile(fname, TD_FILE_READ); - if (pReader->pDataFD == NULL) { - code = TAOS_SYSTEM_ERROR(errno); - goto _err; - } + code = tsdbOpenFile(fname, szPage, TD_FILE_READ, &pReader->pDataFD); + if (code) goto _err; // sma tsdbSmaFileName(pTsdb, pSet->diskId, pSet->fid, pSet->pSmaF, fname); - pReader->pSmaFD = taosOpenFile(fname, TD_FILE_READ); - if (pReader->pSmaFD == NULL) { - code = TAOS_SYSTEM_ERROR(errno); - goto _err; - } + code = tsdbOpenFile(fname, szPage, TD_FILE_READ, &pReader->pSmaFD); + if (code) goto _err; // sst for (int32_t iSst = 0; iSst < pSet->nSstF; iSst++) { tsdbSstFileName(pTsdb, pSet->diskId, pSet->fid, pSet->aSstF[iSst], fname); - pReader->aLastFD[iSst] = taosOpenFile(fname, TD_FILE_READ); - if (pReader->aLastFD[iSst] == NULL) { - code = TAOS_SYSTEM_ERROR(errno); - goto _err; - } + code = tsdbOpenFile(fname, szPage, TD_FILE_READ, &pReader->aLastFD[iSst]); + if (code) goto _err; } *ppReader = pReader; @@ -864,29 +748,17 @@ int32_t tsdbDataFReaderClose(SDataFReader **ppReader) { if (*ppReader == NULL) goto _exit; // head - if (taosCloseFile(&(*ppReader)->pHeadFD) < 0) { - code = TAOS_SYSTEM_ERROR(errno); - goto _err; - } + tsdbCloseFile(&(*ppReader)->pHeadFD); // data - if (taosCloseFile(&(*ppReader)->pDataFD) < 0) { - code = TAOS_SYSTEM_ERROR(errno); - goto _err; - } + tsdbCloseFile(&(*ppReader)->pDataFD); // sma - if (taosCloseFile(&(*ppReader)->pSmaFD) < 0) { - code = TAOS_SYSTEM_ERROR(errno); - goto _err; - } + tsdbCloseFile(&(*ppReader)->pSmaFD); // sst for (int32_t iSst = 0; iSst < (*ppReader)->pSet->nSstF; iSst++) { - if (taosCloseFile(&(*ppReader)->aLastFD[iSst]) < 0) { - code = TAOS_SYSTEM_ERROR(errno); - goto _err; - } + tsdbCloseFile(&(*ppReader)->aLastFD[iSst]); } for (int32_t iBuf = 0; iBuf < sizeof((*ppReader)->aBuf) / sizeof(uint8_t *); iBuf++) { @@ -919,14 +791,14 @@ int32_t tsdbReadBlockIdx(SDataFReader *pReader, SArray *aBlockIdx) { code = tRealloc(&pReader->aBuf[0], size); if (code) goto _err; - // seek - if (taosLSeekFile(pReader->pHeadFD, offset, SEEK_SET) < 0) { - code = TAOS_SYSTEM_ERROR(errno); - goto _err; - } + // // seek + // if (taosLSeekFile(pReader->pHeadFD, offset, SEEK_SET) < 0) { + // code = TAOS_SYSTEM_ERROR(errno); + // goto _err; + // } // read - n = taosReadFile(pReader->pHeadFD, pReader->aBuf[0], size); + n = tsdbReadFile(pReader->pHeadFD, offset, pReader->aBuf[0], size); if (n < 0) { code = TAOS_SYSTEM_ERROR(errno); goto _err; @@ -982,14 +854,14 @@ int32_t tsdbReadSstBlk(SDataFReader *pReader, int32_t iSst, SArray *aSstBlk) { code = tRealloc(&pReader->aBuf[0], size); if (code) goto _err; - // seek - if (taosLSeekFile(pReader->aLastFD[iSst], offset, SEEK_SET) < 0) { - code = TAOS_SYSTEM_ERROR(errno); - goto _err; - } + // // seek + // if (taosLSeekFile(pReader->aLastFD[iSst], offset, SEEK_SET) < 0) { + // code = TAOS_SYSTEM_ERROR(errno); + // goto _err; + // } // read - n = taosReadFile(pReader->aLastFD[iSst], pReader->aBuf[0], size); + n = tsdbReadFile(pReader->aLastFD[iSst], offset, pReader->aBuf[0], size); if (n < 0) { code = TAOS_SYSTEM_ERROR(errno); goto _err; @@ -1040,14 +912,14 @@ int32_t tsdbReadBlock(SDataFReader *pReader, SBlockIdx *pBlockIdx, SMapData *mBl code = tRealloc(&pReader->aBuf[0], size); if (code) goto _err; - // seek - if (taosLSeekFile(pReader->pHeadFD, offset, SEEK_SET) < 0) { - code = TAOS_SYSTEM_ERROR(errno); - goto _err; - } + // // seek + // if (taosLSeekFile(pReader->pHeadFD, offset, SEEK_SET) < 0) { + // code = TAOS_SYSTEM_ERROR(errno); + // goto _err; + // } // read - n = taosReadFile(pReader->pHeadFD, pReader->aBuf[0], size); + n = tsdbReadFile(pReader->pHeadFD, offset, pReader->aBuf[0], size); if (n < 0) { code = TAOS_SYSTEM_ERROR(errno); goto _err; @@ -1097,18 +969,18 @@ int32_t tsdbReadBlockSma(SDataFReader *pReader, SDataBlk *pDataBlk, SArray *aCol code = tRealloc(&pReader->aBuf[0], size); if (code) goto _err; - // seek - int64_t n = taosLSeekFile(pReader->pSmaFD, pSmaInfo->offset, SEEK_SET); - if (n < 0) { - code = TAOS_SYSTEM_ERROR(errno); - goto _err; - } else if (n < pSmaInfo->offset) { - code = TSDB_CODE_FILE_CORRUPTED; - goto _err; - } + // // seek + // int64_t n = taosLSeekFile(pReader->pSmaFD, pSmaInfo->offset, SEEK_SET); + // if (n < 0) { + // code = TAOS_SYSTEM_ERROR(errno); + // goto _err; + // } else if (n < pSmaInfo->offset) { + // code = TSDB_CODE_FILE_CORRUPTED; + // goto _err; + // } // read - n = taosReadFile(pReader->pSmaFD, pReader->aBuf[0], size); + int64_t n = tsdbReadFile(pReader->pSmaFD, pSmaInfo->offset, pReader->aBuf[0], size); if (n < 0) { code = TAOS_SYSTEM_ERROR(errno); goto _err; @@ -1148,11 +1020,12 @@ static int32_t tsdbReadBlockDataImpl(SDataFReader *pReader, SBlockInfo *pBlkInfo tBlockDataClear(pBlockData); - TdFilePtr pFD = fromLast ? pReader->aLastFD[0] : pReader->pDataFD; // (todo) + STsdbFD *pFD = fromLast ? pReader->aLastFD[0] : pReader->pDataFD; // (todo) + + // todo: realloc pReader->aBuf[0] // uid + version + tskey - code = tsdbReadAndCheck(pFD, pBlkInfo->offset, &pReader->aBuf[0], pBlkInfo->szKey, 1); - if (code) goto _err; + tsdbReadFile(pFD, pBlkInfo->offset, pReader->aBuf[0], pBlkInfo->szKey); // todo SDiskDataHdr hdr; uint8_t *p = pReader->aBuf[0] + tGetDiskDataHdr(pReader->aBuf[0], &hdr); @@ -1192,8 +1065,7 @@ static int32_t tsdbReadBlockDataImpl(SDataFReader *pReader, SBlockInfo *pBlkInfo if (hdr.szBlkCol > 0) { int64_t offset = pBlkInfo->offset + pBlkInfo->szKey; - code = tsdbReadAndCheck(pFD, offset, &pReader->aBuf[0], hdr.szBlkCol + sizeof(TSCKSUM), 1); - if (code) goto _err; + tsdbReadFile(pFD, offset, pReader->aBuf[0], hdr.szBlkCol + sizeof(TSCKSUM)); } SBlockCol blockCol = {.cid = 0}; @@ -1233,8 +1105,7 @@ static int32_t tsdbReadBlockDataImpl(SDataFReader *pReader, SBlockInfo *pBlkInfo int64_t offset = pBlkInfo->offset + pBlkInfo->szKey + hdr.szBlkCol + sizeof(TSCKSUM) + pBlockCol->offset; int32_t size = pBlockCol->szBitmap + pBlockCol->szOffset + pBlockCol->szValue + sizeof(TSCKSUM); - code = tsdbReadAndCheck(pFD, offset, &pReader->aBuf[1], size, 0); - if (code) goto _err; + tsdbReadFile(pFD, offset, pReader->aBuf[1], size); code = tsdbDecmprColData(pReader->aBuf[1], pBlockCol, hdr.cmprAlg, hdr.nRow, pColData, &pReader->aBuf[2]); if (code) goto _err; @@ -1321,8 +1192,7 @@ int32_t tsdbReadSstBlockEx(SDataFReader *pReader, int32_t iSst, SSstBlk *pSstBlk int32_t code = 0; // read - code = tsdbReadAndCheck(pReader->aLastFD[iSst], pSstBlk->bInfo.offset, &pReader->aBuf[0], pSstBlk->bInfo.szBlock, 0); - if (code) goto _exit; + tsdbReadFile(pReader->aLastFD[iSst], pSstBlk->bInfo.offset, pReader->aBuf[0], pSstBlk->bInfo.szBlock); // decmpr code = tDecmprBlockData(pReader->aBuf[0], pSstBlk->bInfo.szBlock, pBlockData, &pReader->aBuf[1]); @@ -1708,3 +1578,37 @@ _err: tsdbError("vgId:%d, read del idx failed since %s", TD_VID(pReader->pTsdb->pVnode), tstrerror(code)); return code; } + +static int32_t tsdbReadAndCheck(TdFilePtr pFD, int64_t offset, uint8_t **ppOut, int32_t size, int8_t toCheck) { + int32_t code = 0; + + // alloc + code = tRealloc(ppOut, size); + if (code) goto _exit; + + // seek + int64_t n = taosLSeekFile(pFD, offset, SEEK_SET); + if (n < 0) { + code = TAOS_SYSTEM_ERROR(errno); + goto _exit; + } + + // read + n = taosReadFile(pFD, *ppOut, size); + if (n < 0) { + code = TAOS_SYSTEM_ERROR(errno); + goto _exit; + } else if (n < size) { + code = TSDB_CODE_FILE_CORRUPTED; + goto _exit; + } + + // check + if (toCheck && !taosCheckChecksumWhole(*ppOut, size)) { + code = TSDB_CODE_FILE_CORRUPTED; + goto _exit; + } + +_exit: + return code; +} \ No newline at end of file diff --git a/source/dnode/vnode/src/tsdb/tsdbUtil.c b/source/dnode/vnode/src/tsdb/tsdbUtil.c index 6937a27fe4..9b3094bb2c 100644 --- a/source/dnode/vnode/src/tsdb/tsdbUtil.c +++ b/source/dnode/vnode/src/tsdb/tsdbUtil.c @@ -2153,37 +2153,3 @@ int32_t tsdbDecmprColData(uint8_t *pIn, SBlockCol *pBlockCol, int8_t cmprAlg, in _exit: return code; } - -int32_t tsdbReadAndCheck(TdFilePtr pFD, int64_t offset, uint8_t **ppOut, int32_t size, int8_t toCheck) { - int32_t code = 0; - - // alloc - code = tRealloc(ppOut, size); - if (code) goto _exit; - - // seek - int64_t n = taosLSeekFile(pFD, offset, SEEK_SET); - if (n < 0) { - code = TAOS_SYSTEM_ERROR(errno); - goto _exit; - } - - // read - n = taosReadFile(pFD, *ppOut, size); - if (n < 0) { - code = TAOS_SYSTEM_ERROR(errno); - goto _exit; - } else if (n < size) { - code = TSDB_CODE_FILE_CORRUPTED; - goto _exit; - } - - // check - if (toCheck && !taosCheckChecksumWhole(*ppOut, size)) { - code = TSDB_CODE_FILE_CORRUPTED; - goto _exit; - } - -_exit: - return code; -} -- GitLab