From d4e84b4fb29d7788439285ec6f2d7f1427a05a69 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Mon, 15 Jun 2020 11:08:17 +0000 Subject: [PATCH] TD-353 --- src/tsdb/inc/tsdbMain.h | 30 ++++---- src/tsdb/src/tsdbRWHelper.c | 141 ++++++++++++++++++++++++++++-------- 2 files changed, 127 insertions(+), 44 deletions(-) diff --git a/src/tsdb/inc/tsdbMain.h b/src/tsdb/inc/tsdbMain.h index 78492eb832..34b8b99b5b 100644 --- a/src/tsdb/inc/tsdbMain.h +++ b/src/tsdb/inc/tsdbMain.h @@ -115,15 +115,18 @@ typedef struct { typedef enum { TSDB_FILE_TYPE_HEAD = 0, TSDB_FILE_TYPE_DATA, TSDB_FILE_TYPE_LAST, TSDB_FILE_TYPE_MAX } TSDB_FILE_TYPE; typedef struct { + uint32_t offset; + uint32_t len; + uint64_t size; // total size of the file + uint64_t tombSize; // unused file size + uint32_t totalBlocks; + uint32_t totalSubBlocks; } STsdbFileInfo; typedef struct { - char* fname; - int fd; - uint64_t size; - uint64_t tombSize; - uint64_t totalBlocks; - uint64_t totalSubBlocks; + char* fname; + int fd; + STsdbFileInfo info; } SFile; typedef struct { @@ -201,7 +204,7 @@ typedef struct { typedef enum { TSDB_WRITE_HELPER, TSDB_READ_HELPER } tsdb_rw_helper_t; typedef struct { - int fid; + int fid; TSKEY minKey; TSKEY maxKey; // For read/write purpose @@ -232,7 +235,7 @@ typedef struct { SCompInfo* pCompInfo; bool hasOldLastBlock; // For block set usage - SCompData* pCompData[TSDB_MAX_SUBBLOCKS]; + SCompData* pCompData; SDataCols* pDataCols[2]; void* pBuffer; // Buffer to hold the whole data block void* compBuffer; // Buffer for temperary compress/decompress purpose @@ -313,9 +316,12 @@ void tsdbFreeFileH(STsdbFileH* pFileH); #define TSDB_HELPER_TABLE_SET 0x4 // Table is set #define TSDB_HELPER_INFO_LOAD 0x8 // SCompInfo part is loaded #define TSDB_HELPER_FILE_DATA_LOAD 0x10 // SCompData part is loaded +#define helperSetState(h, s) (((h)->state) |= (s)) +#define helperClearState(h, s) ((h)->state &= (~(s))) +#define helperHasState(h, s) ((((h)->state) & (s)) == (s)) +#define blockAtIdx(h, idx) 
((h)->pCompInfo->blocks + idx) #define TSDB_MAX_SUBBLOCKS 8 #define IS_SUB_BLOCK(pBlock) ((pBlock)->numOfSubBlocks == 0) - #define helperType(h) (h)->type #define helperRepo(h) (h)->pRepo #define helperState(h) (h)->state @@ -333,12 +339,6 @@ void* tsdbCommitData(void* arg); // --------- Helper state -#define TSDB_HELPER_TYPE(h) ((h)->config.type) - -#define helperSetState(h, s) (((h)->state) |= (s)) -#define helperClearState(h, s) ((h)->state &= (~(s))) -#define helperHasState(h, s) ((((h)->state) & (s)) == (s)) -#define blockAtIdx(h, idx) ((h)->pCompInfo->blocks + idx) int tsdbInitReadHelper(SRWHelper *pHelper, STsdbRepo *pRepo); int tsdbInitWriteHelper(SRWHelper *pHelper, STsdbRepo *pRepo); diff --git a/src/tsdb/src/tsdbRWHelper.c b/src/tsdb/src/tsdbRWHelper.c index 0be6ae530a..d1a41ce550 100644 --- a/src/tsdb/src/tsdbRWHelper.c +++ b/src/tsdb/src/tsdbRWHelper.c @@ -71,7 +71,10 @@ int tsdbSetAndOpenHelperFile(SRWHelper *pHelper, SFileGroup *pGroup) { pHelper->files.lastF = pGroup->files[TSDB_FILE_TYPE_LAST]; - if (TSDB_HELPER_TYPE(pHelper) == TSDB_WRITE_HELPER) { + if (helperType(pHelper) == TSDB_WRITE_HELPER) { char *fnameDup = strdup(pHelper->files.headF.fname); - if (fnameDup == NULL) return -1; + if (fnameDup == NULL) { + terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; + return -1; + } char *dataDir = dirname(fnameDup); tsdbGetFileName(dataDir, pHelper->files.fid, ".h", pHelper->files.nHeadF.fname); @@ -81,21 +84,29 @@ int tsdbSetAndOpenHelperFile(SRWHelper *pHelper, SFileGroup *pGroup) { // Open the files if (tsdbOpenFile(&(pHelper->files.headF), O_RDONLY) < 0) goto _err; - if (TSDB_HELPER_TYPE(pHelper) == TSDB_WRITE_HELPER) { + if (helperType(pHelper) == TSDB_WRITE_HELPER) { if (tsdbOpenFile(&(pHelper->files.dataF), O_RDWR) < 0) goto _err; if (tsdbOpenFile(&(pHelper->files.lastF), O_RDWR) < 0) goto _err; // Create and open .h if (tsdbOpenFile(&(pHelper->files.nHeadF), O_WRONLY | O_CREAT) < 0) return -1; // size_t tsize = TSDB_FILE_HEAD_SIZE + sizeof(SCompIdx) * pHelper->config.maxTables + sizeof(TSCKSUM); - if
(tsendfile(pHelper->files.nHeadF.fd, pHelper->files.headF.fd, NULL, TSDB_FILE_HEAD_SIZE) < TSDB_FILE_HEAD_SIZE) + if (tsendfile(pHelper->files.nHeadF.fd, pHelper->files.headF.fd, NULL, TSDB_FILE_HEAD_SIZE) < TSDB_FILE_HEAD_SIZE) { + tsdbError("vgId:%d failed to sendfile %d bytes from file %s to %s since %s", REPO_ID(pHelper->pRepo), + TSDB_FILE_HEAD_SIZE, pHelper->files.headF.fname, pHelper->files.nHeadF.fname, strerror(errno)); + terrno = TAOS_SYSTEM_ERROR(errno); goto _err; + } // Create and open .l file if should if (tsdbShouldCreateNewLast(pHelper)) { if (tsdbOpenFile(&(pHelper->files.nLastF), O_WRONLY | O_CREAT) < 0) goto _err; - if (tsendfile(pHelper->files.nLastF.fd, pHelper->files.lastF.fd, NULL, TSDB_FILE_HEAD_SIZE) < TSDB_FILE_HEAD_SIZE) - goto _err; + if (tsendfile(pHelper->files.nLastF.fd, pHelper->files.lastF.fd, NULL, TSDB_FILE_HEAD_SIZE) < TSDB_FILE_HEAD_SIZE) { + tsdbError("vgId:%d failed to sendfile %d bytes from file %s to %s since %s", REPO_ID(pHelper->pRepo), + TSDB_FILE_HEAD_SIZE, pHelper->files.lastF.fname, pHelper->files.nLastF.fname, strerror(errno)); + terrno = TAOS_SYSTEM_ERROR(errno); + goto _err; + } } } else { if (tsdbOpenFile(&(pHelper->files.dataF), O_RDONLY) < 0) goto _err; @@ -164,7 +174,7 @@ void tsdbSetHelperTable(SRWHelper *pHelper, STable *pTable, STsdbRepo *pRepo) { pHelper->tableInfo.tid = pTable->tableId.tid; pHelper->tableInfo.uid = pTable->tableId.uid; - STSchema *pSchema = tsdbGetTableSchema(pRepo->tsdbMeta, pTable); + STSchema *pSchema = tsdbGetTableSchema(pTable); pHelper->tableInfo.sversion = schemaVersion(pSchema); tdInitDataCols(pHelper->pDataCols[0], pSchema); @@ -354,28 +364,45 @@ int tsdbLoadCompIdx(SRWHelper *pHelper, void *target) { if (pFile->info.offset > 0) { ASSERT(pFile->info.offset > TSDB_FILE_HEAD_SIZE); - if (lseek(fd, pFile->info.offset, SEEK_SET) < 0) return -1; - if ((pHelper->pBuffer = trealloc(pHelper->pBuffer, pFile->info.len)) == NULL) return -1; - if (tread(fd, (void *)(pHelper->pBuffer), pFile->info.len) < pFile->info.len) return -1; + if (lseek(fd, pFile->info.offset, SEEK_SET) < 0) { + tsdbError("vgId:%d failed to
lseek file %s to %u since %s", REPO_ID(pHelper->pRepo), pFile->fname, + pFile->info.offset, strerror(errno)); + terrno = TAOS_SYSTEM_ERROR(errno); + return -1; + } + if ((pHelper->pBuffer = trealloc(pHelper->pBuffer, pFile->info.len)) == NULL) { + terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; + return -1; + } + if (tread(fd, (void *)(pHelper->pBuffer), pFile->info.len) < pFile->info.len) { + tsdbError("vgId:%d failed to read %d bytes from file %s since %s", REPO_ID(pHelper->pRepo), pFile->info.len, + pFile->fname, strerror(errno)); + terrno = TAOS_SYSTEM_ERROR(errno); + return -1; + } if (!taosCheckChecksumWhole((uint8_t *)(pHelper->pBuffer), pFile->info.len)) { - // TODO: File is broken, try to deal with it + tsdbError("vgId:%d file %s SCompIdx part is corrupted. offset %u len %u", REPO_ID(pHelper->pRepo), pFile->fname, + pFile->info.offset, pFile->info.len); + terrno = TSDB_CODE_TDB_FILE_CORRUPTED; return -1; } // Decode it void *ptr = pHelper->pBuffer; - while (((char *)ptr - (char *)pHelper->pBuffer) < (pFile->info.len - sizeof(TSCKSUM))) { + while (POINTER_DISTANCE(ptr, pHelper->pBuffer) < (pFile->info.len - sizeof(TSCKSUM))) { uint32_t tid = 0; if ((ptr = taosDecodeVariantU32(ptr, &tid)) == NULL) return -1; ASSERT(tid > 0 && tid < pHelper->config.maxTables); if ((ptr = tsdbDecodeSCompIdx(ptr, pHelper->pCompIdx + tid)) == NULL) return -1; - ASSERT((char *)ptr - (char *)pHelper->pBuffer <= pFile->info.len - sizeof(TSCKSUM)); + ASSERT(POINTER_DISTANCE(ptr, pHelper->pBuffer) <= pFile->info.len - sizeof(TSCKSUM)); } - ASSERT(((char *)ptr - (char *)pHelper->pBuffer) == (pFile->info.len - sizeof(TSCKSUM))); - if (lseek(fd, TSDB_FILE_HEAD_SIZE, SEEK_SET) < 0) return -1; + if (lseek(fd, TSDB_FILE_HEAD_SIZE, SEEK_SET) < 0) { + terrno = TAOS_SYSTEM_ERROR(errno); + return -1; + } } } helperSetState(pHelper, TSDB_HELPER_IDX_LOAD); @@ -507,8 +534,16 @@ int tsdbUpdateFileHeader(SFile *pFile, uint32_t version) { taosCalcChecksumAppend(0, (uint8_t *)buf, TSDB_FILE_HEAD_SIZE); - if 
(lseek(pFile->fd, 0, SEEK_SET) < 0) return -1; - if (twrite(pFile->fd, (void *)buf, TSDB_FILE_HEAD_SIZE) < TSDB_FILE_HEAD_SIZE) return -1; + if (lseek(pFile->fd, 0, SEEK_SET) < 0) { + tsdbError("failed to lseek file %s since %s", pFile->fname, strerror(errno)); + terrno = TAOS_SYSTEM_ERROR(errno); + return -1; + } + if (twrite(pFile->fd, (void *)buf, TSDB_FILE_HEAD_SIZE) < TSDB_FILE_HEAD_SIZE) { + tsdbError("failed to write %d bytes to file %s since %s", TSDB_FILE_HEAD_SIZE, pFile->fname, strerror(errno)); + terrno = TAOS_SYSTEM_ERROR(errno); + return -1; + } return 0; } @@ -851,6 +886,9 @@ static int tsdbInsertSuperBlock(SRWHelper *pHelper, SCompBlock *pCompBlock, int ASSERT(pHelper->pCompInfo->blocks[0].keyLast < pHelper->pCompInfo->blocks[1].keyFirst); } + tsdbTrace("vgId:%d tid:%d a super block is inserted at index %d", REPO_ID(pHelper->pRepo), pHelper->tableInfo.tid, + blkIdx); + return 0; _err: @@ -933,6 +971,8 @@ static int tsdbAddSubBlock(SRWHelper *pHelper, SCompBlock *pCompBlock, int blkId pIdx->maxKey = pHelper->pCompInfo->blocks[pIdx->numOfBlocks - 1].keyLast; pIdx->hasLast = pHelper->pCompInfo->blocks[pIdx->numOfBlocks - 1].last; + tsdbTrace("vgId:%d tid:%d a subblock is added at index %d", REPO_ID(pHelper->pRepo), pHelper->tableInfo.tid, blkIdx); + return 0; _err: @@ -971,6 +1011,9 @@ static int tsdbUpdateSuperBlock(SRWHelper *pHelper, SCompBlock *pCompBlock, int pIdx->maxKey = pHelper->pCompInfo->blocks[pIdx->numOfBlocks - 1].keyLast; pIdx->hasLast = pHelper->pCompInfo->blocks[pIdx->numOfBlocks - 1].last; + tsdbTrace("vgId:%d tid:%d a super block is updated at index %d", REPO_ID(pHelper->pRepo), pHelper->tableInfo.tid, + blkIdx); + return 0; } @@ -1001,18 +1044,26 @@ static int tsdbGetRowsInRange(SDataCols *pDataCols, TSKEY minKey, TSKEY maxKey) static void tsdbResetHelperFileImpl(SRWHelper *pHelper) { memset((void *)&pHelper->files, 0, sizeof(pHelper->files)); pHelper->files.fid = -1; + tfree(pHelper->files.headF.fname); pHelper->files.headF.fd = 
-1; + tfree(pHelper->files.dataF.fname); pHelper->files.dataF.fd = -1; + tfree(pHelper->files.lastF.fname); pHelper->files.lastF.fd = -1; + tfree(pHelper->files.nHeadF.fname); pHelper->files.nHeadF.fd = -1; + tfree(pHelper->files.nLastF.fname); pHelper->files.nLastF.fd = -1; } static int tsdbInitHelperFile(SRWHelper *pHelper) { - // pHelper->compIdxSize = sizeof(SCompIdx) * pHelper->config.maxTables + sizeof(TSCKSUM); - size_t tsize = sizeof(SCompIdx) * pHelper->config.maxTables + sizeof(TSCKSUM); + STsdbCfg *pCfg = &pHelper->pRepo->config; + size_t tsize = sizeof(SCompIdx) * pCfg->maxTables + sizeof(TSCKSUM); pHelper->pCompIdx = (SCompIdx *)tmalloc(tsize); - if (pHelper->pCompIdx == NULL) return -1; + if (pHelper->pCompIdx == NULL) { + terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; + return -1; + } tsdbResetHelperFileImpl(pHelper); return 0; @@ -1020,6 +1071,7 @@ static int tsdbInitHelperFile(SRWHelper *pHelper) { static void tsdbDestroyHelperFile(SRWHelper *pHelper) { tsdbCloseHelperFile(pHelper, false); + tsdbResetHelperFileImpl(pHelper); tzfree(pHelper->pCompIdx); } @@ -1051,9 +1103,16 @@ static void tsdbResetHelperBlock(SRWHelper *pHelper) { } static int tsdbInitHelperBlock(SRWHelper *pHelper) { - pHelper->pDataCols[0] = tdNewDataCols(pHelper->config.maxRowSize, pHelper->config.maxCols, pHelper->config.maxRows); - pHelper->pDataCols[1] = tdNewDataCols(pHelper->config.maxRowSize, pHelper->config.maxCols, pHelper->config.maxRows); - if (pHelper->pDataCols[0] == NULL || pHelper->pDataCols[1] == NULL) return -1; + STsdbRepo *pRepo = helperRepo(pHelper); + + pHelper->pDataCols[0] = + tdNewDataCols(pRepo->imem->maxRowBytes, pRepo->imem->maxCols, pRepo->config.maxRowsPerFileBlock); + pHelper->pDataCols[1] = + tdNewDataCols(pRepo->imem->maxRowBytes, pRepo->imem->maxCols, pRepo->config.maxRowsPerFileBlock); + if (pHelper->pDataCols[0] == NULL || pHelper->pDataCols[1] == NULL) { + terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; + return -1; + } tsdbResetHelperBlockImpl(pHelper); @@ 
-1067,6 +1126,7 @@ static void tsdbDestroyHelperBlock(SRWHelper *pHelper) { } static int tsdbInitHelper(SRWHelper *pHelper, STsdbRepo *pRepo, tsdb_rw_helper_t type) { + STsdbCfg *pCfg = &pRepo->config; memset((void *)pHelper, 0, sizeof(*pHelper)); helperType(pHelper) = type; @@ -1083,9 +1143,12 @@ static int tsdbInitHelper(SRWHelper *pHelper, STsdbRepo *pRepo, tsdb_rw_helper_t if (tsdbInitHelperBlock(pHelper) < 0) goto _err; pHelper->pBuffer = - tmalloc(sizeof(SCompData) + (sizeof(SCompCol) + sizeof(TSCKSUM) + COMP_OVERFLOW_BYTES) * pHelper->config.maxCols + - pHelper->config.maxRowSize * pHelper->config.maxRowsPerFileBlock + sizeof(TSCKSUM)); - if (pHelper->pBuffer == NULL) goto _err; + tmalloc(sizeof(SCompData) + (sizeof(SCompCol) + sizeof(TSCKSUM) + COMP_OVERFLOW_BYTES) * pRepo->imem->maxCols + + pRepo->imem->maxRowBytes * pCfg->maxRowsPerFileBlock + sizeof(TSCKSUM)); + if (pHelper->pBuffer == NULL) { + terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; + goto _err; + } return 0; @@ -1167,13 +1230,30 @@ static int tsdbLoadBlockDataImpl(SRWHelper *pHelper, SCompBlock *pCompBlock, SDa SCompData *pCompData = (SCompData *)pHelper->pBuffer; - int fd = (pCompBlock->last) ? pHelper->files.lastF.fd : pHelper->files.dataF.fd; - if (lseek(fd, pCompBlock->offset, SEEK_SET) < 0) goto _err; - if (tread(fd, (void *)pCompData, pCompBlock->len) < pCompBlock->len) goto _err; + SFile *pFile = (pCompBlock->last) ? 
&(pHelper->files.lastF) : &(pHelper->files.dataF); + + int fd = pFile->fd; + if (lseek(fd, pCompBlock->offset, SEEK_SET) < 0) { + tsdbError("vgId:%d tid:%d failed to lseek file %s since %s", REPO_ID(pHelper->pRepo), pHelper->tableInfo.tid, + pFile->fname, strerror(errno)); + terrno = TAOS_SYSTEM_ERROR(errno); + goto _err; + } + if (tread(fd, (void *)pCompData, pCompBlock->len) < pCompBlock->len) { + tsdbError("vgId:%d failed to read %d bytes from file %s since %s", REPO_ID(pHelper->pRepo), pCompBlock->len, + pFile->fname, strerror(errno)); + terrno = TAOS_SYSTEM_ERROR(errno); + goto _err; + } ASSERT(pCompData->numOfCols == pCompBlock->numOfCols); int32_t tsize = sizeof(SCompData) + sizeof(SCompCol) * pCompBlock->numOfCols + sizeof(TSCKSUM); - if (!taosCheckChecksumWhole((uint8_t *)pCompData, tsize)) goto _err; + if (!taosCheckChecksumWhole((uint8_t *)pCompData, tsize)) { + tsdbError("vgId:%d file %s block data is corrupted offset %" PRId64 " len %d", REPO_ID(pHelper->pRepo), + pFile->fname, pCompBlock->offset, pCompBlock->len); + terrno = TSDB_CODE_TDB_FILE_CORRUPTED; + goto _err; + } pDataCols->numOfRows = pCompBlock->numOfRows; @@ -1198,7 +1278,10 @@ static int tsdbLoadBlockDataImpl(SRWHelper *pHelper, SCompBlock *pCompBlock, SDa zsize += (sizeof(VarDataLenT) * pCompBlock->numOfRows); } pHelper->compBuffer = trealloc(pHelper->compBuffer, zsize); - if (pHelper->compBuffer == NULL) goto _err; + if (pHelper->compBuffer == NULL) { + terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; + goto _err; + } } if (tsdbCheckAndDecodeColumnData(pDataCol, (char *)pCompData + tsize + pCompCol->offset, pCompCol->len, pCompBlock->algorithm, pCompBlock->numOfRows, pDataCols->maxPoints, -- GitLab