diff --git a/src/tsdb/inc/tsdbMain.h b/src/tsdb/inc/tsdbMain.h index cb3f7eb22ce29810f2c3440ee0d13245e2593280..50cf80857ea201502c7d18c91f820a218607b087 100644 --- a/src/tsdb/inc/tsdbMain.h +++ b/src/tsdb/inc/tsdbMain.h @@ -332,9 +332,11 @@ typedef struct { TSKEY maxKey; SBlockIdx* pBlockIdx; int nBlockIdx; + int cBlockIdx; SBlockIdx* pCurBlockIdx; STable* pTable; SBlockInfo* pBlockInfo; + SBlockData* pBlockData; SDataCols* pDataCols[2]; void* pBuf; void* pCBuf; @@ -592,6 +594,9 @@ typedef struct { void* pBuffer; SList* pModLog; } SCommitHandle; + +void tsdbResetFGroupFd(SFileGroup* pFGroup); + #ifdef __cplusplus } #endif diff --git a/src/tsdb/src/tsdbReadUtil.c b/src/tsdb/src/tsdbReadUtil.c index 2c6529ab4c7b4d10ac434625a4e9e9d16a45f024..5e58ad48acf1f4149696507165239180f07c9f5c 100644 --- a/src/tsdb/src/tsdbReadUtil.c +++ b/src/tsdb/src/tsdbReadUtil.c @@ -21,27 +21,57 @@ #include "tscompression.h" #include "tsdbMain.h" -int tsdbInitReadHandle(SReadHandle *pReadH, STsdbRepo *pRepo) { +#define TSDB_KEY_COL_OFFSET 0 + +SReadHandle *tsdbNewReadHandle(STsdbRepo *pRepo) { + SReadHandle *pReadH = (SReadHandle *)calloc(1, sizeof(*pReadH)); + if (pReadH == NULL) { + terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; + return NULL; + } + pReadH->pRepo = pRepo; - return 0; + + STsdbMeta *pMeta = pRepo->tsdbMeta; + STsdbCfg * pCfg = &(pRepo->config); + + if ((pReadH->pDataCols[0] = tdNewDataCols(pMeta->maxRowBytes, pMeta->maxCols, pCfg->maxRowsPerFileBlock)) == NULL || + (pReadH->pDataCols[1] = tdNewDataCols(pMeta->maxRowBytes, pMeta->maxCols, pCfg->maxRowsPerFileBlock)) == NULL) { + terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; + tsdbFreeReadHandle(pReadH); + return NULL; + } + + tsdbResetFGroupFd(&(pReadH->fGroup)); + + return pReadH; } -void tsdbDestroyReadHandle(SReadHandle *pReadH) { - // TODO +void tsdbFreeReadHandle(SReadHandle *pReadH) { + if (pReadH) { + taosTZfree(pReadH->pBlockIdx); + taosTZfree(pReadH->pBlockInfo); + taosTZfree(pReadH->pBlockData); + tdFreeDataCols(pReadH->pDataCols[0]); + tdFreeDataCols(pReadH->pDataCols[1]); + taosTZfree(pReadH->pBuf); + taosTZfree(pReadH->pCBuf); + free(pReadH); + } } int tsdbSetAndOpenFGroup(SReadHandle *pReadH, SFileGroup *pFGroup) { STsdbRepo *pRepo = pReadH->pRepo; + STsdbCfg * pCfg = &(pRepo->config); + pReadH->fGroup = *pFGroup; - for (int type = 0; type < TSDB_FILE_TYPE_MAX; type++) { - pReadH->fGroup.files[type].fd = -1; - } + tsdbResetFGroupFd(&(pReadH->fGroup)); for (int type = 0; type < TSDB_FILE_TYPE_MAX; type++) { - SFile *pFile = &(pReadH->fGroup.files[type]); + SFile *pFile = TSDB_READ_FILE(pReadH, type); - if (pFile->fname[0] != '\0') { + if (pFile->fname[0] != '\0') { // pFile->fname[0] == '\0' is for commit usage pFile->fd = open(pFile->fname, O_RDONLY); if (pFile->fd < 0) { tsdbError("vgId:%d failed to open file %s since %s", REPO_ID(pRepo), pFile->fname, strerror(errno)); @@ -52,14 +82,16 @@ int tsdbSetAndOpenFGroup(SReadHandle *pReadH, SFileGroup *pFGroup) { } } + tsdbGetFidKeyRange(pCfg->daysPerFile, pCfg->precision, pFGroup->fileId, &(pReadH->minKey), &(pReadH->maxKey)); + return 0; } void tsdbCloseAndUnsetFile(SReadHandle *pReadH) { for (int type = 0; type < TSDB_FILE_TYPE_MAX; type++) { - SFile *pFile = &(pReadH->fGroup.files[type]); + SFile *pFile = TSDB_READ_FILE(pReadH, type); - if (pFile->fd > 0) { + if (pFile->fd >= 0) { (void)close(pFile->fd); pFile->fd = -1; } @@ -68,20 +100,22 @@ void tsdbCloseAndUnsetFile(SReadHandle *pReadH) { int tsdbLoadBlockIdx(SReadHandle *pReadH) { STsdbRepo *pRepo = pReadH->pRepo; - SFile * pFile = &(pReadH->fGroup.files[TSDB_FILE_TYPE_HEAD]); + SFile * pFile = TSDB_READ_FILE(pReadH, TSDB_FILE_TYPE_HEAD); if (pFile->fd < 0 || pFile->info.len == 0) { pReadH->nBlockIdx = 0; + pReadH->pCurBlockIdx = NULL; return 0; } if (tsdbAllocBuf(&(pReadH->pBuf), pFile->info.len) < 0) { - terrno = TAOS_SYSTEM_ERROR(errno); + terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; return -1; } if (lseek(pFile->fd, pFile->info.offset, SEEK_SET) < 0) { - tsdbError("vgId:%d failed to lseek file %s since %s", REPO_ID(pRepo), pFile->fname, strerror(errno)); + tsdbError("vgId:%d failed to lseek file %s to offset %u since %s", REPO_ID(pRepo), pFile->fname, pFile->info.offset, + strerror(errno)); terrno = TAOS_SYSTEM_ERROR(errno); return -1; } @@ -94,7 +128,7 @@ int tsdbLoadBlockIdx(SReadHandle *pReadH) { return -1; } - if (ret < pFile->info.len || !taosCheckChecksumWhole((uint8_t *)pReadH->pBuf, pFile->info.len)) { + if (ret < pFile->info.len || !taosCheckChecksumWhole((uint8_t *)(pReadH->pBuf), pFile->info.len)) { tsdbError("vgId:%d block idx part is corrupted in file %s, offset %u len %u", REPO_ID(pRepo), pFile->fname, pFile->info.offset, pFile->info.len); terrno = TSDB_CODE_TDB_FILE_CORRUPTED; @@ -107,13 +141,53 @@ int tsdbLoadBlockIdx(SReadHandle *pReadH) { return -1; } + pReadH->cBlockIdx = 0; + return 0; } int tsdbSetReadTable(SReadHandle *pReadH, STable *pTable) { + ASSERT(pTable != NULL); + pReadH->pTable = pTable; - // TODO - // pReadH->pCurBlockIdx = NULL; + + STSchema *pSchema = tsdbGetTableSchemaImpl(pTable, false, false, -1); + ASSERT(pSchema != NULL); + + if (tdInitDataCols(pReadH->pDataCols[0], pSchema) < 0 || tdInitDataCols(pReadH->pDataCols[1], pSchema) < 0) { + terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; + return -1; + } + + if (pReadH->nBlockIdx > 0) { + ASSERT(pReadH->cBlockIdx <= pReadH->nBlockIdx); + + while (true) { + if (pReadH->cBlockIdx >= pReadH->nBlockIdx) { + pReadH->pCurBlockIdx = NULL; + break; + } + + SBlockIdx *pBlockIdx = pReadH->pBlockIdx + pReadH->cBlockIdx; + if (pBlockIdx->tid == TABLE_TID(pTable)) { + if (pBlockIdx->uid == TABLE_UID(pTable)) { + pReadH->pCurBlockIdx = pBlockIdx; + } else { + pReadH->pCurBlockIdx = NULL; + } + pReadH->cBlockIdx++; + break; + } else if (pBlockIdx->tid < TABLE_TID(pTable)) { + pReadH->cBlockIdx++; + } else { + pReadH->pCurBlockIdx = NULL; + break; + } + } + } else { + pReadH->pCurBlockIdx = NULL; + } + return 0; } @@ -132,12 +206,13 @@ int tsdbLoadBlockInfo(SReadHandle *pReadH) { } if (lseek(pFile->fd, pBlockIdx->offset, SEEK_SET) < 0) { - tsdbError("vgId:%d failed to lseek file %s since %s", REPO_ID(pRepo), pFile->fname, strerror(errno)); + tsdbError("vgId:%d failed to lseek file %s to offset %u since %s", REPO_ID(pRepo), pFile->fname, pBlockIdx->offset, + strerror(errno)); terrno = TAOS_SYSTEM_ERROR(errno); return -1; } - ssize_t ret = taosTRead(pFile->fd, (void *)pReadH->pBlockInfo, pBlockIdx->len); + ssize_t ret = taosTRead(pFile->fd, (void *)(pReadH->pBlockInfo), pBlockIdx->len); if (ret < 0) { tsdbError("vgId:%d failed to read block info part of table %s from file %s since %s", REPO_ID(pRepo), TABLE_CHAR_NAME(pReadH->pTable), pFile->fname, strerror(errno)); @@ -146,8 +221,8 @@ int tsdbLoadBlockInfo(SReadHandle *pReadH) { } if (ret < pBlockIdx->len || tsdbVerifyBlockInfo(pReadH->pBlockInfo, pBlockIdx) < 0) { - tsdbError("vgId:%d table %s block info part is corrupted from file %s since %s", REPO_ID(pRepo), - TABLE_CHAR_NAME(pReadH->pTable), pFile->fname, strerror(errno)); + tsdbError("vgId:%d table %s block info part is corrupted from file %s", REPO_ID(pRepo), + TABLE_CHAR_NAME(pReadH->pTable), pFile->fname); terrno = TSDB_CODE_TDB_FILE_CORRUPTED; return -1; } @@ -198,6 +273,49 @@ int tsdbLoadBlockDataCols(SReadHandle *pReadH, SBlock *pBlock, SBlockInfo *pBloc if (tdMergeDataCols(pReadH->pDataCols[0], pReadH->pDataCols[1], pReadH->pDataCols[1]->numOfRows) < 0) goto _err; } + ASSERT(pReadH->pDataCols[0]->numOfRows == pBlock->numOfRows); + ASSERT(dataColsKeyFirst(pReadH->pDataCols[0]) == pBlock->keyFirst); + ASSERT(dataColsKeyLast(pReadH->pDataCols[0]) == pBlock->keyLast); + + return 0; +} + +int tsdbLoadBlockDataInfo(SReadHandle *pReadH, SBlock *pBlock) { + ASSERT(pBlock->numOfSubBlocks <= 1); + + STsdbRepo *pRepo = pReadH->pRepo; + SFile * pFile = + (pBlock->last) ? TSDB_READ_FILE(pReadH, TSDB_FILE_TYPE_LAST) : TSDB_READ_FILE(pReadH, TSDB_FILE_TYPE_DATA); + + if (lseek(pFile->fd, pBlock->offset, SEEK_SET) < 0) { + tsdbError("vgId:%d failed to lseek file %s to offset %" PRId64 " since %s", REPO_ID(pRepo), pFile->fname, + pBlock->offset, strerror(errno)); + terrno = TAOS_SYSTEM_ERROR(errno); + return -1; + } + + int tsize = TSDB_BLOCK_DATA_LEN(pBlock->numOfCols); + if (tsdbAllocBuf(&(pReadH->pBlockData), tsize) < 0) { + terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; + return -1; + } + + int ret = taosTRead(pFile->fd, tsize); + if (ret < 0) { + tsdbError("vgId:%d failed to read block data info part from file %s offset %" PRId64 " len %d since %s", + REPO_ID(pRepo), pFile->fname, pBlock->offset, tsize, strerror(errno)); + terrno = TAOS_SYSTEM_ERROR(errno); + return -1; + } + + if (ret < tsize || !taosCheckChecksumWhole((uint8_t *)(pReadH->pBlockData), tsize)) { + tsdbError("vgId:%d block data info part is corrupted in file %s offset %" PRId64 " len %d", REPO_ID(pRepo), + pFile->fname, pBlock->offset, tsize); + terrno = TSDB_CODE_TDB_FILE_CORRUPTED; + return -1; + } + + ASSERT(pReadH->pBlockData->numOfCols == pBlock->numOfCols); return 0; } @@ -214,15 +332,16 @@ static int tsdbLoadBlockDataImpl(SReadHandle *pReadH, SBlock *pBlock, SDataCols } if (lseek(pFile->fd, pBlock->offset, SEEK_SET) < 0) { - tsdbError("vgId:%d failed to lseek file %s since %s", REPO_ID(pRepo), pFile->fname, strerror(errno)); + tsdbError("vgId:%d failed to lseek file %s to offset %" PRId64 " since %s", REPO_ID(pRepo), pFile->fname, + pBlock->offset, strerror(errno)); terrno = TAOS_SYSTEM_ERROR(errno); return -1; } - int ret = taosTRead(pFile->fd, (void *)pReadH->pBuf, pBlock->len); + int ret = taosTRead(pFile->fd, (void *)(pReadH->pBuf), pBlock->len); if (ret < 0) { - tsdbError("vgId:%d failed to read block data part from file %s since %s", REPO_ID(pRepo), pFile->fname, - strerror(errno)); + tsdbError("vgId:%d failed to read block data part from file %s at offset %" PRId64 " len %d since %s", + REPO_ID(pRepo), pFile->fname, pBlock->offset, pBlock->len, strerror(errno)); terrno = TAOS_SYSTEM_ERROR(errno); return -1; } @@ -240,6 +359,7 @@ static int tsdbLoadBlockDataImpl(SReadHandle *pReadH, SBlock *pBlock, SDataCols ASSERT(pBlockData->numOfCols = pBlock->numOfCols); tdResetDataCols(pDataCols); + ASSERT(pBlock->numOfRows <= pDataCols->maxPoints); pDataCols->numOfRows = pBlock->numOfRows; int ccol = 0; @@ -269,7 +389,7 @@ static int tsdbLoadBlockDataImpl(SReadHandle *pReadH, SBlock *pBlock, SDataCols if (pBlock->algorithm == TWO_STAGE_COMP) { // extend compression buffer int zsize = pDataCol->bytes * pBlock->numOfRows + COMP_OVERFLOW_BYTES; if (pDataCol->type == TSDB_DATA_TYPE_BINARY || pDataCol->type == TSDB_DATA_TYPE_NCHAR) { - zsize += (sizeof(VarDataLenT) * pCompBlock->numOfRows); + zsize += (sizeof(VarDataLenT) * pBlock->numOfRows); } if (tsdbAllocBuf(&(pReadH->pCBuf), zsize) < 0) { @@ -280,7 +400,7 @@ static int tsdbLoadBlockDataImpl(SReadHandle *pReadH, SBlock *pBlock, SDataCols if (tsdbCheckAndDecodeColumnData(pDataCol, POINTER_SHIFT(pBlockData, tsize + toffset), tlen, pBlock->algorithm, pBlock->numOfRows, pDataCols->maxPoints, pReadH->pCBuf, (int32_t)taosTSizeof(pReadH->pCBuf)) < 0) { - tsdbError("vgId:%d file %s is broken at column %d block offset %" PRId64 " column offset %d", REPO_ID(pRepo), + tsdbError("vgId:%d file %s is corrupted at column %d block offset %" PRId64 " column offset %d", REPO_ID(pRepo), pFile->fname, tcolId, (int64_t)pBlock->offset, toffset); return -1; } @@ -305,26 +425,27 @@ static int tsdbLoadBlockDataColsImpl(SReadHandle *pReadH, SBlock *pBlock, SDataC SFile *pFile = pBlock->last ? TSDB_READ_FILE(pReadH, TSDB_FILE_TYPE_LAST) : TSDB_READ_FILE(pReadH, TSDB_FILE_TYPE_DATA); - SBlockCol compCol = {0}; + SBlockCol blockCol = {0}; // If only load timestamp column, no need to load SBlockData part - if (numOfColIds > 1 && tsdbLoadCompData(pHelper, pCompBlock, NULL) < 0) goto _err; + if (numOfColIds > 1 && tsdbLoadBlockDataInfo(pReadH, pBlock) < 0) return -1; - pDataCols->numOfRows = pCompBlock->numOfRows; + tdResetDataCols(pDataCols); + pDataCols->numOfRows = pBlock->numOfRows; int dcol = 0; int ccol = 0; for (int i = 0; i < numOfColIds; i++) { - int16_t colId = colIds[i]; - SDataCol *pDataCol = NULL; - SBlockCol *pCompCol = NULL; + int16_t colId = colIds[i]; + SDataCol * pDataCol = NULL; + SBlockCol *pBlockCol = NULL; while (true) { if (dcol >= pDataCols->numOfCols) { pDataCol = NULL; break; } - pDataCol = &pDataCols->cols[dcol]; + pDataCol = &(pDataCols->cols[dcol]); if (pDataCol->colId > colId) { pDataCol = NULL; break; @@ -338,52 +459,49 @@ static int tsdbLoadBlockDataColsImpl(SReadHandle *pReadH, SBlock *pBlock, SDataC ASSERT(pDataCol->colId == colId); if (colId == 0) { // load the key row - compCol.colId = colId; - compCol.len = pCompBlock->keyLen; - compCol.type = pDataCol->type; - compCol.offset = TSDB_KEY_COL_OFFSET; - pCompCol = &compCol; + blockCol.colId = colId; + blockCol.len = pBlock->keyLen; + blockCol.type = pDataCol->type; + blockCol.offset = TSDB_KEY_COL_OFFSET; + pBlockCol = &blockCol; } else { // load non-key rows while (true) { - if (ccol >= pCompBlock->numOfCols) { - pCompCol = NULL; + if (ccol >= pBlock->numOfCols) { + pBlockCol = NULL; break; } - pCompCol = &(pHelper->pCompData->cols[ccol]); - if (pCompCol->colId > colId) { - pCompCol = NULL; + pBlockCol = &(pReadH->pBlockData->cols[ccol]); + if (pBlockCol->colId > colId) { + pBlockCol = NULL; break; } else { ccol++; - if (pCompCol->colId == colId) break; + if (pBlockCol->colId == colId) break; } } - if (pCompCol == NULL) { - dataColSetNEleNull(pDataCol, pCompBlock->numOfRows, pDataCols->maxPoints); + if (pBlockCol == NULL) { + dataColSetNEleNull(pDataCol, pBlock->numOfRows, pDataCols->maxPoints); continue; } - ASSERT(pCompCol->colId == pDataCol->colId); + ASSERT(pBlockCol->colId == pDataCol->colId); } - if (tsdbLoadColData(pHelper, pFile, pCompBlock, pCompCol, pDataCol) < 0) goto _err; + if (tsdbLoadColData(pReadH, pFile, pBlock, pBlockCol, pDataCol) < 0) return -1; } return 0; - -_err: - return -1; } static int tsdbDecodeBlockIdxArray(SReadHandle *pReadH) { - void *pBuf = pReadH->pBuf; - SFile *pFile = &(pReadH->fGroup.files[TSDB_FILE_TYPE_HEAD]); + void * pBuf = pReadH->pBuf; + SFile *pFile = TSDB_READ_FILE(pReadH, TSDB_FILE_TYPE_HEAD); pReadH->nBlockIdx = 0; while (POINTER_DISTANCE(pBuf, pReadH->pBuf) < (int)(pFile->info.len - sizeof(TSCKSUM))) { - if (tsdbAllocBuf(&((void *)(pReadH->pBlockIdx), sizeof(SBlockIdx) * (pReadH->nBlockIdx + 1))) < 0) { + if (tsdbAllocBuf(&((void *)(pReadH->pBlockIdx)), sizeof(SBlockIdx) * (pReadH->nBlockIdx + 1)) < 0) { terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; return -1; } @@ -391,7 +509,7 @@ static int tsdbDecodeBlockIdxArray(SReadHandle *pReadH) { pBuf = tsdbDecodeBlockIdx(pBuf, &(pReadH->pBlockIdx[pReadH->nBlockIdx])); if (pBuf == NULL) { tsdbError("vgId:%d failed to decode block idx part from file %s", REPO_ID(pRepo), pFile->fname); - terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; + terrno = TSDB_CODE_TDB_FILE_CORRUPTED; return -1; } @@ -418,14 +536,14 @@ static int tsdbAllocBuf(void **ppBuf, int size) { static int tsdbVerifyBlockInfo(SBlockInfo *pBlockInfo, SBlockIdx *pBlockIdx) { if (!taosCheckChecksumWhole((uint8_t *)pBlockInfo, pBlockIdx->len)) return -1; if (pBlockInfo->delimiter != TSDB_FILE_DELIMITER || pBlockInfo->uid != pBlockIdx->uid || - pBlockInfo->tid != pBlockIdx->tid) + pBlockInfo->tid != pBlockIdx->tid) { return -1; + } return 0; } -static int tsdbCheckAndDecodeColumnData(SDataCol *pDataCol, char *content, int32_t len, int8_t comp, int numOfRows, - int maxPoints, char *buffer, int bufferSize) { - // Verify by checksum +static int tsdbCheckAndDecodeColumnData(SDataCol *pDataCol, void *content, int32_t len, int8_t comp, int numOfRows, + int maxPoints, char *buffer, int bsize) { if (!taosCheckChecksumWhole((uint8_t *)content, len)) { terrno = TSDB_CODE_TDB_FILE_CORRUPTED; return -1; @@ -435,16 +553,18 @@ static int tsdbCheckAndDecodeColumnData(SDataCol *pDataCol, char *content, int32 if (comp) { // // Need to decompress int tlen = (*(tDataTypeDesc[pDataCol->type].decompFunc))(content, len - sizeof(TSCKSUM), numOfRows, pDataCol->pData, - pDataCol->spaceSize, comp, buffer, bufferSize); + pDataCol->spaceSize, comp, buffer, bsize); if (tlen <= 0) { - tsdbError("Failed to decompress column, file corrupted, len:%d comp:%d numOfRows:%d maxPoints:%d bufferSize:%d", - len, comp, numOfRows, maxPoints, bufferSize); + tsdbError("failed to decompress column, file corrupted, len:%d comp:%d numOfRows:%d maxPoints:%d bsize:%d", len, + comp, numOfRows, maxPoints, bsize); terrno = TSDB_CODE_TDB_FILE_CORRUPTED; return -1; } pDataCol->len = tlen; if (pDataCol->type == TSDB_DATA_TYPE_BINARY || pDataCol->type == TSDB_DATA_TYPE_NCHAR) { dataColSetOffset(pDataCol, numOfRows); + } else { + ASSERT(pDataCol->len == pDataCol->bytes * numOfRows); } } else { // No need to decompress, just memcpy it @@ -452,46 +572,55 @@ static int tsdbCheckAndDecodeColumnData(SDataCol *pDataCol, char *content, int32 memcpy(pDataCol->pData, content, pDataCol->len); if (pDataCol->type == TSDB_DATA_TYPE_BINARY || pDataCol->type == TSDB_DATA_TYPE_NCHAR) { dataColSetOffset(pDataCol, numOfRows); + } else { + ASSERT(pDataCol->len == pDataCol->bytes * numOfRows); } } return 0; } -static int tsdbLoadColData(SRWHelper *pHelper, SFile *pFile, SBlock *pCompBlock, SBlockCol *pCompCol, +static int tsdbLoadColData(SReadHandle *pReadH, SFile *pFile, SBlock *pBlock, SBlockCol *pBlockCol, SDataCol *pDataCol) { - ASSERT(pDataCol->colId == pCompCol->colId); - int tsize = pDataCol->bytes * pCompBlock->numOfRows + COMP_OVERFLOW_BYTES; - pHelper->pBuffer = taosTRealloc(pHelper->pBuffer, pCompCol->len); - if (pHelper->pBuffer == NULL) { - terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; - return -1; - } + ASSERT(pDataCol->colId == pBlockCol->colId); - pHelper->compBuffer = taosTRealloc(pHelper->compBuffer, tsize); - if (pHelper->compBuffer == NULL) { + STsdbRepo *pRepo = pReadH->pRepo; + STsdbCfg * pCfg = &(pRepo->config); + + if (tsdbAllocBuf(&(pReadH->pBuf), pBlockCol->len) < 0) { terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; return -1; } - int64_t offset = pCompBlock->offset + TSDB_GET_COMPCOL_LEN(pCompBlock->numOfCols) + pCompCol->offset; + int64_t offset = pBlock->offset + TSDB_GET_COMPCOL_LEN(pBlock->numOfCols) + pBlockCol->offset; if (lseek(pFile->fd, (off_t)offset, SEEK_SET) < 0) { - tsdbError("vgId:%d failed to lseek file %s since %s", REPO_ID(pHelper->pRepo), pFile->fname, strerror(errno)); + tsdbError("vgId:%d failed to lseek file %s since %s", REPO_ID(pRepo), pFile->fname, strerror(errno)); terrno = TAOS_SYSTEM_ERROR(errno); return -1; } - if (taosTRead(pFile->fd, pHelper->pBuffer, pCompCol->len) < pCompCol->len) { - tsdbError("vgId:%d failed to read %d bytes from file %s since %s", REPO_ID(pHelper->pRepo), pCompCol->len, pFile->fname, - strerror(errno)); + int ret = taosTRead(pFile->fd, pReadH->pBuf, pBlockCol->len); + if (ret < 0) { + tsdbError("vgId:%d failed to read %d bytes from file %s since %s", REPO_ID(pRepo), pBlockCol->len, + pFile->fname, strerror(errno)); terrno = TAOS_SYSTEM_ERROR(errno); return -1; } - if (tsdbCheckAndDecodeColumnData(pDataCol, pHelper->pBuffer, pCompCol->len, pCompBlock->algorithm, - pCompBlock->numOfRows, pHelper->pRepo->config.maxRowsPerFileBlock, - pHelper->compBuffer, (int32_t)taosTSizeof(pHelper->compBuffer)) < 0) { - tsdbError("vgId:%d file %s is broken at column %d offset %" PRId64, REPO_ID(pHelper->pRepo), pFile->fname, - pCompCol->colId, offset); + if (pBlock->algorithm == TWO_STAGE_COMP) { // extend compression buffer + int zsize = pDataCol->bytes * pBlock->numOfRows + COMP_OVERFLOW_BYTES; + if (pDataCol->type == TSDB_DATA_TYPE_BINARY || pDataCol->type == TSDB_DATA_TYPE_NCHAR) { + zsize += (sizeof(VarDataLenT) * pBlock->numOfRows); + } + + if (tsdbAllocBuf(&(pReadH->pCBuf), zsize) < 0) { + terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; + return -1; + } + } + if (tsdbCheckAndDecodeColumnData(pDataCol, pReadH->pBuf, pBlockCol->len, pBlock->algorithm, pBlock->numOfRows, + pCfg->maxRowsPerFileBlock, pReadH->pCBuf, (int32_t)taosTSizeof(pReadH->pCBuf)) < 0) { + tsdbError("vgId:%d file %s is broken at column %d offset %" PRId64, REPO_ID(pRepo), pFile->fname, + pBlockCol->colId, offset); return -1; } diff --git a/src/tsdb/src/tsdbUtil.c b/src/tsdb/src/tsdbUtil.c index 6ea28e580e2db677686272534669971221a709e1..18a23284df2e23d01026f6af21d80226290c535f 100644 --- a/src/tsdb/src/tsdbUtil.c +++ b/src/tsdb/src/tsdbUtil.c @@ -104,4 +104,10 @@ void *tsdbDecodeBlockIdx(void *buf, SBlockIdx *pBlockIdx) { pBlockIdx->maxKey = (TSKEY)maxKey; return buf; +} + +void tsdbResetFGroupFd(SFileGroup *pFGroup) { + for (int type = 0; type < TSDB_FILE_TYPE_MAX; type++) { + pFGroup->files[type].fd = -1; + } } \ No newline at end of file