提交 86718347 编写于 作者: H Hongze Cheng

finish refactor tsdb read code

上级 1747e8ff
...@@ -332,9 +332,11 @@ typedef struct { ...@@ -332,9 +332,11 @@ typedef struct {
TSKEY maxKey; TSKEY maxKey;
SBlockIdx* pBlockIdx; SBlockIdx* pBlockIdx;
int nBlockIdx; int nBlockIdx;
int cBlockIdx;
SBlockIdx* pCurBlockIdx; SBlockIdx* pCurBlockIdx;
STable* pTable; STable* pTable;
SBlockInfo* pBlockInfo; SBlockInfo* pBlockInfo;
SBlockData* pBlockData;
SDataCols* pDataCols[2]; SDataCols* pDataCols[2];
void* pBuf; void* pBuf;
void* pCBuf; void* pCBuf;
...@@ -592,6 +594,9 @@ typedef struct { ...@@ -592,6 +594,9 @@ typedef struct {
void* pBuffer; void* pBuffer;
SList* pModLog; SList* pModLog;
} SCommitHandle; } SCommitHandle;
void tsdbResetFGroupFd(SFileGroup* pFGroup);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
...@@ -21,27 +21,57 @@ ...@@ -21,27 +21,57 @@
#include "tscompression.h" #include "tscompression.h"
#include "tsdbMain.h" #include "tsdbMain.h"
int tsdbInitReadHandle(SReadHandle *pReadH, STsdbRepo *pRepo) { #define TSDB_KEY_COL_OFFSET 0
/*
 * Allocate and initialise a read handle bound to pRepo.
 *
 * The handle is zero-allocated, its two SDataCols working buffers are created from the
 * repo's meta limits (maxRowBytes, maxCols) and the configured maxRowsPerFileBlock, and
 * every file descriptor in its file group is reset via tsdbResetFGroupFd().
 *
 * Returns the new handle, or NULL with terrno = TSDB_CODE_TDB_OUT_OF_MEMORY when any
 * allocation fails (a partially built handle is released with tsdbFreeReadHandle()).
 */
SReadHandle *tsdbNewReadHandle(STsdbRepo *pRepo) {
  SReadHandle *pHandle = (SReadHandle *)calloc(1, sizeof(*pHandle));
  if (pHandle == NULL) {
    terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
    return NULL;
  }

  pHandle->pRepo = pRepo;

  STsdbMeta *pMeta = pRepo->tsdbMeta;
  STsdbCfg * pCfg = &(pRepo->config);

  // Allocate the buffers in slot order and stop at the first failure.
  for (int slot = 0; slot < 2; slot++) {
    pHandle->pDataCols[slot] = tdNewDataCols(pMeta->maxRowBytes, pMeta->maxCols, pCfg->maxRowsPerFileBlock);
    if (pHandle->pDataCols[slot] == NULL) {
      terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
      tsdbFreeReadHandle(pHandle);
      return NULL;
    }
  }

  tsdbResetFGroupFd(&(pHandle->fGroup));

  return pHandle;
}
/*
 * Release a read handle and every buffer it owns: the block index array, the block
 * info and block data buffers, both SDataCols working buffers, and the raw and
 * compression scratch buffers. Passing NULL is a no-op.
 */
void tsdbFreeReadHandle(SReadHandle *pReadH) {
  if (pReadH == NULL) return;

  taosTZfree(pReadH->pBlockIdx);
  taosTZfree(pReadH->pBlockInfo);
  taosTZfree(pReadH->pBlockData);

  tdFreeDataCols(pReadH->pDataCols[0]);
  tdFreeDataCols(pReadH->pDataCols[1]);

  taosTZfree(pReadH->pBuf);
  taosTZfree(pReadH->pCBuf);

  free(pReadH);
}
int tsdbSetAndOpenFGroup(SReadHandle *pReadH, SFileGroup *pFGroup) { int tsdbSetAndOpenFGroup(SReadHandle *pReadH, SFileGroup *pFGroup) {
STsdbRepo *pRepo = pReadH->pRepo; STsdbRepo *pRepo = pReadH->pRepo;
STsdbCfg * pCfg = &(pRepo->config);
pReadH->fGroup = *pFGroup; pReadH->fGroup = *pFGroup;
for (int type = 0; type < TSDB_FILE_TYPE_MAX; type++) { tsdbResetFGroupFd(&(pReadH->fGroup));
pReadH->fGroup.files[type].fd = -1;
}
for (int type = 0; type < TSDB_FILE_TYPE_MAX; type++) { for (int type = 0; type < TSDB_FILE_TYPE_MAX; type++) {
SFile *pFile = &(pReadH->fGroup.files[type]); SFile *pFile = TSDB_READ_FILE(pReadH, type);
if (pFile->fname[0] != '\0') { if (pFile->fname[0] != '\0') { // pFile->fname[0] == '\0' is for commit usage
pFile->fd = open(pFile->fname, O_RDONLY); pFile->fd = open(pFile->fname, O_RDONLY);
if (pFile->fd < 0) { if (pFile->fd < 0) {
tsdbError("vgId:%d failed to open file %s since %s", REPO_ID(pRepo), pFile->fname, strerror(errno)); tsdbError("vgId:%d failed to open file %s since %s", REPO_ID(pRepo), pFile->fname, strerror(errno));
...@@ -52,14 +82,16 @@ int tsdbSetAndOpenFGroup(SReadHandle *pReadH, SFileGroup *pFGroup) { ...@@ -52,14 +82,16 @@ int tsdbSetAndOpenFGroup(SReadHandle *pReadH, SFileGroup *pFGroup) {
} }
} }
tsdbGetFidKeyRange(pCfg->daysPerFile, pCfg->precision, pFGroup->fileId, &(pReadH->minKey), &(pReadH->maxKey));
return 0; return 0;
} }
void tsdbCloseAndUnsetFile(SReadHandle *pReadH) { void tsdbCloseAndUnsetFile(SReadHandle *pReadH) {
for (int type = 0; type < TSDB_FILE_TYPE_MAX; type++) { for (int type = 0; type < TSDB_FILE_TYPE_MAX; type++) {
SFile *pFile = &(pReadH->fGroup.files[type]); SFile *pFile = TSDB_READ_FILE(pReadH, type);
if (pFile->fd > 0) { if (pFile->fd >= 0) {
(void)close(pFile->fd); (void)close(pFile->fd);
pFile->fd = -1; pFile->fd = -1;
} }
...@@ -68,20 +100,22 @@ void tsdbCloseAndUnsetFile(SReadHandle *pReadH) { ...@@ -68,20 +100,22 @@ void tsdbCloseAndUnsetFile(SReadHandle *pReadH) {
int tsdbLoadBlockIdx(SReadHandle *pReadH) { int tsdbLoadBlockIdx(SReadHandle *pReadH) {
STsdbRepo *pRepo = pReadH->pRepo; STsdbRepo *pRepo = pReadH->pRepo;
SFile * pFile = &(pReadH->fGroup.files[TSDB_FILE_TYPE_HEAD]); SFile * pFile = TSDB_READ_FILE(pReadH, TSDB_FILE_TYPE_HEAD);
if (pFile->fd < 0 || pFile->info.len == 0) { if (pFile->fd < 0 || pFile->info.len == 0) {
pReadH->nBlockIdx = 0; pReadH->nBlockIdx = 0;
pReadH->pCurBlockIdx = NULL;
return 0; return 0;
} }
if (tsdbAllocBuf(&(pReadH->pBuf), pFile->info.len) < 0) { if (tsdbAllocBuf(&(pReadH->pBuf), pFile->info.len) < 0) {
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
return -1; return -1;
} }
if (lseek(pFile->fd, pFile->info.offset, SEEK_SET) < 0) { if (lseek(pFile->fd, pFile->info.offset, SEEK_SET) < 0) {
tsdbError("vgId:%d failed to lseek file %s since %s", REPO_ID(pRepo), pFile->fname, strerror(errno)); tsdbError("vgId:%d failed to lseek file %s to offset %u since %s", REPO_ID(pRepo), pFile->fname, pFile->info.offset,
strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
return -1; return -1;
} }
...@@ -94,7 +128,7 @@ int tsdbLoadBlockIdx(SReadHandle *pReadH) { ...@@ -94,7 +128,7 @@ int tsdbLoadBlockIdx(SReadHandle *pReadH) {
return -1; return -1;
} }
if (ret < pFile->info.len || !taosCheckChecksumWhole((uint8_t *)pReadH->pBuf, pFile->info.len)) { if (ret < pFile->info.len || !taosCheckChecksumWhole((uint8_t *)(pReadH->pBuf), pFile->info.len)) {
tsdbError("vgId:%d block idx part is corrupted in file %s, offset %u len %u", REPO_ID(pRepo), pFile->fname, tsdbError("vgId:%d block idx part is corrupted in file %s, offset %u len %u", REPO_ID(pRepo), pFile->fname,
pFile->info.offset, pFile->info.len); pFile->info.offset, pFile->info.len);
terrno = TSDB_CODE_TDB_FILE_CORRUPTED; terrno = TSDB_CODE_TDB_FILE_CORRUPTED;
...@@ -107,13 +141,53 @@ int tsdbLoadBlockIdx(SReadHandle *pReadH) { ...@@ -107,13 +141,53 @@ int tsdbLoadBlockIdx(SReadHandle *pReadH) {
return -1; return -1;
} }
pReadH->cBlockIdx = 0;
return 0; return 0;
} }
/*
 * Bind the read handle to pTable.
 *
 * Both SDataCols working buffers are re-initialised from the table's schema, then the
 * block-index cursor (cBlockIdx) is advanced over the loaded SBlockIdx array looking
 * for an entry with the table's tid. pCurBlockIdx is set to that entry when its uid
 * also matches; in every other case (no index loaded, cursor exhausted, tid not
 * present, uid mismatch) it is set to NULL.
 *
 * NOTE(review): the cursor only moves forward and the scan stops at the first entry
 * with a larger tid, so this presumably relies on pBlockIdx being sorted by ascending
 * tid and on callers visiting tables in ascending tid order -- confirm.
 *
 * Returns 0 on success, -1 with terrno = TSDB_CODE_TDB_OUT_OF_MEMORY when the data
 * column buffers cannot be initialised.
 */
int tsdbSetReadTable(SReadHandle *pReadH, STable *pTable) {
  ASSERT(pTable != NULL);

  pReadH->pTable = pTable;

  STSchema *pSchema = tsdbGetTableSchemaImpl(pTable, false, false, -1);
  ASSERT(pSchema != NULL);

  for (int slot = 0; slot < 2; slot++) {
    if (tdInitDataCols(pReadH->pDataCols[slot], pSchema) < 0) {
      terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
      return -1;
    }
  }

  // Default to "table has no data in this file group"; only an exact tid+uid hit overrides it.
  pReadH->pCurBlockIdx = NULL;
  if (pReadH->nBlockIdx <= 0) return 0;

  ASSERT(pReadH->cBlockIdx <= pReadH->nBlockIdx);

  while (pReadH->cBlockIdx < pReadH->nBlockIdx) {
    SBlockIdx *pCandidate = pReadH->pBlockIdx + pReadH->cBlockIdx;

    // Entry belongs to a later table: leave the cursor on it for the next call.
    if (pCandidate->tid > TABLE_TID(pTable)) break;

    // Entry is for this table or an earlier one: consume it.
    pReadH->cBlockIdx++;

    if (pCandidate->tid == TABLE_TID(pTable)) {
      if (pCandidate->uid == TABLE_UID(pTable)) {
        pReadH->pCurBlockIdx = pCandidate;
      }
      break;
    }
  }

  return 0;
}
...@@ -132,12 +206,13 @@ int tsdbLoadBlockInfo(SReadHandle *pReadH) { ...@@ -132,12 +206,13 @@ int tsdbLoadBlockInfo(SReadHandle *pReadH) {
} }
if (lseek(pFile->fd, pBlockIdx->offset, SEEK_SET) < 0) { if (lseek(pFile->fd, pBlockIdx->offset, SEEK_SET) < 0) {
tsdbError("vgId:%d failed to lseek file %s since %s", REPO_ID(pRepo), pFile->fname, strerror(errno)); tsdbError("vgId:%d failed to lseek file %s to offset %u since %s", REPO_ID(pRepo), pFile->fname, pBlockIdx->offset,
strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
return -1; return -1;
} }
ssize_t ret = taosTRead(pFile->fd, (void *)pReadH->pBlockInfo, pBlockIdx->len); ssize_t ret = taosTRead(pFile->fd, (void *)(pReadH->pBlockInfo), pBlockIdx->len);
if (ret < 0) { if (ret < 0) {
tsdbError("vgId:%d failed to read block info part of table %s from file %s since %s", REPO_ID(pRepo), tsdbError("vgId:%d failed to read block info part of table %s from file %s since %s", REPO_ID(pRepo),
TABLE_CHAR_NAME(pReadH->pTable), pFile->fname, strerror(errno)); TABLE_CHAR_NAME(pReadH->pTable), pFile->fname, strerror(errno));
...@@ -146,8 +221,8 @@ int tsdbLoadBlockInfo(SReadHandle *pReadH) { ...@@ -146,8 +221,8 @@ int tsdbLoadBlockInfo(SReadHandle *pReadH) {
} }
if (ret < pBlockIdx->len || tsdbVerifyBlockInfo(pReadH->pBlockInfo, pBlockIdx) < 0) { if (ret < pBlockIdx->len || tsdbVerifyBlockInfo(pReadH->pBlockInfo, pBlockIdx) < 0) {
tsdbError("vgId:%d table %s block info part is corrupted from file %s since %s", REPO_ID(pRepo), tsdbError("vgId:%d table %s block info part is corrupted from file %s", REPO_ID(pRepo),
TABLE_CHAR_NAME(pReadH->pTable), pFile->fname, strerror(errno)); TABLE_CHAR_NAME(pReadH->pTable), pFile->fname);
terrno = TSDB_CODE_TDB_FILE_CORRUPTED; terrno = TSDB_CODE_TDB_FILE_CORRUPTED;
return -1; return -1;
} }
...@@ -198,6 +273,49 @@ int tsdbLoadBlockDataCols(SReadHandle *pReadH, SBlock *pBlock, SBlockInfo *pBloc ...@@ -198,6 +273,49 @@ int tsdbLoadBlockDataCols(SReadHandle *pReadH, SBlock *pBlock, SBlockInfo *pBloc
if (tdMergeDataCols(pReadH->pDataCols[0], pReadH->pDataCols[1], pReadH->pDataCols[1]->numOfRows) < 0) goto _err; if (tdMergeDataCols(pReadH->pDataCols[0], pReadH->pDataCols[1], pReadH->pDataCols[1]->numOfRows) < 0) goto _err;
} }
ASSERT(pReadH->pDataCols[0]->numOfRows == pBlock->numOfRows);
ASSERT(dataColsKeyFirst(pReadH->pDataCols[0]) == pBlock->keyFirst);
ASSERT(dataColsKeyLast(pReadH->pDataCols[0]) == pBlock->keyLast);
return 0;
}
/*
 * Load the SBlockData header of one block into pReadH->pBlockData.
 *
 * pBlock must have at most one sub-block (asserted). The header is read from the LAST
 * file when pBlock->last is set and from the DATA file otherwise, starting at
 * pBlock->offset and spanning TSDB_BLOCK_DATA_LEN(pBlock->numOfCols) bytes.
 *
 * Returns 0 on success. On failure returns -1 with terrno set to:
 *   - TAOS_SYSTEM_ERROR(errno)      lseek or read failed
 *   - TSDB_CODE_TDB_OUT_OF_MEMORY   the block-data buffer could not be grown
 *   - TSDB_CODE_TDB_FILE_CORRUPTED  short read or checksum mismatch
 */
int tsdbLoadBlockDataInfo(SReadHandle *pReadH, SBlock *pBlock) {
  ASSERT(pBlock->numOfSubBlocks <= 1);

  STsdbRepo *pRepo = pReadH->pRepo;
  SFile *    pFile =
      (pBlock->last) ? TSDB_READ_FILE(pReadH, TSDB_FILE_TYPE_LAST) : TSDB_READ_FILE(pReadH, TSDB_FILE_TYPE_DATA);

  if (lseek(pFile->fd, pBlock->offset, SEEK_SET) < 0) {
    tsdbError("vgId:%d failed to lseek file %s to offset %" PRId64 " since %s", REPO_ID(pRepo), pFile->fname,
              pBlock->offset, strerror(errno));
    terrno = TAOS_SYSTEM_ERROR(errno);
    return -1;
  }

  int tsize = TSDB_BLOCK_DATA_LEN(pBlock->numOfCols);
  // tsdbAllocBuf() takes a void **, while pBlockData is a typed pointer, so cast explicitly.
  if (tsdbAllocBuf((void **)(&(pReadH->pBlockData)), tsize) < 0) {
    terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
    return -1;
  }

  // Read the header directly into the buffer that was just sized for it.
  int ret = taosTRead(pFile->fd, (void *)(pReadH->pBlockData), tsize);
  if (ret < 0) {
    tsdbError("vgId:%d failed to read block data info part from file %s offset %" PRId64 " len %d since %s",
              REPO_ID(pRepo), pFile->fname, pBlock->offset, tsize, strerror(errno));
    terrno = TAOS_SYSTEM_ERROR(errno);
    return -1;
  }

  // A short read is treated the same as a checksum failure: the header is unusable.
  if (ret < tsize || !taosCheckChecksumWhole((uint8_t *)(pReadH->pBlockData), tsize)) {
    tsdbError("vgId:%d block data info part is corrupted in file %s offset %" PRId64 " len %d", REPO_ID(pRepo),
              pFile->fname, pBlock->offset, tsize);
    terrno = TSDB_CODE_TDB_FILE_CORRUPTED;
    return -1;
  }

  ASSERT(pReadH->pBlockData->numOfCols == pBlock->numOfCols);

  return 0;
}
...@@ -214,15 +332,16 @@ static int tsdbLoadBlockDataImpl(SReadHandle *pReadH, SBlock *pBlock, SDataCols ...@@ -214,15 +332,16 @@ static int tsdbLoadBlockDataImpl(SReadHandle *pReadH, SBlock *pBlock, SDataCols
} }
if (lseek(pFile->fd, pBlock->offset, SEEK_SET) < 0) { if (lseek(pFile->fd, pBlock->offset, SEEK_SET) < 0) {
tsdbError("vgId:%d failed to lseek file %s since %s", REPO_ID(pRepo), pFile->fname, strerror(errno)); tsdbError("vgId:%d failed to lseek file %s to offset %" PRId64 " since %s", REPO_ID(pRepo), pFile->fname,
pBlock->offset, strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
return -1; return -1;
} }
int ret = taosTRead(pFile->fd, (void *)pReadH->pBuf, pBlock->len); int ret = taosTRead(pFile->fd, (void *)(pReadH->pBuf), pBlock->len);
if (ret < 0) { if (ret < 0) {
tsdbError("vgId:%d failed to read block data part from file %s since %s", REPO_ID(pRepo), pFile->fname, tsdbError("vgId:%d failed to read block data part from file %s at offset %" PRId64 " len %d since %s",
strerror(errno)); REPO_ID(pRepo), pFile->fname, pBlock->offset, pBlock->len, strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
return -1; return -1;
} }
...@@ -240,6 +359,7 @@ static int tsdbLoadBlockDataImpl(SReadHandle *pReadH, SBlock *pBlock, SDataCols ...@@ -240,6 +359,7 @@ static int tsdbLoadBlockDataImpl(SReadHandle *pReadH, SBlock *pBlock, SDataCols
ASSERT(pBlockData->numOfCols = pBlock->numOfCols); ASSERT(pBlockData->numOfCols = pBlock->numOfCols);
tdResetDataCols(pDataCols); tdResetDataCols(pDataCols);
ASSERT(pBlock->numOfRows <= pDataCols->maxPoints);
pDataCols->numOfRows = pBlock->numOfRows; pDataCols->numOfRows = pBlock->numOfRows;
int ccol = 0; int ccol = 0;
...@@ -269,7 +389,7 @@ static int tsdbLoadBlockDataImpl(SReadHandle *pReadH, SBlock *pBlock, SDataCols ...@@ -269,7 +389,7 @@ static int tsdbLoadBlockDataImpl(SReadHandle *pReadH, SBlock *pBlock, SDataCols
if (pBlock->algorithm == TWO_STAGE_COMP) { // extend compression buffer if (pBlock->algorithm == TWO_STAGE_COMP) { // extend compression buffer
int zsize = pDataCol->bytes * pBlock->numOfRows + COMP_OVERFLOW_BYTES; int zsize = pDataCol->bytes * pBlock->numOfRows + COMP_OVERFLOW_BYTES;
if (pDataCol->type == TSDB_DATA_TYPE_BINARY || pDataCol->type == TSDB_DATA_TYPE_NCHAR) { if (pDataCol->type == TSDB_DATA_TYPE_BINARY || pDataCol->type == TSDB_DATA_TYPE_NCHAR) {
zsize += (sizeof(VarDataLenT) * pCompBlock->numOfRows); zsize += (sizeof(VarDataLenT) * pBlock->numOfRows);
} }
if (tsdbAllocBuf(&(pReadH->pCBuf), zsize) < 0) { if (tsdbAllocBuf(&(pReadH->pCBuf), zsize) < 0) {
...@@ -280,7 +400,7 @@ static int tsdbLoadBlockDataImpl(SReadHandle *pReadH, SBlock *pBlock, SDataCols ...@@ -280,7 +400,7 @@ static int tsdbLoadBlockDataImpl(SReadHandle *pReadH, SBlock *pBlock, SDataCols
if (tsdbCheckAndDecodeColumnData(pDataCol, POINTER_SHIFT(pBlockData, tsize + toffset), tlen, pBlock->algorithm, if (tsdbCheckAndDecodeColumnData(pDataCol, POINTER_SHIFT(pBlockData, tsize + toffset), tlen, pBlock->algorithm,
pBlock->numOfRows, pDataCols->maxPoints, pReadH->pCBuf, pBlock->numOfRows, pDataCols->maxPoints, pReadH->pCBuf,
(int32_t)taosTSizeof(pReadH->pCBuf)) < 0) { (int32_t)taosTSizeof(pReadH->pCBuf)) < 0) {
tsdbError("vgId:%d file %s is broken at column %d block offset %" PRId64 " column offset %d", REPO_ID(pRepo), tsdbError("vgId:%d file %s is corrupted at column %d block offset %" PRId64 " column offset %d", REPO_ID(pRepo),
pFile->fname, tcolId, (int64_t)pBlock->offset, toffset); pFile->fname, tcolId, (int64_t)pBlock->offset, toffset);
return -1; return -1;
} }
...@@ -305,26 +425,27 @@ static int tsdbLoadBlockDataColsImpl(SReadHandle *pReadH, SBlock *pBlock, SDataC ...@@ -305,26 +425,27 @@ static int tsdbLoadBlockDataColsImpl(SReadHandle *pReadH, SBlock *pBlock, SDataC
SFile *pFile = SFile *pFile =
pBlock->last ? TSDB_READ_FILE(pReadH, TSDB_FILE_TYPE_LAST) : TSDB_READ_FILE(pReadH, TSDB_FILE_TYPE_DATA); pBlock->last ? TSDB_READ_FILE(pReadH, TSDB_FILE_TYPE_LAST) : TSDB_READ_FILE(pReadH, TSDB_FILE_TYPE_DATA);
SBlockCol compCol = {0}; SBlockCol blockCol = {0};
// If only load timestamp column, no need to load SBlockData part // If only load timestamp column, no need to load SBlockData part
if (numOfColIds > 1 && tsdbLoadCompData(pHelper, pCompBlock, NULL) < 0) goto _err; if (numOfColIds > 1 && tsdbLoadBlockDataInfo(pReadH, pBlock) < 0) return -1;
pDataCols->numOfRows = pCompBlock->numOfRows; tdResetDataCols(pDataCols);
pDataCols->numOfRows = pBlock->numOfRows;
int dcol = 0; int dcol = 0;
int ccol = 0; int ccol = 0;
for (int i = 0; i < numOfColIds; i++) { for (int i = 0; i < numOfColIds; i++) {
int16_t colId = colIds[i]; int16_t colId = colIds[i];
SDataCol *pDataCol = NULL; SDataCol * pDataCol = NULL;
SBlockCol *pCompCol = NULL; SBlockCol *pBlockCol = NULL;
while (true) { while (true) {
if (dcol >= pDataCols->numOfCols) { if (dcol >= pDataCols->numOfCols) {
pDataCol = NULL; pDataCol = NULL;
break; break;
} }
pDataCol = &pDataCols->cols[dcol]; pDataCol = &(pDataCols->cols[dcol]);
if (pDataCol->colId > colId) { if (pDataCol->colId > colId) {
pDataCol = NULL; pDataCol = NULL;
break; break;
...@@ -338,52 +459,49 @@ static int tsdbLoadBlockDataColsImpl(SReadHandle *pReadH, SBlock *pBlock, SDataC ...@@ -338,52 +459,49 @@ static int tsdbLoadBlockDataColsImpl(SReadHandle *pReadH, SBlock *pBlock, SDataC
ASSERT(pDataCol->colId == colId); ASSERT(pDataCol->colId == colId);
if (colId == 0) { // load the key row if (colId == 0) { // load the key row
compCol.colId = colId; blockCol.colId = colId;
compCol.len = pCompBlock->keyLen; blockCol.len = pBlock->keyLen;
compCol.type = pDataCol->type; blockCol.type = pDataCol->type;
compCol.offset = TSDB_KEY_COL_OFFSET; blockCol.offset = TSDB_KEY_COL_OFFSET;
pCompCol = &compCol; pBlockCol = &blockCol;
} else { // load non-key rows } else { // load non-key rows
while (true) { while (true) {
if (ccol >= pCompBlock->numOfCols) { if (ccol >= pBlock->numOfCols) {
pCompCol = NULL; pBlockCol = NULL;
break; break;
} }
pCompCol = &(pHelper->pCompData->cols[ccol]); pBlockCol = &(pReadH->pBlockData->cols[ccol]);
if (pCompCol->colId > colId) { if (pBlockCol->colId > colId) {
pCompCol = NULL; pBlockCol = NULL;
break; break;
} else { } else {
ccol++; ccol++;
if (pCompCol->colId == colId) break; if (pBlockCol->colId == colId) break;
} }
} }
if (pCompCol == NULL) { if (pBlockCol == NULL) {
dataColSetNEleNull(pDataCol, pCompBlock->numOfRows, pDataCols->maxPoints); dataColSetNEleNull(pDataCol, pBlock->numOfRows, pDataCols->maxPoints);
continue; continue;
} }
ASSERT(pCompCol->colId == pDataCol->colId); ASSERT(pBlockCol->colId == pDataCol->colId);
} }
if (tsdbLoadColData(pHelper, pFile, pCompBlock, pCompCol, pDataCol) < 0) goto _err; if (tsdbLoadColData(pReadH, pFile, pBlock, pBlockCol, pDataCol) < 0) return -1;
} }
return 0; return 0;
_err:
return -1;
} }
static int tsdbDecodeBlockIdxArray(SReadHandle *pReadH) { static int tsdbDecodeBlockIdxArray(SReadHandle *pReadH) {
void *pBuf = pReadH->pBuf; void * pBuf = pReadH->pBuf;
SFile *pFile = &(pReadH->fGroup.files[TSDB_FILE_TYPE_HEAD]); SFile *pFile = TSDB_READ_FILE(pReadH, TSDB_FILE_TYPE_HEAD);
pReadH->nBlockIdx = 0; pReadH->nBlockIdx = 0;
while (POINTER_DISTANCE(pBuf, pReadH->pBuf) < (int)(pFile->info.len - sizeof(TSCKSUM))) { while (POINTER_DISTANCE(pBuf, pReadH->pBuf) < (int)(pFile->info.len - sizeof(TSCKSUM))) {
if (tsdbAllocBuf(&((void *)(pReadH->pBlockIdx), sizeof(SBlockIdx) * (pReadH->nBlockIdx + 1))) < 0) { if (tsdbAllocBuf(&((void *)(pReadH->pBlockIdx)), sizeof(SBlockIdx) * (pReadH->nBlockIdx + 1)) < 0) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
return -1; return -1;
} }
...@@ -391,7 +509,7 @@ static int tsdbDecodeBlockIdxArray(SReadHandle *pReadH) { ...@@ -391,7 +509,7 @@ static int tsdbDecodeBlockIdxArray(SReadHandle *pReadH) {
pBuf = tsdbDecodeBlockIdx(pBuf, &(pReadH->pBlockIdx[pReadH->nBlockIdx])); pBuf = tsdbDecodeBlockIdx(pBuf, &(pReadH->pBlockIdx[pReadH->nBlockIdx]));
if (pBuf == NULL) { if (pBuf == NULL) {
tsdbError("vgId:%d failed to decode block idx part from file %s", REPO_ID(pRepo), pFile->fname); tsdbError("vgId:%d failed to decode block idx part from file %s", REPO_ID(pRepo), pFile->fname);
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; terrno = TSDB_CODE_TDB_FILE_CORRUPTED;
return -1; return -1;
} }
...@@ -418,14 +536,14 @@ static int tsdbAllocBuf(void **ppBuf, int size) { ...@@ -418,14 +536,14 @@ static int tsdbAllocBuf(void **ppBuf, int size) {
/*
 * Validate a block-info region read for the table described by pBlockIdx.
 *
 * Returns 0 when the checksum over pBlockIdx->len bytes is valid and the delimiter,
 * uid and tid stored in pBlockInfo equal TSDB_FILE_DELIMITER and the uid/tid of
 * pBlockIdx; returns -1 otherwise.
 */
static int tsdbVerifyBlockInfo(SBlockInfo *pBlockInfo, SBlockIdx *pBlockIdx) {
  if (!taosCheckChecksumWhole((uint8_t *)pBlockInfo, pBlockIdx->len)) {
    return -1;
  }

  if (pBlockInfo->delimiter != TSDB_FILE_DELIMITER) return -1;
  if (pBlockInfo->uid != pBlockIdx->uid) return -1;
  if (pBlockInfo->tid != pBlockIdx->tid) return -1;

  return 0;
}
static int tsdbCheckAndDecodeColumnData(SDataCol *pDataCol, char *content, int32_t len, int8_t comp, int numOfRows, static int tsdbCheckAndDecodeColumnData(SDataCol *pDataCol, void *content, int32_t len, int8_t comp, int numOfRows,
int maxPoints, char *buffer, int bufferSize) { int maxPoints, char *buffer, int bsize) {
// Verify by checksum
if (!taosCheckChecksumWhole((uint8_t *)content, len)) { if (!taosCheckChecksumWhole((uint8_t *)content, len)) {
terrno = TSDB_CODE_TDB_FILE_CORRUPTED; terrno = TSDB_CODE_TDB_FILE_CORRUPTED;
return -1; return -1;
...@@ -435,16 +553,18 @@ static int tsdbCheckAndDecodeColumnData(SDataCol *pDataCol, char *content, int32 ...@@ -435,16 +553,18 @@ static int tsdbCheckAndDecodeColumnData(SDataCol *pDataCol, char *content, int32
if (comp) { if (comp) {
// // Need to decompress // // Need to decompress
int tlen = (*(tDataTypeDesc[pDataCol->type].decompFunc))(content, len - sizeof(TSCKSUM), numOfRows, pDataCol->pData, int tlen = (*(tDataTypeDesc[pDataCol->type].decompFunc))(content, len - sizeof(TSCKSUM), numOfRows, pDataCol->pData,
pDataCol->spaceSize, comp, buffer, bufferSize); pDataCol->spaceSize, comp, buffer, bsize);
if (tlen <= 0) { if (tlen <= 0) {
tsdbError("Failed to decompress column, file corrupted, len:%d comp:%d numOfRows:%d maxPoints:%d bufferSize:%d", tsdbError("failed to decompress column, file corrupted, len:%d comp:%d numOfRows:%d maxPoints:%d bsize:%d", len,
len, comp, numOfRows, maxPoints, bufferSize); comp, numOfRows, maxPoints, bsize);
terrno = TSDB_CODE_TDB_FILE_CORRUPTED; terrno = TSDB_CODE_TDB_FILE_CORRUPTED;
return -1; return -1;
} }
pDataCol->len = tlen; pDataCol->len = tlen;
if (pDataCol->type == TSDB_DATA_TYPE_BINARY || pDataCol->type == TSDB_DATA_TYPE_NCHAR) { if (pDataCol->type == TSDB_DATA_TYPE_BINARY || pDataCol->type == TSDB_DATA_TYPE_NCHAR) {
dataColSetOffset(pDataCol, numOfRows); dataColSetOffset(pDataCol, numOfRows);
} else {
ASSERT(pDataCol->len == pDataCol->bytes * numOfRows);
} }
} else { } else {
// No need to decompress, just memcpy it // No need to decompress, just memcpy it
...@@ -452,46 +572,55 @@ static int tsdbCheckAndDecodeColumnData(SDataCol *pDataCol, char *content, int32 ...@@ -452,46 +572,55 @@ static int tsdbCheckAndDecodeColumnData(SDataCol *pDataCol, char *content, int32
memcpy(pDataCol->pData, content, pDataCol->len); memcpy(pDataCol->pData, content, pDataCol->len);
if (pDataCol->type == TSDB_DATA_TYPE_BINARY || pDataCol->type == TSDB_DATA_TYPE_NCHAR) { if (pDataCol->type == TSDB_DATA_TYPE_BINARY || pDataCol->type == TSDB_DATA_TYPE_NCHAR) {
dataColSetOffset(pDataCol, numOfRows); dataColSetOffset(pDataCol, numOfRows);
} else {
ASSERT(pDataCol->len == pDataCol->bytes * numOfRows);
} }
} }
return 0; return 0;
} }
static int tsdbLoadColData(SRWHelper *pHelper, SFile *pFile, SBlock *pCompBlock, SBlockCol *pCompCol, static int tsdbLoadColData(SReadHandle *pReadH, SFile *pFile, SBlock *pBlock, SBlockCol *pBlockCol,
SDataCol *pDataCol) { SDataCol *pDataCol) {
ASSERT(pDataCol->colId == pCompCol->colId); ASSERT(pDataCol->colId == pBlockCol->colId);
int tsize = pDataCol->bytes * pCompBlock->numOfRows + COMP_OVERFLOW_BYTES;
pHelper->pBuffer = taosTRealloc(pHelper->pBuffer, pCompCol->len); STsdbRepo *pRepo = pReadH->pRepo;
if (pHelper->pBuffer == NULL) { STsdbCfg * pCfg = &(pRepo->config);
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
return -1;
}
pHelper->compBuffer = taosTRealloc(pHelper->compBuffer, tsize); if (tsdbAllocBuf(&(pReadH->pBuf), pBlockCol->len) < 0) {
if (pHelper->compBuffer == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
return -1; return -1;
} }
int64_t offset = pCompBlock->offset + TSDB_GET_COMPCOL_LEN(pCompBlock->numOfCols) + pCompCol->offset; int64_t offset = pBlock->offset + TSDB_GET_COMPCOL_LEN(pBlock->numOfCols) + pBlockCol->offset;
if (lseek(pFile->fd, (off_t)offset, SEEK_SET) < 0) { if (lseek(pFile->fd, (off_t)offset, SEEK_SET) < 0) {
tsdbError("vgId:%d failed to lseek file %s since %s", REPO_ID(pHelper->pRepo), pFile->fname, strerror(errno)); tsdbError("vgId:%d failed to lseek file %s since %s", REPO_ID(pRepo), pFile->fname, strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
return -1; return -1;
} }
if (taosTRead(pFile->fd, pHelper->pBuffer, pCompCol->len) < pCompCol->len) { int ret = taosTRead(pFile->fd, pReadH->pBuf, pBlockCol->len);
tsdbError("vgId:%d failed to read %d bytes from file %s since %s", REPO_ID(pHelper->pRepo), pCompCol->len, pFile->fname, if (ret < 0) {
strerror(errno)); tsdbError("vgId:%d failed to read %d bytes from file %s since %s", REPO_ID(pRepo), pBlockCol->len,
pFile->fname, strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
return -1; return -1;
} }
if (tsdbCheckAndDecodeColumnData(pDataCol, pHelper->pBuffer, pCompCol->len, pCompBlock->algorithm, if (pBlock->algorithm == TWO_STAGE_COMP) { // extend compression buffer
pCompBlock->numOfRows, pHelper->pRepo->config.maxRowsPerFileBlock, int zsize = pDataCol->bytes * pBlock->numOfRows + COMP_OVERFLOW_BYTES;
pHelper->compBuffer, (int32_t)taosTSizeof(pHelper->compBuffer)) < 0) { if (pDataCol->type == TSDB_DATA_TYPE_BINARY || pDataCol->type == TSDB_DATA_TYPE_NCHAR) {
tsdbError("vgId:%d file %s is broken at column %d offset %" PRId64, REPO_ID(pHelper->pRepo), pFile->fname, zsize += (sizeof(VarDataLenT) * pBlock->numOfRows);
pCompCol->colId, offset); }
if (tsdbAllocBuf(&(pReadH->pCBuf), zsize) < 0) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
return -1;
}
}
if (tsdbCheckAndDecodeColumnData(pDataCol, pReadH->pBuf, pBlockCol->len, pBlock->algorithm, pBlock->numOfRows,
pCfg->maxRowsPerFileBlock, pReadH->pCBuf, (int32_t)taosTSizeof(pReadH->pCBuf)) < 0) {
tsdbError("vgId:%d file %s is broken at column %d offset %" PRId64, REPO_ID(pRepo), pFile->fname,
pBlockCol->colId, offset);
return -1; return -1;
} }
......
...@@ -105,3 +105,9 @@ void *tsdbDecodeBlockIdx(void *buf, SBlockIdx *pBlockIdx) { ...@@ -105,3 +105,9 @@ void *tsdbDecodeBlockIdx(void *buf, SBlockIdx *pBlockIdx) {
return buf; return buf;
} }
/*
 * Set the file descriptor of each of the TSDB_FILE_TYPE_MAX file slots in pFGroup to -1.
 * No descriptor is closed here; only the stored values are overwritten.
 */
void tsdbResetFGroupFd(SFileGroup *pFGroup) {
  int ftype = TSDB_FILE_TYPE_MAX;
  while (ftype-- > 0) {
    pFGroup->files[ftype].fd = -1;
  }
}
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册