diff --git a/src/tsdb/inc/tsdbMain.h b/src/tsdb/inc/tsdbMain.h index e7537f6969df3ebc28011674e456934df20ed8b1..8a0a9e1208b7fb45994be95200a3f2a2a0ceb4ff 100644 --- a/src/tsdb/inc/tsdbMain.h +++ b/src/tsdb/inc/tsdbMain.h @@ -465,7 +465,7 @@ typedef struct { SCompData *pCompData; SDataCols *pDataCols[2]; - void *blockBuffer; // Buffer to hold the whole data block + void *pBuffer; // Buffer to hold the whole data block void *compBuffer; // Buffer for temperary compress/decompress purpose } SRWHelper; @@ -512,6 +512,7 @@ void tsdbFitRetention(STsdbRepo *pRepo); int tsdbAlterCacheTotalBlocks(STsdbRepo *pRepo, int totalBlocks); void tsdbAdjustCacheBlocks(STsdbCache *pCache); int32_t tsdbGetMetaFileName(char *rootDir, char *fname); +int tsdbUpdateFileHeader(SFile *pFile, uint32_t version); #ifdef __cplusplus } diff --git a/src/tsdb/src/tsdbFile.c b/src/tsdb/src/tsdbFile.c index c9db9344c9e7ed1279d25183b98944f77d3649cb..50ef078a0c06499db096e3f8dd4aaff86eab495a 100644 --- a/src/tsdb/src/tsdbFile.c +++ b/src/tsdb/src/tsdbFile.c @@ -37,7 +37,6 @@ const char *tsdbFileSuffix[] = { static int compFGroupKey(const void *key, const void *fgroup); static int compFGroup(const void *arg1, const void *arg2); -static int tsdbWriteFileHead(SFile *pFile); static int tsdbOpenFGroup(STsdbFileH *pFileH, char *dataDir, int fid); STsdbFileH *tsdbInitFileH(char *dataDir, STsdbCfg *pCfg) { @@ -83,11 +82,21 @@ void tsdbCloseFileH(STsdbFileH *pFileH) { } static int tsdbInitFile(char *dataDir, int fid, const char *suffix, SFile *pFile) { + uint32_t version; + char buf[512] = "\0"; + tsdbGetFileName(dataDir, fid, suffix, pFile->fname); if (access(pFile->fname, F_OK|R_OK|W_OK) < 0) return -1; pFile->fd = -1; - // TODO: recover the file info - // pFile->info = {0}; + if (tsdbOpenFile(pFile, O_RDONLY) < 0) return -1; + + if (tread(pFile->fd, buf, TSDB_FILE_HEAD_SIZE) < TSDB_FILE_HEAD_SIZE) return -1; + if (!taosCheckChecksumWhole((uint8_t *)buf, TSDB_FILE_HEAD_SIZE)) return -1; + + void *pBuf = buf; + pBuf = taosDecodeFixed32(pBuf, &version); + pBuf = tsdbDecodeSFileInfo(pBuf, &(pFile->info)); + return 0; } @@ -284,18 +293,6 @@ static int compFGroup(const void *arg1, const void *arg2) { return ((SFileGroup *)arg1)->fileId - ((SFileGroup *)arg2)->fileId; } -static int tsdbWriteFileHead(SFile *pFile) { - char head[TSDB_FILE_HEAD_SIZE] = "\0"; - - pFile->info.size += TSDB_FILE_HEAD_SIZE; - - // TODO: write version and File statistic to the head - lseek(pFile->fd, 0, SEEK_SET); - if (write(pFile->fd, head, TSDB_FILE_HEAD_SIZE) < 0) return -1; - - return 0; -} - int tsdbGetFileName(char *dataDir, int fileId, const char *suffix, char *fname) { if (dataDir == NULL || fname == NULL) return -1; @@ -345,7 +342,9 @@ int tsdbCreateFile(char *dataDir, int fileId, const char *suffix, SFile *pFile) return -1; } - if (tsdbWriteFileHead(pFile) < 0) { + pFile->info.size = TSDB_FILE_HEAD_SIZE; + + if (tsdbUpdateFileHeader(pFile, 0) < 0) { tsdbCloseFile(pFile); return -1; } diff --git a/src/tsdb/src/tsdbRWHelper.c b/src/tsdb/src/tsdbRWHelper.c index 4b3c91bb02839f8c4f75bb3c43a9cf7be1f3eea5..79507aa15b2042258911ce6bb6fd4fd5212b45fa 100644 --- a/src/tsdb/src/tsdbRWHelper.c +++ b/src/tsdb/src/tsdbRWHelper.c @@ -132,10 +132,10 @@ static int tsdbInitHelper(SRWHelper *pHelper, STsdbRepo *pRepo, tsdb_rw_helper_t // Init block part if (tsdbInitHelperBlock(pHelper) < 0) goto _err; - pHelper->blockBuffer = + pHelper->pBuffer = tmalloc(sizeof(SCompData) + (sizeof(SCompCol) + sizeof(TSCKSUM) + COMP_OVERFLOW_BYTES) * pHelper->config.maxCols + pHelper->config.maxRowSize * pHelper->config.maxRowsPerFileBlock + sizeof(TSCKSUM)); - if (pHelper->blockBuffer == NULL) goto _err; + if (pHelper->pBuffer == NULL) goto _err; return 0; @@ -155,7 +155,7 @@ int tsdbInitWriteHelper(SRWHelper *pHelper, STsdbRepo *pRepo) { void tsdbDestroyHelper(SRWHelper *pHelper) { if (pHelper) { - tzfree(pHelper->blockBuffer); + tzfree(pHelper->pBuffer); tzfree(pHelper->compBuffer); tsdbDestroyHelperFile(pHelper); tsdbDestroyHelperTable(pHelper); @@ -241,6 +241,7 @@ int tsdbCloseHelperFile(SRWHelper *pHelper, bool hasError) { pHelper->files.headF.fd = -1; } if (pHelper->files.dataF.fd > 0) { + if (!hasError) tsdbUpdateFileHeader(&(pHelper->files.dataF), 0); close(pHelper->files.dataF.fd); pHelper->files.dataF.fd = -1; } @@ -249,6 +250,7 @@ int tsdbCloseHelperFile(SRWHelper *pHelper, bool hasError) { pHelper->files.lastF.fd = -1; } if (pHelper->files.nHeadF.fd > 0) { + if (!hasError) tsdbUpdateFileHeader(&(pHelper->files.nHeadF), 0); close(pHelper->files.nHeadF.fd); pHelper->files.nHeadF.fd = -1; if (hasError) { @@ -260,6 +262,7 @@ int tsdbCloseHelperFile(SRWHelper *pHelper, bool hasError) { } if (pHelper->files.nLastF.fd > 0) { + if (!hasError) tsdbUpdateFileHeader(&(pHelper->files.nLastF), 0); close(pHelper->files.nLastF.fd); pHelper->files.nLastF.fd = -1; if (hasError) { @@ -419,7 +422,7 @@ int tsdbWriteCompInfo(SRWHelper *pHelper) { pIdx->offset = lseek(pHelper->files.nHeadF.fd, 0, SEEK_END); pIdx->uid = pHelper->tableInfo.uid; if (pIdx->offset < 0) return -1; - ASSERT(pIdx->offset >= tsizeof(pHelper->pCompIdx)); + ASSERT(pIdx->offset >= TSDB_FILE_HEAD_SIZE); if (twrite(pHelper->files.nHeadF.fd, (void *)(pHelper->pCompInfo), pIdx->len) < pIdx->len) return -1; } @@ -435,7 +438,8 @@ int tsdbWriteCompIdx(SRWHelper *pHelper) { SFile *pFile = &(pHelper->files.nHeadF); pFile->info.offset = offset; - void *buf = pHelper->blockBuffer; + // TODO: change the implementation of pHelper->pBuffer + void *buf = pHelper->pBuffer; for (uint32_t i = 0; i < pHelper->config.maxTables; i++) { SCompIdx *pCompIdx = pHelper->pCompIdx + i; if (pCompIdx->offset > 0) { @@ -444,10 +448,10 @@ int tsdbWriteCompIdx(SRWHelper *pHelper) { } } - int tsize = (char *)buf - (char *)pHelper->blockBuffer + sizeof(TSCKSUM); - taosCalcChecksumAppend(0, (uint8_t *)pHelper->blockBuffer, tsize); + int tsize = (char *)buf - (char *)pHelper->pBuffer + sizeof(TSCKSUM); + taosCalcChecksumAppend(0, (uint8_t *)pHelper->pBuffer, tsize); - if (twrite(pHelper->files.nHeadF.fd, (void *)pHelper->blockBuffer, tsize) < tsize) return -1; + if (twrite(pHelper->files.nHeadF.fd, (void *)pHelper->pBuffer, tsize) < tsize) return -1; pFile->info.len = tsize; return 0; } @@ -465,23 +469,23 @@ int tsdbLoadCompIdx(SRWHelper *pHelper, void *target) { ASSERT(pFile->info.offset > TSDB_FILE_HEAD_SIZE); if (lseek(fd, pFile->info.offset, SEEK_SET) < 0) return -1; - if (tread(fd, (void *)(pHelper->blockBuffer), pFile->info.len) < pFile->info.len) + if (tread(fd, (void *)(pHelper->pBuffer), pFile->info.len) < pFile->info.len) return -1; - if (!taosCheckChecksumWhole((uint8_t *)(pHelper->blockBuffer), pFile->info.len)) { + if (!taosCheckChecksumWhole((uint8_t *)(pHelper->pBuffer), pFile->info.len)) { // TODO: File is broken, try to deal with it return -1; } // Decode it - void *ptr = pHelper->blockBuffer; - while ((char *)ptr - (char *)pHelper->blockBuffer >= pFile->info.len - sizeof(TSCKSUM)) { + void *ptr = pHelper->pBuffer; + while ((char *)ptr - (char *)pHelper->pBuffer >= pFile->info.len - sizeof(TSCKSUM)) { uint32_t tid = 0; if ((ptr = taosDecodeVariant32(ptr, &tid)) == NULL) return -1; ASSERT(tid > 0 && tid < pHelper->config.maxTables); if ((ptr = tsdbDecodeSCompIdx(ptr, pHelper->pCompIdx + tid)) == NULL) return -1; - ASSERT((char *)ptr - (char *)pHelper->blockBuffer <= pFile->info.len - sizeof(TSCKSUM)); + ASSERT((char *)ptr - (char *)pHelper->pBuffer <= pFile->info.len - sizeof(TSCKSUM)); } } @@ -626,9 +630,9 @@ static int tsdbCheckAndDecodeColumnData(SDataCol *pDataCol, char *content, int32 static int tsdbLoadBlockDataImpl(SRWHelper *pHelper, SCompBlock *pCompBlock, SDataCols *pDataCols) { ASSERT(pCompBlock->numOfSubBlocks <= 1); - ASSERT(tsizeof(pHelper->blockBuffer) >= pCompBlock->len); + ASSERT(tsizeof(pHelper->pBuffer) >= pCompBlock->len); - SCompData *pCompData = (SCompData *)pHelper->blockBuffer; + SCompData *pCompData = (SCompData *)pHelper->pBuffer; int fd = (pCompBlock->last) ? pHelper->files.lastF.fd : pHelper->files.dataF.fd; if (lseek(fd, pCompBlock->offset, SEEK_SET) < 0) goto _err; @@ -721,7 +725,7 @@ static int tsdbWriteBlockToFile(SRWHelper *pHelper, SFile *pFile, SDataCols *pDa ASSERT(rowsToWrite > 0 && rowsToWrite <= pDataCols->numOfPoints && rowsToWrite <= pHelper->config.maxRowsPerFileBlock); - SCompData *pCompData = (SCompData *)(pHelper->blockBuffer); + SCompData *pCompData = (SCompData *)(pHelper->pBuffer); int64_t offset = 0; offset = lseek(pFile->fd, 0, SEEK_END); @@ -776,7 +780,7 @@ static int tsdbWriteBlockToFile(SRWHelper *pHelper, SFile *pFile, SDataCols *pDa } pCompCol->len = (*(tDataTypeDesc[pDataCol->type].compFunc))( - (char *)pDataCol->pData, tlen, rowsToWrite, tptr, tsizeof(pHelper->blockBuffer) - lsize, + (char *)pDataCol->pData, tlen, rowsToWrite, tptr, tsizeof(pHelper->pBuffer) - lsize, pHelper->config.compress, pHelper->compBuffer, tsizeof(pHelper->compBuffer)); } else { pCompCol->len = tlen; @@ -1242,11 +1246,8 @@ int tsdbUpdateFileHeader(SFile *pFile, uint32_t version) { void *pBuf = (void *)buf; pBuf = taosEncodeFixed32(pBuf, version); pBuf = tsdbEncodeSFileInfo(pBuf, &(pFile->info)); - int tsize = (char *)pBuf - buf + sizeof(TSCKSUM); - ASSERT(tsize <= TSDB_FILE_HEAD_SIZE); - - taosCalcChecksumAppend(0, (uint8_t *)buf, tsize); + taosCalcChecksumAppend(0, (uint8_t *)buf, TSDB_FILE_HEAD_SIZE); if (lseek(pFile->fd, 0, SEEK_SET) < 0) return -1; if (twrite(pFile->fd, (void *)buf, TSDB_FILE_HEAD_SIZE) < TSDB_FILE_HEAD_SIZE) return -1;