From 7e1306a97657a752f45ae564270811cae968d8f6 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Sun, 4 Sep 2022 20:28:12 +0800 Subject: [PATCH] more code --- source/dnode/vnode/src/inc/tsdb.h | 38 ++-- source/dnode/vnode/src/tsdb/tsdbFS.c | 23 ++- .../dnode/vnode/src/tsdb/tsdbReaderWriter.c | 194 ++++-------------- 3 files changed, 69 insertions(+), 186 deletions(-) diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index da74dc9828..eb405fd4a6 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -67,11 +67,12 @@ typedef struct SBlockCol SBlockCol; typedef struct SVersionRange SVersionRange; typedef struct SLDataIter SLDataIter; -#define TSDB_FILE_DLMT ((uint32_t)0xF00AFA0F) -#define TSDB_MAX_SUBBLOCKS 8 -#define TSDB_MAX_SST_FILE 16 -#define TSDB_DEFAULT_SST_FILE 8 -#define TSDB_FHDR_SIZE 512 +#define TSDB_FILE_DLMT ((uint32_t)0xF00AFA0F) +#define TSDB_MAX_SUBBLOCKS 8 +#define TSDB_MAX_SST_FILE 16 +#define TSDB_DEFAULT_SST_FILE 8 +#define TSDB_FHDR_SIZE 512 +#define TSDB_DEFAULT_PAGE_SIZE 4096 #define HAS_NONE ((int8_t)0x1) #define HAS_NULL ((int8_t)0x2) @@ -578,20 +579,6 @@ struct SRowMerger { SArray *pArray; // SArray }; -struct SDelFWriter { - STsdb *pTsdb; - SDelFile fDel; - TdFilePtr pWriteH; - - uint8_t *aBuf[1]; -}; - -struct STsdbReadSnap { - SMemTable *pMem; - SMemTable *pIMem; - STsdbFS fs; -}; - typedef struct { char *path; int32_t szPage; @@ -602,6 +589,19 @@ typedef struct { int64_t szFile; } STsdbFD; +struct SDelFWriter { + STsdb *pTsdb; + SDelFile fDel; + STsdbFD *pWriteH; + uint8_t *aBuf[1]; +}; + +struct STsdbReadSnap { + SMemTable *pMem; + SMemTable *pIMem; + STsdbFS fs; +}; + struct SDataFWriter { STsdb *pTsdb; SDFileSet wSet; diff --git a/source/dnode/vnode/src/tsdb/tsdbFS.c b/source/dnode/vnode/src/tsdb/tsdbFS.c index e6bc9d9936..0577faf855 100644 --- a/source/dnode/vnode/src/tsdb/tsdbFS.c +++ b/source/dnode/vnode/src/tsdb/tsdbFS.c @@ -15,12 +15,17 @@ #include "tsdb.h" +#define LOGIC_TO_FILE_SIZE(LSIZE, PAGE) (0) // todo + // ================================================================================================= static int32_t tsdbEncodeFS(uint8_t *p, STsdbFS *pFS) { int32_t n = 0; int8_t hasDel = pFS->pDelFile ? 1 : 0; uint32_t nSet = taosArrayGetSize(pFS->aDFileSet); + // version + n += tPutI8(p ? p + n : p, 0); + // SDelFile n += tPutI8(p ? p + n : p, hasDel); if (hasDel) { @@ -292,7 +297,7 @@ static int32_t tsdbScanAndTryFixFS(STsdb *pTsdb) { code = TAOS_SYSTEM_ERROR(errno); goto _err; } - if (size != pSet->pHeadF->size) { + if (size != LOGIC_TO_FILE_SIZE(pSet->pHeadF->size, TSDB_DEFAULT_PAGE_SIZE)) { code = TSDB_CODE_FILE_CORRUPTED; goto _err; } @@ -303,10 +308,10 @@ static int32_t tsdbScanAndTryFixFS(STsdb *pTsdb) { code = TAOS_SYSTEM_ERROR(errno); goto _err; } - if (size < pSet->pDataF->size) { + if (size < LOGIC_TO_FILE_SIZE(pSet->pDataF->size, TSDB_DEFAULT_PAGE_SIZE)) { code = TSDB_CODE_FILE_CORRUPTED; goto _err; - } else if (size > pSet->pDataF->size) { + } else if (size > LOGIC_TO_FILE_SIZE(pSet->pDataF->size, TSDB_DEFAULT_PAGE_SIZE)) { code = tsdbDFileRollback(pTsdb, pSet, TSDB_DATA_FILE); if (code) goto _err; } @@ -317,10 +322,10 @@ static int32_t tsdbScanAndTryFixFS(STsdb *pTsdb) { code = TAOS_SYSTEM_ERROR(errno); goto _err; } - if (size < pSet->pSmaF->size) { + if (size < LOGIC_TO_FILE_SIZE(pSet->pSmaF->size, TSDB_DEFAULT_PAGE_SIZE)) { code = TSDB_CODE_FILE_CORRUPTED; goto _err; - } else if (size > pSet->pSmaF->size) { + } else if (size > LOGIC_TO_FILE_SIZE(pSet->pSmaF->size, TSDB_DEFAULT_PAGE_SIZE)) { code = tsdbDFileRollback(pTsdb, pSet, TSDB_SMA_FILE); if (code) goto _err; } @@ -332,7 +337,7 @@ static int32_t tsdbScanAndTryFixFS(STsdb *pTsdb) { code = TAOS_SYSTEM_ERROR(errno); goto _err; } - if (size != pSet->aSstF[iSst]->size) { + if (size != LOGIC_TO_FILE_SIZE(pSet->aSstF[iSst]->size, TSDB_DEFAULT_PAGE_SIZE)) { code = TSDB_CODE_FILE_CORRUPTED; goto _err; } @@ -364,10 +369,12 @@ static int32_t tsdbRecoverFS(STsdb *pTsdb, uint8_t *pData, int64_t nData) { int32_t code = 0; int8_t hasDel; uint32_t nSet; - int32_t n; + int32_t n = 0; + + // version + n += tGetI8(pData + n, NULL); // SDelFile - n = 0; n += tGetI8(pData + n, &hasDel); if (hasDel) { pTsdb->fs.pDelFile = (SDelFile *)taosMemoryMalloc(sizeof(SDelFile)); diff --git a/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c b/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c index 0f50714d3e..91de4b3468 100644 --- a/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c +++ b/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c @@ -15,8 +15,6 @@ #include "tsdb.h" -#define TSDB_DEFAULT_PAGE_SIZE 4096 - // =============== PAGE-WISE FILE =============== #define PAGE_CONTENT_SIZE(PAGE) ((PAGE) - sizeof(TSCKSUM)) #define LOGIC_TO_FILE_OFFSET(OFFSET, PAGE) \ @@ -1137,7 +1135,7 @@ _err: int32_t tsdbDelFWriterOpen(SDelFWriter **ppWriter, SDelFile *pFile, STsdb *pTsdb) { int32_t code = 0; char fname[TSDB_FILENAME_LEN]; - char hdr[TSDB_FHDR_SIZE] = {0}; + uint8_t hdr[TSDB_FHDR_SIZE] = {0}; SDelFWriter *pDelFWriter; int64_t n; @@ -1151,18 +1149,13 @@ int32_t tsdbDelFWriterOpen(SDelFWriter **ppWriter, SDelFile *pFile, STsdb *pTsdb pDelFWriter->fDel = *pFile; tsdbDelFileName(pTsdb, pFile, fname); - pDelFWriter->pWriteH = taosOpenFile(fname, TD_FILE_WRITE | TD_FILE_CREATE); - if (pDelFWriter->pWriteH == NULL) { - code = TAOS_SYSTEM_ERROR(errno); - goto _err; - } + code = + tsdbOpenFile(fname, TSDB_DEFAULT_PAGE_SIZE, TD_FILE_READ | TD_FILE_WRITE | TD_FILE_CREATE, &pDelFWriter->pWriteH); + if (code) goto _err; // update header - n = taosWriteFile(pDelFWriter->pWriteH, &hdr, TSDB_FHDR_SIZE); - if (n < 0) { - code = TAOS_SYSTEM_ERROR(errno); - goto _err; - } + code = tsdbWriteFile(pDelFWriter->pWriteH, 0, hdr, TSDB_FHDR_SIZE); + if (code) goto _err; pDelFWriter->fDel.size = TSDB_FHDR_SIZE; pDelFWriter->fDel.offset = 0; @@ -1182,16 +1175,13 @@ int32_t tsdbDelFWriterClose(SDelFWriter **ppWriter, int8_t sync) { STsdb *pTsdb = pWriter->pTsdb; // sync - if (sync && taosFsyncFile(pWriter->pWriteH) < 0) { - code = TAOS_SYSTEM_ERROR(errno); - goto _err; + if (sync) { + code = tsdbFsyncFile(pWriter->pWriteH); + if (code) goto _err; } // close - if (taosCloseFile(&pWriter->pWriteH) < 0) { - code = TAOS_SYSTEM_ERROR(errno); - goto _err; - } + tsdbCloseFile(&pWriter->pWriteH); for (int32_t iBuf = 0; iBuf < sizeof(pWriter->aBuf) / sizeof(uint8_t *); iBuf++) { tFree(pWriter->aBuf[iBuf]); @@ -1212,11 +1202,10 @@ int32_t tsdbWriteDelData(SDelFWriter *pWriter, SArray *aDelData, SDelIdx *pDelId int64_t n; // prepare - size = sizeof(uint32_t); + size = 0; for (int32_t iDelData = 0; iDelData < taosArrayGetSize(aDelData); iDelData++) { size += tPutDelData(NULL, taosArrayGet(aDelData, iDelData)); } - size += sizeof(TSCKSUM); // alloc code = tRealloc(&pWriter->aBuf[0], size); @@ -1224,22 +1213,14 @@ int32_t tsdbWriteDelData(SDelFWriter *pWriter, SArray *aDelData, SDelIdx *pDelId // build n = 0; - n += tPutU32(pWriter->aBuf[0] + n, TSDB_FILE_DLMT); for (int32_t iDelData = 0; iDelData < taosArrayGetSize(aDelData); iDelData++) { n += tPutDelData(pWriter->aBuf[0] + n, taosArrayGet(aDelData, iDelData)); } - taosCalcChecksumAppend(0, pWriter->aBuf[0], size); - - ASSERT(n + sizeof(TSCKSUM) == size); + ASSERT(n == size); // write - n = taosWriteFile(pWriter->pWriteH, pWriter->aBuf[0], size); - if (n < 0) { - code = TAOS_SYSTEM_ERROR(errno); - goto _err; - } - - ASSERT(n == size); + code = tsdbWriteFile(pWriter->pWriteH, pWriter->fDel.size, pWriter->aBuf[0], size); + if (code) goto _err; // update pDelIdx->offset = pWriter->fDel.size; @@ -1260,11 +1241,10 @@ int32_t tsdbWriteDelIdx(SDelFWriter *pWriter, SArray *aDelIdx) { SDelIdx *pDelIdx; // prepare - size = sizeof(uint32_t); + size = 0; for (int32_t iDelIdx = 0; iDelIdx < taosArrayGetSize(aDelIdx); iDelIdx++) { size += tPutDelIdx(NULL, taosArrayGet(aDelIdx, iDelIdx)); } - size += sizeof(TSCKSUM); // alloc code = tRealloc(&pWriter->aBuf[0], size); @@ -1272,20 +1252,14 @@ int32_t tsdbWriteDelIdx(SDelFWriter *pWriter, SArray *aDelIdx) { // build n = 0; - n += tPutU32(pWriter->aBuf[0] + n, TSDB_FILE_DLMT); for (int32_t iDelIdx = 0; iDelIdx < taosArrayGetSize(aDelIdx); iDelIdx++) { n += tPutDelIdx(pWriter->aBuf[0] + n, taosArrayGet(aDelIdx, iDelIdx)); } - taosCalcChecksumAppend(0, pWriter->aBuf[0], size); - - ASSERT(n + sizeof(TSCKSUM) == size); + ASSERT(n == size); // write - n = taosWriteFile(pWriter->pWriteH, pWriter->aBuf[0], size); - if (n < 0) { - code = TAOS_SYSTEM_ERROR(errno); - goto _err; - } + code = tsdbWriteFile(pWriter->pWriteH, pWriter->fDel.size, pWriter->aBuf[0], size); + if (code) goto _err; // update pWriter->fDel.offset = pWriter->fDel.size; @@ -1300,27 +1274,16 @@ _err: int32_t tsdbUpdateDelFileHdr(SDelFWriter *pWriter) { int32_t code = 0; - char hdr[TSDB_FHDR_SIZE]; + char hdr[TSDB_FHDR_SIZE] = {0}; int64_t size = TSDB_FHDR_SIZE; int64_t n; // build - memset(hdr, 0, size); tPutDelFile(hdr, &pWriter->fDel); - taosCalcChecksumAppend(0, hdr, size); - - // seek - if (taosLSeekFile(pWriter->pWriteH, 0, SEEK_SET) < 0) { - code = TAOS_SYSTEM_ERROR(errno); - goto _err; - } // write - n = taosWriteFile(pWriter->pWriteH, hdr, size); - if (n < 0) { - code = TAOS_SYSTEM_ERROR(errno); - goto _err; - } + code = tsdbWriteFile(pWriter->pWriteH, 0, hdr, size); + if (code) goto _err; return code; @@ -1330,10 +1293,9 @@ _err: } // SDelFReader ==================================================== struct SDelFReader { - STsdb *pTsdb; - SDelFile fDel; - TdFilePtr pReadH; - + STsdb *pTsdb; + SDelFile fDel; + STsdbFD *pReadH; uint8_t *aBuf[1]; }; @@ -1355,14 +1317,9 @@ int32_t tsdbDelFReaderOpen(SDelFReader **ppReader, SDelFile *pFile, STsdb *pTsdb pDelFReader->fDel = *pFile; tsdbDelFileName(pTsdb, pFile, fname); - pDelFReader->pReadH = taosOpenFile(fname, TD_FILE_READ); - if (pDelFReader->pReadH == NULL) { - code = TAOS_SYSTEM_ERROR(errno); - taosMemoryFree(pDelFReader); - goto _err; - } + code = tsdbOpenFile(fname, TSDB_DEFAULT_PAGE_SIZE, TD_FILE_READ, &pDelFReader->pReadH); + if (code) goto _err; -_exit: *ppReader = pDelFReader; return code; @@ -1377,10 +1334,7 @@ int32_t tsdbDelFReaderClose(SDelFReader **ppReader) { SDelFReader *pReader = *ppReader; if (pReader) { - if (taosCloseFile(&pReader->pReadH) < 0) { - code = TAOS_SYSTEM_ERROR(errno); - goto _exit; - } + tsdbCloseFile(&pReader->pReadH); for (int32_t iBuf = 0; iBuf < sizeof(pReader->aBuf) / sizeof(uint8_t *); iBuf++) { tFree(pReader->aBuf[iBuf]); } @@ -1400,38 +1354,17 @@ int32_t tsdbReadDelData(SDelFReader *pReader, SDelIdx *pDelIdx, SArray *aDelData taosArrayClear(aDelData); - // seek - if (taosLSeekFile(pReader->pReadH, offset, SEEK_SET) < 0) { - code = TAOS_SYSTEM_ERROR(errno); - goto _err; - } - // alloc code = tRealloc(&pReader->aBuf[0], size); if (code) goto _err; // read - n = taosReadFile(pReader->pReadH, pReader->aBuf[0], size); - if (n < 0) { - code = TAOS_SYSTEM_ERROR(errno); - goto _err; - } else if (n < size) { - code = TSDB_CODE_FILE_CORRUPTED; - goto _err; - } - - // check - if (!taosCheckChecksumWhole(pReader->aBuf[0], size)) { - code = TSDB_CODE_FILE_CORRUPTED; - goto _err; - } + code = tsdbReadFile(pReader->pReadH, offset, pReader->aBuf[0], size); + if (code) goto _err; // // decode n = 0; - - uint32_t delimiter; - n += tGetU32(pReader->aBuf[0] + n, &delimiter); - while (n < size - sizeof(TSCKSUM)) { + while (n < size) { SDelData delData; n += tGetDelData(pReader->aBuf[0] + n, &delData); @@ -1440,8 +1373,7 @@ int32_t tsdbReadDelData(SDelFReader *pReader, SDelIdx *pDelIdx, SArray *aDelData goto _err; } } - - ASSERT(n == size - sizeof(TSCKSUM)); + ASSERT(n == size); return code; @@ -1458,39 +1390,17 @@ int32_t tsdbReadDelIdx(SDelFReader *pReader, SArray *aDelIdx) { taosArrayClear(aDelIdx); - // seek - if (taosLSeekFile(pReader->pReadH, offset, SEEK_SET) < 0) { - code = TAOS_SYSTEM_ERROR(errno); - goto _err; - } - // alloc code = tRealloc(&pReader->aBuf[0], size); if (code) goto _err; // read - n = taosReadFile(pReader->pReadH, pReader->aBuf[0], size); - if (n < 0) { - code = TAOS_SYSTEM_ERROR(errno); - goto _err; - } else if (n < size) { - code = TSDB_CODE_FILE_CORRUPTED; - goto _err; - } - - // check - if (!taosCheckChecksumWhole(pReader->aBuf[0], size)) { - code = TSDB_CODE_FILE_CORRUPTED; - goto _err; - } + code = tsdbReadFile(pReader->pReadH, offset, pReader->aBuf[0], size); + if (code) goto _err; // decode n = 0; - uint32_t delimiter; - n += tGetU32(pReader->aBuf[0] + n, &delimiter); - ASSERT(delimiter == TSDB_FILE_DLMT); - - while (n < size - sizeof(TSCKSUM)) { + while (n < size) { SDelIdx delIdx; n += tGetDelIdx(pReader->aBuf[0] + n, &delIdx); @@ -1501,45 +1411,11 @@ int32_t tsdbReadDelIdx(SDelFReader *pReader, SArray *aDelIdx) { } } - ASSERT(n == size - sizeof(TSCKSUM)); + ASSERT(n == size); return code; _err: tsdbError("vgId:%d, read del idx failed since %s", TD_VID(pReader->pTsdb->pVnode), tstrerror(code)); return code; -} - -static int32_t tsdbReadAndCheck(TdFilePtr pFD, int64_t offset, uint8_t **ppOut, int32_t size, int8_t toCheck) { - int32_t code = 0; - - // alloc - code = tRealloc(ppOut, size); - if (code) goto _exit; - - // seek - int64_t n = taosLSeekFile(pFD, offset, SEEK_SET); - if (n < 0) { - code = TAOS_SYSTEM_ERROR(errno); - goto _exit; - } - - // read - n = taosReadFile(pFD, *ppOut, size); - if (n < 0) { - code = TAOS_SYSTEM_ERROR(errno); - goto _exit; - } else if (n < size) { - code = TSDB_CODE_FILE_CORRUPTED; - goto _exit; - } - - // check - if (toCheck && !taosCheckChecksumWhole(*ppOut, size)) { - code = TSDB_CODE_FILE_CORRUPTED; - goto _exit; - } - -_exit: - return code; } \ No newline at end of file -- GitLab