提交 1adfaef8 编写于 作者: H Hongze Cheng

refactor more code

上级 464df615
...@@ -272,48 +272,6 @@ typedef struct { ...@@ -272,48 +272,6 @@ typedef struct {
SBlockCol cols[]; SBlockCol cols[];
} SBlockData; } SBlockData;
// typedef enum { TSDB_WRITE_HELPER, TSDB_READ_HELPER } tsdb_rw_helper_t;
// typedef struct {
// TSKEY minKey;
// TSKEY maxKey;
// SFileGroup fGroup;
// SFile nHeadF;
// SFile nLastF;
// } SHelperFile;
// typedef struct {
// uint64_t uid;
// int32_t tid;
// } SHelperTable;
// typedef struct {
// SBlockIdx* pIdxArray;
// int numOfIdx;
// int curIdx;
// } SIdxH;
// typedef struct {
// tsdb_rw_helper_t type;
// STsdbRepo* pRepo;
// int8_t state;
// // For file set usage
// SHelperFile files;
// SIdxH idxH;
// SBlockIdx curCompIdx;
// void* pWIdx;
// // For table set usage
// SHelperTable tableInfo;
// SBlockInfo* pCompInfo;
// bool hasOldLastBlock;
// // For block set usage
// SBlockData* pCompData;
// SDataCols* pDataCols[2];
// void* pBuffer; // Buffer to hold the whole data block
// void* compBuffer; // Buffer for temperary compress/decompress purpose
// } SRWHelper;
// ------------------ tsdbScan.c // ------------------ tsdbScan.c
typedef struct { typedef struct {
SFileGroup fGroup; SFileGroup fGroup;
...@@ -342,7 +300,6 @@ typedef struct { ...@@ -342,7 +300,6 @@ typedef struct {
void* pCBuf; void* pCBuf;
} SReadHandle; } SReadHandle;
#define TSDB_READ_FILE(pReadH, type) (&((pReadH)->fGroup.files[(type)]))
#define TSDB_BLOCK_DATA_LEN(nCols) (sizeof(SBlockData) + sizeof(SBlockCol) * (nCols) + sizeof(TSCKSUM)) #define TSDB_BLOCK_DATA_LEN(nCols) (sizeof(SBlockData) + sizeof(SBlockCol) * (nCols) + sizeof(TSCKSUM))
// Operations // Operations
...@@ -506,62 +463,6 @@ int tsdbLoadFileHeader(SFile* pFile, uint32_t* version); ...@@ -506,62 +463,6 @@ int tsdbLoadFileHeader(SFile* pFile, uint32_t* version);
void tsdbGetFileInfoImpl(char* fname, uint32_t* magic, int64_t* size); void tsdbGetFileInfoImpl(char* fname, uint32_t* magic, int64_t* size);
void tsdbGetFidKeyRange(int daysPerFile, int8_t precision, int fileId, TSKEY *minKey, TSKEY *maxKey); void tsdbGetFidKeyRange(int daysPerFile, int8_t precision, int fileId, TSKEY *minKey, TSKEY *maxKey);
// ------------------ tsdbRWHelper.c
// #define TSDB_HELPER_CLEAR_STATE 0x0 // Clear state
// #define TSDB_HELPER_FILE_SET_AND_OPEN 0x1 // File is set
// #define TSDB_HELPER_IDX_LOAD 0x2 // SCompIdx part is loaded
// #define TSDB_HELPER_TABLE_SET 0x4 // Table is set
// #define TSDB_HELPER_INFO_LOAD 0x8 // SCompInfo part is loaded
// #define TSDB_HELPER_FILE_DATA_LOAD 0x10 // SCompData part is loaded
// #define helperSetState(h, s) (((h)->state) |= (s))
// #define helperClearState(h, s) ((h)->state &= (~(s)))
// #define helperHasState(h, s) ((((h)->state) & (s)) == (s))
// #define blockAtIdx(h, idx) ((h)->pCompInfo->blocks + idx)
// #define TSDB_MAX_SUBBLOCKS 8
// #define IS_SUB_BLOCK(pBlock) ((pBlock)->numOfSubBlocks == 0)
// #define helperType(h) (h)->type
// #define helperRepo(h) (h)->pRepo
// #define helperState(h) (h)->state
// #define TSDB_NLAST_FILE_OPENED(h) ((h)->files.nLastF.fd > 0)
// #define helperFileId(h) ((h)->files.fGroup.fileId)
// #define helperHeadF(h) (&((h)->files.fGroup.files[TSDB_FILE_TYPE_HEAD]))
// #define helperDataF(h) (&((h)->files.fGroup.files[TSDB_FILE_TYPE_DATA]))
// #define helperLastF(h) (&((h)->files.fGroup.files[TSDB_FILE_TYPE_LAST]))
// #define helperNewHeadF(h) (&((h)->files.nHeadF))
// #define helperNewLastF(h) (&((h)->files.nLastF))
// int tsdbInitReadHelper(SRWHelper* pHelper, STsdbRepo* pRepo);
// int tsdbInitWriteHelper(SRWHelper* pHelper, STsdbRepo* pRepo);
// void tsdbDestroyHelper(SRWHelper* pHelper);
// void tsdbResetHelper(SRWHelper* pHelper);
// int tsdbSetAndOpenHelperFile(SRWHelper* pHelper, SFileGroup* pGroup);
// int tsdbCloseHelperFile(SRWHelper* pHelper, bool hasError, SFileGroup* pGroup);
// int tsdbSetHelperTable(SRWHelper* pHelper, STable* pTable, STsdbRepo* pRepo);
// int tsdbCommitTableData(SRWHelper* pHelper, SCommitIter* pCommitIter, SDataCols* pDataCols, TSKEY maxKey);
// int tsdbMoveLastBlockIfNeccessary(SRWHelper* pHelper);
// int tsdbWriteCompInfo(SRWHelper* pHelper);
// int tsdbWriteCompIdx(SRWHelper* pHelper);
// int tsdbLoadCompIdxImpl(SFile* pFile, uint32_t offset, uint32_t len, void* buffer);
// int tsdbDecodeSCompIdxImpl(void* buffer, uint32_t len, SBlockIdx** ppCompIdx, int* numOfIdx);
// int tsdbLoadCompIdx(SRWHelper* pHelper, void* target);
// int tsdbLoadCompInfoImpl(SFile* pFile, SBlockIdx* pIdx, SBlockInfo** ppCompInfo);
// int tsdbLoadCompInfo(SRWHelper* pHelper, void* target);
// int tsdbLoadCompData(SRWHelper* phelper, SBlock* pcompblock, void* target);
// void tsdbGetDataStatis(SRWHelper* pHelper, SDataStatis* pStatis, int numOfCols);
// int tsdbLoadBlockDataCols(SRWHelper* pHelper, SBlock* pCompBlock, SBlockInfo* pCompInfo, int16_t* colIds,
// int numOfColIds);
// int tsdbLoadBlockData(SRWHelper* pHelper, SBlock* pCompBlock, SBlockInfo* pCompInfo);
// static FORCE_INLINE int compTSKEY(const void* key1, const void* key2) {
// if (*(TSKEY*)key1 > *(TSKEY*)key2) {
// return 1;
// } else if (*(TSKEY*)key1 == *(TSKEY*)key2) {
// return 0;
// } else {
// return -1;
// }
// }
// ------------------ tsdbMain.c // ------------------ tsdbMain.c
#define REPO_ID(r) (r)->config.tsdbId #define REPO_ID(r) (r)->config.tsdbId
#define IS_REPO_LOCKED(r) (r)->repoLocked #define IS_REPO_LOCKED(r) (r)->repoLocked
...@@ -609,7 +510,7 @@ int tsdbLoadBlockDataInfo(SReadHandle* pReadH, SBlock* pBlock); ...@@ -609,7 +510,7 @@ int tsdbLoadBlockDataInfo(SReadHandle* pReadH, SBlock* pBlock);
#define TSDB_FILE_IN_FGROUP(pGroup, type) (&((pGroup)->files[(type)])) #define TSDB_FILE_IN_FGROUP(pGroup, type) (&((pGroup)->files[(type)]))
int tsdbAllocBuf(void** ppBuf, int size); int tsdbAllocBuf(void **ppBuf, uint32_t size);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -12,6 +12,7 @@ ...@@ -12,6 +12,7 @@
* You should have received a copy of the GNU Affero General Public License * You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#include <errno.h>
#include <fcntl.h> #include <fcntl.h>
#include <sys/stat.h> #include <sys/stat.h>
#include <sys/types.h> #include <sys/types.h>
...@@ -23,6 +24,15 @@ ...@@ -23,6 +24,15 @@
#define TSDB_KEY_COL_OFFSET 0 #define TSDB_KEY_COL_OFFSET 0
static int tsdbLoadBlockDataImpl(SReadHandle *pReadH, SBlock *pBlock, SDataCols *pDataCols);
static int tsdbLoadBlockDataColsImpl(SReadHandle *pReadH, SBlock *pBlock, SDataCols *pDataCols, int16_t *colIds,
int numOfColIds);
static int tsdbDecodeBlockIdxArray(SReadHandle *pReadH);
static int tsdbVerifyBlockInfo(SBlockInfo *pBlockInfo, SBlockIdx *pBlockIdx);
static int tsdbCheckAndDecodeColumnData(SDataCol *pDataCol, void *content, int32_t len, int8_t comp, int numOfRows,
int maxPoints, char *buffer, int bsize);
static int tsdbLoadColData(SReadHandle *pReadH, SFile *pFile, SBlock *pBlock, SBlockCol *pBlockCol, SDataCol *pDataCol);
SReadHandle *tsdbNewReadHandle(STsdbRepo *pRepo) { SReadHandle *tsdbNewReadHandle(STsdbRepo *pRepo) {
SReadHandle *pReadH = (SReadHandle *)calloc(1, sizeof(*pReadH)); SReadHandle *pReadH = (SReadHandle *)calloc(1, sizeof(*pReadH));
if (pReadH == NULL) { if (pReadH == NULL) {
...@@ -35,6 +45,7 @@ SReadHandle *tsdbNewReadHandle(STsdbRepo *pRepo) { ...@@ -35,6 +45,7 @@ SReadHandle *tsdbNewReadHandle(STsdbRepo *pRepo) {
STsdbMeta *pMeta = pRepo->tsdbMeta; STsdbMeta *pMeta = pRepo->tsdbMeta;
STsdbCfg * pCfg = &(pRepo->config); STsdbCfg * pCfg = &(pRepo->config);
// TODO: make the memory allocation on demand
if ((pReadH->pDataCols[0] = tdNewDataCols(pMeta->maxRowBytes, pMeta->maxCols, pCfg->maxRowsPerFileBlock)) == NULL || if ((pReadH->pDataCols[0] = tdNewDataCols(pMeta->maxRowBytes, pMeta->maxCols, pCfg->maxRowsPerFileBlock)) == NULL ||
(pReadH->pDataCols[1] = tdNewDataCols(pMeta->maxRowBytes, pMeta->maxCols, pCfg->maxRowsPerFileBlock)) == NULL) { (pReadH->pDataCols[1] = tdNewDataCols(pMeta->maxRowBytes, pMeta->maxCols, pCfg->maxRowsPerFileBlock)) == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
...@@ -61,6 +72,8 @@ void tsdbFreeReadHandle(SReadHandle *pReadH) { ...@@ -61,6 +72,8 @@ void tsdbFreeReadHandle(SReadHandle *pReadH) {
} }
int tsdbSetAndOpenReadFGroup(SReadHandle *pReadH, SFileGroup *pFGroup) { int tsdbSetAndOpenReadFGroup(SReadHandle *pReadH, SFileGroup *pFGroup) {
ASSERT(pReadH != NULL && pFGroup != NULL);
STsdbRepo *pRepo = pReadH->pRepo; STsdbRepo *pRepo = pReadH->pRepo;
STsdbCfg * pCfg = &(pRepo->config); STsdbCfg * pCfg = &(pRepo->config);
...@@ -69,7 +82,7 @@ int tsdbSetAndOpenReadFGroup(SReadHandle *pReadH, SFileGroup *pFGroup) { ...@@ -69,7 +82,7 @@ int tsdbSetAndOpenReadFGroup(SReadHandle *pReadH, SFileGroup *pFGroup) {
tsdbResetFGroupFd(&(pReadH->fGroup)); tsdbResetFGroupFd(&(pReadH->fGroup));
for (int type = 0; type < TSDB_FILE_TYPE_MAX; type++) { for (int type = 0; type < TSDB_FILE_TYPE_MAX; type++) {
SFile *pFile = TSDB_READ_FILE(pReadH, type); SFile *pFile = TSDB_FILE_IN_FGROUP(&(pReadH->fGroup), type);
if (pFile->fname[0] != '\0') { // pFile->fname[0] == '\0' is for commit usage if (pFile->fname[0] != '\0') { // pFile->fname[0] == '\0' is for commit usage
pFile->fd = open(pFile->fname, O_RDONLY); pFile->fd = open(pFile->fname, O_RDONLY);
...@@ -89,7 +102,7 @@ int tsdbSetAndOpenReadFGroup(SReadHandle *pReadH, SFileGroup *pFGroup) { ...@@ -89,7 +102,7 @@ int tsdbSetAndOpenReadFGroup(SReadHandle *pReadH, SFileGroup *pFGroup) {
void tsdbCloseAndUnsetReadFile(SReadHandle *pReadH) { void tsdbCloseAndUnsetReadFile(SReadHandle *pReadH) {
for (int type = 0; type < TSDB_FILE_TYPE_MAX; type++) { for (int type = 0; type < TSDB_FILE_TYPE_MAX; type++) {
SFile *pFile = TSDB_READ_FILE(pReadH, type); SFile *pFile = TSDB_FILE_IN_FGROUP(&(pReadH->fGroup), type);
if (pFile->fd >= 0) { if (pFile->fd >= 0) {
(void)close(pFile->fd); (void)close(pFile->fd);
...@@ -99,15 +112,19 @@ void tsdbCloseAndUnsetReadFile(SReadHandle *pReadH) { ...@@ -99,15 +112,19 @@ void tsdbCloseAndUnsetReadFile(SReadHandle *pReadH) {
} }
int tsdbLoadBlockIdx(SReadHandle *pReadH) { int tsdbLoadBlockIdx(SReadHandle *pReadH) {
ASSERT(pReadH != NULL);
STsdbRepo *pRepo = pReadH->pRepo; STsdbRepo *pRepo = pReadH->pRepo;
SFile * pFile = TSDB_READ_FILE(pReadH, TSDB_FILE_TYPE_HEAD); SFile * pFile = TSDB_FILE_IN_FGROUP(&(pReadH->fGroup), TSDB_FILE_TYPE_HEAD);
if (pFile->fd < 0 || pFile->info.len == 0) { if (pFile->fd < 0 || pFile->info.len == 0) { // for backward compatibility
pReadH->nBlockIdx = 0; pReadH->nBlockIdx = 0;
pReadH->pCurBlockIdx = NULL; pReadH->pCurBlockIdx = NULL;
return 0; return 0;
} }
ASSERT(pFile->info.size == pFile->info.offset + pFile->info.len);
if (tsdbAllocBuf(&(pReadH->pBuf), pFile->info.len) < 0) { if (tsdbAllocBuf(&(pReadH->pBuf), pFile->info.len) < 0) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
return -1; return -1;
...@@ -129,15 +146,15 @@ int tsdbLoadBlockIdx(SReadHandle *pReadH) { ...@@ -129,15 +146,15 @@ int tsdbLoadBlockIdx(SReadHandle *pReadH) {
} }
if (ret < pFile->info.len || !taosCheckChecksumWhole((uint8_t *)(pReadH->pBuf), pFile->info.len)) { if (ret < pFile->info.len || !taosCheckChecksumWhole((uint8_t *)(pReadH->pBuf), pFile->info.len)) {
tsdbError("vgId:%d block idx part is corrupted in file %s, offset %u len %u", REPO_ID(pRepo), pFile->fname, tsdbError("vgId:%d block idx part is corrupted in file %s, offset %u len %u file size %" PRIu64, REPO_ID(pRepo),
pFile->info.offset, pFile->info.len); pFile->fname, pFile->info.offset, pFile->info.len, pFile->info.size);
terrno = TSDB_CODE_TDB_FILE_CORRUPTED; terrno = TSDB_CODE_TDB_FILE_CORRUPTED;
return -1; return -1;
} }
if (tsdbDecodeBlockIdxArray(pReadH) < 0) { if (tsdbDecodeBlockIdxArray(pReadH) < 0) {
tsdbError("vgId:%d error occurs while decoding block idx part from file %s", REPO_ID(pRepo), pFile->fname); tsdbError("vgId:%d error occurs while decoding block idx part from file %s since %s", REPO_ID(pRepo), pFile->fname,
terrno = TSDB_CODE_TDB_FILE_CORRUPTED; tstrerror(terrno));
return -1; return -1;
} }
...@@ -147,7 +164,7 @@ int tsdbLoadBlockIdx(SReadHandle *pReadH) { ...@@ -147,7 +164,7 @@ int tsdbLoadBlockIdx(SReadHandle *pReadH) {
} }
int tsdbSetReadTable(SReadHandle *pReadH, STable *pTable) { int tsdbSetReadTable(SReadHandle *pReadH, STable *pTable) {
ASSERT(pTable != NULL); ASSERT(pReadH != NULL && pTable != NULL);
pReadH->pTable = pTable; pReadH->pTable = pTable;
...@@ -162,6 +179,7 @@ int tsdbSetReadTable(SReadHandle *pReadH, STable *pTable) { ...@@ -162,6 +179,7 @@ int tsdbSetReadTable(SReadHandle *pReadH, STable *pTable) {
if (pReadH->nBlockIdx > 0) { if (pReadH->nBlockIdx > 0) {
ASSERT(pReadH->cBlockIdx <= pReadH->nBlockIdx); ASSERT(pReadH->cBlockIdx <= pReadH->nBlockIdx);
// linear search TABLE_TID(pTable)
while (true) { while (true) {
if (pReadH->cBlockIdx >= pReadH->nBlockIdx) { if (pReadH->cBlockIdx >= pReadH->nBlockIdx) {
pReadH->pCurBlockIdx = NULL; pReadH->pCurBlockIdx = NULL;
...@@ -192,15 +210,17 @@ int tsdbSetReadTable(SReadHandle *pReadH, STable *pTable) { ...@@ -192,15 +210,17 @@ int tsdbSetReadTable(SReadHandle *pReadH, STable *pTable) {
} }
int tsdbLoadBlockInfo(SReadHandle *pReadH) { int tsdbLoadBlockInfo(SReadHandle *pReadH) {
ASSERT(pReadH != NULL);
if (pReadH->pCurBlockIdx == NULL) return 0; if (pReadH->pCurBlockIdx == NULL) return 0;
SFile * pFile = TSDB_READ_FILE(pReadH, TSDB_FILE_TYPE_HEAD); SFile * pFile = TSDB_FILE_IN_FGROUP(&(pReadH->fGroup), TSDB_FILE_TYPE_HEAD);
SBlockIdx *pBlockIdx = pReadH->pCurBlockIdx; SBlockIdx *pBlockIdx = pReadH->pCurBlockIdx;
STsdbRepo *pRepo = pReadH->pRepo; STsdbRepo *pRepo = pReadH->pRepo;
ASSERT(pFile->fd > 0); ASSERT(pFile->fd > 0 && pBlockIdx->len > 0);
if (tsdbAllocBuf(&((void *)pReadH->pBlockInfo), pBlockIdx->len) < 0) { if (tsdbAllocBuf(&((void *)(pReadH->pBlockInfo)), pBlockIdx->len) < 0) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
return -1; return -1;
} }
...@@ -221,7 +241,7 @@ int tsdbLoadBlockInfo(SReadHandle *pReadH) { ...@@ -221,7 +241,7 @@ int tsdbLoadBlockInfo(SReadHandle *pReadH) {
} }
if (ret < pBlockIdx->len || tsdbVerifyBlockInfo(pReadH->pBlockInfo, pBlockIdx) < 0) { if (ret < pBlockIdx->len || tsdbVerifyBlockInfo(pReadH->pBlockInfo, pBlockIdx) < 0) {
tsdbError("vgId:%d table %s block info part is corrupted from file %s", REPO_ID(pRepo), tsdbError("vgId:%d table %s block info part is corrupted in file %s", REPO_ID(pRepo),
TABLE_CHAR_NAME(pReadH->pTable), pFile->fname); TABLE_CHAR_NAME(pReadH->pTable), pFile->fname);
terrno = TSDB_CODE_TDB_FILE_CORRUPTED; terrno = TSDB_CODE_TDB_FILE_CORRUPTED;
return -1; return -1;
...@@ -270,7 +290,7 @@ int tsdbLoadBlockDataCols(SReadHandle *pReadH, SBlock *pBlock, SBlockInfo *pBloc ...@@ -270,7 +290,7 @@ int tsdbLoadBlockDataCols(SReadHandle *pReadH, SBlock *pBlock, SBlockInfo *pBloc
for (int i = 1; i < nSubBlock; i++) { for (int i = 1; i < nSubBlock; i++) {
pSubBlock++; pSubBlock++;
if (tsdbLoadBlockDataColsImpl(pReadH, pSubBlock, pReadH->pDataCols[1], colIds, numOfCols) < 0) return -1; if (tsdbLoadBlockDataColsImpl(pReadH, pSubBlock, pReadH->pDataCols[1], colIds, numOfCols) < 0) return -1;
if (tdMergeDataCols(pReadH->pDataCols[0], pReadH->pDataCols[1], pReadH->pDataCols[1]->numOfRows) < 0) goto _err; if (tdMergeDataCols(pReadH->pDataCols[0], pReadH->pDataCols[1], pReadH->pDataCols[1]->numOfRows) < 0) return -1;
} }
ASSERT(pReadH->pDataCols[0]->numOfRows == pBlock->numOfRows); ASSERT(pReadH->pDataCols[0]->numOfRows == pBlock->numOfRows);
...@@ -284,8 +304,12 @@ int tsdbLoadBlockDataInfo(SReadHandle *pReadH, SBlock *pBlock) { ...@@ -284,8 +304,12 @@ int tsdbLoadBlockDataInfo(SReadHandle *pReadH, SBlock *pBlock) {
ASSERT(pBlock->numOfSubBlocks <= 1); ASSERT(pBlock->numOfSubBlocks <= 1);
STsdbRepo *pRepo = pReadH->pRepo; STsdbRepo *pRepo = pReadH->pRepo;
SFile * pFile = SFile * pFile = NULL;
(pBlock->last) ? TSDB_READ_FILE(pReadH, TSDB_FILE_TYPE_LAST) : TSDB_READ_FILE(pReadH, TSDB_FILE_TYPE_DATA); if (pBlock->last) {
pFile = TSDB_FILE_IN_FGROUP(&(pReadH->fGroup), TSDB_FILE_TYPE_LAST);
} else {
pFile = TSDB_FILE_IN_FGROUP(&(pReadH->fGroup), TSDB_FILE_TYPE_DATA);
}
if (lseek(pFile->fd, pBlock->offset, SEEK_SET) < 0) { if (lseek(pFile->fd, pBlock->offset, SEEK_SET) < 0) {
tsdbError("vgId:%d failed to lseek file %s to offset %" PRId64 " since %s", REPO_ID(pRepo), pFile->fname, tsdbError("vgId:%d failed to lseek file %s to offset %" PRId64 " since %s", REPO_ID(pRepo), pFile->fname,
...@@ -316,6 +340,9 @@ int tsdbLoadBlockDataInfo(SReadHandle *pReadH, SBlock *pBlock) { ...@@ -316,6 +340,9 @@ int tsdbLoadBlockDataInfo(SReadHandle *pReadH, SBlock *pBlock) {
} }
ASSERT(pReadH->pBlockData->numOfCols == pBlock->numOfCols); ASSERT(pReadH->pBlockData->numOfCols == pBlock->numOfCols);
ASSERT(pReadH->pBlockData->delimiter == TSDB_FILE_DELIMITER);
ASSERT(pReadH->pBlockData->numOfCols == pBlock->numOfCols);
return 0; return 0;
} }
...@@ -323,8 +350,12 @@ static int tsdbLoadBlockDataImpl(SReadHandle *pReadH, SBlock *pBlock, SDataCols ...@@ -323,8 +350,12 @@ static int tsdbLoadBlockDataImpl(SReadHandle *pReadH, SBlock *pBlock, SDataCols
ASSERT(pBlock->numOfSubBlocks <= 1); ASSERT(pBlock->numOfSubBlocks <= 1);
STsdbRepo *pRepo = pReadH->pRepo; STsdbRepo *pRepo = pReadH->pRepo;
SFile *pFile = SFile * pFile = NULL;
(pBlock->last) ? TSDB_READ_FILE(pReadH, TSDB_FILE_TYPE_LAST) : TSDB_READ_FILE(pReadH, TSDB_FILE_TYPE_DATA); if (pBlock->last) {
pFile = TSDB_FILE_IN_FGROUP(&(pReadH->fGroup), TSDB_FILE_TYPE_LAST);
} else {
pFile = TSDB_FILE_IN_FGROUP(&(pReadH->fGroup), TSDB_FILE_TYPE_DATA);
}
if (tsdbAllocBuf(&(pReadH->pBuf), pBlock->len) < 0) { if (tsdbAllocBuf(&(pReadH->pBuf), pBlock->len) < 0) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
...@@ -347,8 +378,9 @@ static int tsdbLoadBlockDataImpl(SReadHandle *pReadH, SBlock *pBlock, SDataCols ...@@ -347,8 +378,9 @@ static int tsdbLoadBlockDataImpl(SReadHandle *pReadH, SBlock *pBlock, SDataCols
} }
int tsize = TSDB_BLOCK_DATA_LEN(pBlock->numOfCols); int tsize = TSDB_BLOCK_DATA_LEN(pBlock->numOfCols);
if (ret < pBlock->len || !taosCheckChecksumWhole((uint8_t *)pReadH->pBuf, tsize)) { if (ret < pBlock->len || !taosCheckChecksumWhole((uint8_t *)(pReadH->pBuf), tsize)) {
tsdbError("vgId:%d block data part from file %s is corrupted", REPO_ID(pRepo), pFile->fname); tsdbError("vgId:%d block data part from file %s at offset %" PRId64 " len %d is corrupted", REPO_ID(pRepo),
pFile->fname, pBlock->offset, pBlock->len);
terrno = TSDB_CODE_TDB_FILE_CORRUPTED; terrno = TSDB_CODE_TDB_FILE_CORRUPTED;
return -1; return -1;
} }
...@@ -362,8 +394,8 @@ static int tsdbLoadBlockDataImpl(SReadHandle *pReadH, SBlock *pBlock, SDataCols ...@@ -362,8 +394,8 @@ static int tsdbLoadBlockDataImpl(SReadHandle *pReadH, SBlock *pBlock, SDataCols
ASSERT(pBlock->numOfRows <= pDataCols->maxPoints); ASSERT(pBlock->numOfRows <= pDataCols->maxPoints);
pDataCols->numOfRows = pBlock->numOfRows; pDataCols->numOfRows = pBlock->numOfRows;
int ccol = 0; int ccol = 0; // loop iter over SBlockCols
int dcol = 0; int dcol = 0; // loop iter over pDataCols
while (dcol < pDataCols->numOfCols) { while (dcol < pDataCols->numOfCols) {
SDataCol *pDataCol = &(pDataCols->cols[dcol]); SDataCol *pDataCol = &(pDataCols->cols[dcol]);
if (dcol != 0 && ccol >= pBlockData->numOfCols) { if (dcol != 0 && ccol >= pBlockData->numOfCols) {
...@@ -422,24 +454,32 @@ static int tsdbLoadBlockDataColsImpl(SReadHandle *pReadH, SBlock *pBlock, SDataC ...@@ -422,24 +454,32 @@ static int tsdbLoadBlockDataColsImpl(SReadHandle *pReadH, SBlock *pBlock, SDataC
int numOfColIds) { int numOfColIds) {
ASSERT(pBlock->numOfSubBlocks <= 1); ASSERT(pBlock->numOfSubBlocks <= 1);
ASSERT(colIds[0] == 0); ASSERT(colIds[0] == 0);
ASSERT(pBlock->numOfRows <= pDataCols->maxPoints);
SFile *pFile = SFile *pFile = NULL;
pBlock->last ? TSDB_READ_FILE(pReadH, TSDB_FILE_TYPE_LAST) : TSDB_READ_FILE(pReadH, TSDB_FILE_TYPE_DATA); if (pBlock->last) {
pFile = TSDB_FILE_IN_FGROUP(&(pReadH->fGroup), TSDB_FILE_TYPE_LAST);
} else {
pFile = TSDB_FILE_IN_FGROUP(&(pReadH->fGroup), TSDB_FILE_TYPE_DATA);
}
SBlockCol blockCol = {0}; SBlockCol blockCol = {0};
// If only load timestamp column, no need to load SBlockData part // if only load the key timestamp column, no need to load SBlockData part
if (numOfColIds > 1 && tsdbLoadBlockDataInfo(pReadH, pBlock) < 0) return -1; if (numOfColIds > 1) {
if (tsdbLoadBlockDataInfo(pReadH, pBlock) < 0) return -1;
}
tdResetDataCols(pDataCols); tdResetDataCols(pDataCols);
pDataCols->numOfRows = pBlock->numOfRows; pDataCols->numOfRows = pBlock->numOfRows;
int dcol = 0; int dcol = 0; // loop iter over pDataCols
int ccol = 0; int ccol = 0; // loop iter over SBlockCol
for (int i = 0; i < numOfColIds; i++) { for (int i = 0; i < numOfColIds; i++) {
int16_t colId = colIds[i]; int16_t colId = colIds[i];
SDataCol * pDataCol = NULL; SDataCol * pDataCol = NULL;
SBlockCol *pBlockCol = NULL; SBlockCol *pBlockCol = NULL;
// linear search over pDataCols of colId
while (true) { while (true) {
if (dcol >= pDataCols->numOfCols) { if (dcol >= pDataCols->numOfCols) {
pDataCol = NULL; pDataCol = NULL;
...@@ -458,6 +498,7 @@ static int tsdbLoadBlockDataColsImpl(SReadHandle *pReadH, SBlock *pBlock, SDataC ...@@ -458,6 +498,7 @@ static int tsdbLoadBlockDataColsImpl(SReadHandle *pReadH, SBlock *pBlock, SDataC
if (pDataCol == NULL) continue; if (pDataCol == NULL) continue;
ASSERT(pDataCol->colId == colId); ASSERT(pDataCol->colId == colId);
// linear search over SBlockCols
if (colId == 0) { // load the key row if (colId == 0) { // load the key row
blockCol.colId = colId; blockCol.colId = colId;
blockCol.len = pBlock->keyLen; blockCol.len = pBlock->keyLen;
...@@ -497,7 +538,7 @@ static int tsdbLoadBlockDataColsImpl(SReadHandle *pReadH, SBlock *pBlock, SDataC ...@@ -497,7 +538,7 @@ static int tsdbLoadBlockDataColsImpl(SReadHandle *pReadH, SBlock *pBlock, SDataC
static int tsdbDecodeBlockIdxArray(SReadHandle *pReadH) { static int tsdbDecodeBlockIdxArray(SReadHandle *pReadH) {
void * pBuf = pReadH->pBuf; void * pBuf = pReadH->pBuf;
SFile *pFile = TSDB_READ_FILE(pReadH, TSDB_FILE_TYPE_HEAD); SFile *pFile = TSDB_FILE_IN_FGROUP(&(pReadH->fGroup), TSDB_FILE_TYPE_HEAD);
pReadH->nBlockIdx = 0; pReadH->nBlockIdx = 0;
while (POINTER_DISTANCE(pBuf, pReadH->pBuf) < (int)(pFile->info.len - sizeof(TSCKSUM))) { while (POINTER_DISTANCE(pBuf, pReadH->pBuf) < (int)(pFile->info.len - sizeof(TSCKSUM))) {
...@@ -508,7 +549,8 @@ static int tsdbDecodeBlockIdxArray(SReadHandle *pReadH) { ...@@ -508,7 +549,8 @@ static int tsdbDecodeBlockIdxArray(SReadHandle *pReadH) {
pBuf = tsdbDecodeBlockIdx(pBuf, &(pReadH->pBlockIdx[pReadH->nBlockIdx])); pBuf = tsdbDecodeBlockIdx(pBuf, &(pReadH->pBlockIdx[pReadH->nBlockIdx]));
if (pBuf == NULL) { if (pBuf == NULL) {
tsdbError("vgId:%d failed to decode block idx part from file %s", REPO_ID(pRepo), pFile->fname); tsdbError("vgId:%d failed to decode block idx part from file %s at idx %d", REPO_ID(pRepo), pFile->fname,
pReadH->nBlockIdx);
terrno = TSDB_CODE_TDB_FILE_CORRUPTED; terrno = TSDB_CODE_TDB_FILE_CORRUPTED;
return -1; return -1;
} }
...@@ -516,21 +558,9 @@ static int tsdbDecodeBlockIdxArray(SReadHandle *pReadH) { ...@@ -516,21 +558,9 @@ static int tsdbDecodeBlockIdxArray(SReadHandle *pReadH) {
pReadH->nBlockIdx++; pReadH->nBlockIdx++;
ASSERT(pReadH->nBlockIdx == 1 || (pReadH->pBlockIdx[pReadH->nBlockIdx-1].tid < (pReadH->pBlockIdx[pReadH->nBlockIdx-2].tid)); ASSERT(pReadH->nBlockIdx == 1 || (pReadH->pBlockIdx[pReadH->nBlockIdx-1].tid < (pReadH->pBlockIdx[pReadH->nBlockIdx-2].tid));
} }
return 0;
}
int tsdbAllocBuf(void **ppBuf, int size) {
void *pBuf = *pBuf;
int tsize = taosTSizeof(pBuf);
if (tsize == 0) tsize = 1024;
while (tsize < size) { ASSERT(pReadH->nBlockIdx > 0);
tsize *= 2; return 0;
}
*ppBuf = taosTRealloc(pBuf, tsize);
if (*ppBuf == NULL) return -1;
} }
static int tsdbVerifyBlockInfo(SBlockInfo *pBlockInfo, SBlockIdx *pBlockIdx) { static int tsdbVerifyBlockInfo(SBlockInfo *pBlockInfo, SBlockIdx *pBlockIdx) {
...@@ -561,21 +591,18 @@ static int tsdbCheckAndDecodeColumnData(SDataCol *pDataCol, void *content, int32 ...@@ -561,21 +591,18 @@ static int tsdbCheckAndDecodeColumnData(SDataCol *pDataCol, void *content, int32
return -1; return -1;
} }
pDataCol->len = tlen; pDataCol->len = tlen;
if (pDataCol->type == TSDB_DATA_TYPE_BINARY || pDataCol->type == TSDB_DATA_TYPE_NCHAR) {
dataColSetOffset(pDataCol, numOfRows);
} else {
ASSERT(pDataCol->len == pDataCol->bytes * numOfRows);
}
} else { } else {
// No need to decompress, just memcpy it // No need to decompress, just memcpy it
pDataCol->len = len - sizeof(TSCKSUM); pDataCol->len = len - sizeof(TSCKSUM);
memcpy(pDataCol->pData, content, pDataCol->len); memcpy(pDataCol->pData, content, pDataCol->len);
}
if (pDataCol->type == TSDB_DATA_TYPE_BINARY || pDataCol->type == TSDB_DATA_TYPE_NCHAR) { if (pDataCol->type == TSDB_DATA_TYPE_BINARY || pDataCol->type == TSDB_DATA_TYPE_NCHAR) {
dataColSetOffset(pDataCol, numOfRows); dataColSetOffset(pDataCol, numOfRows);
} else { } else {
ASSERT(pDataCol->len == pDataCol->bytes * numOfRows); ASSERT(pDataCol->len == pDataCol->bytes * numOfRows);
} }
}
return 0; return 0;
} }
...@@ -591,9 +618,10 @@ static int tsdbLoadColData(SReadHandle *pReadH, SFile *pFile, SBlock *pBlock, SB ...@@ -591,9 +618,10 @@ static int tsdbLoadColData(SReadHandle *pReadH, SFile *pFile, SBlock *pBlock, SB
return -1; return -1;
} }
int64_t offset = pBlock->offset + TSDB_GET_COMPCOL_LEN(pBlock->numOfCols) + pBlockCol->offset; int64_t offset = pBlock->offset + TSDB_BLOCK_DATA_LEN(pBlock->numOfCols) + pBlockCol->offset;
if (lseek(pFile->fd, (off_t)offset, SEEK_SET) < 0) { if (lseek(pFile->fd, (off_t)offset, SEEK_SET) < 0) {
tsdbError("vgId:%d failed to lseek file %s since %s", REPO_ID(pRepo), pFile->fname, strerror(errno)); tsdbError("vgId:%d failed to lseek file %s to offset %" PRId64 " since %s", REPO_ID(pRepo), pFile->fname, offset,
strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
return -1; return -1;
} }
......
...@@ -100,14 +100,32 @@ void *tsdbDecodeBlockIdx(void *buf, SBlockIdx *pBlockIdx) { ...@@ -100,14 +100,32 @@ void *tsdbDecodeBlockIdx(void *buf, SBlockIdx *pBlockIdx) {
pBlockIdx->hasLast = hasLast; pBlockIdx->hasLast = hasLast;
pBlockIdx->numOfBlocks = numOfBlocks; pBlockIdx->numOfBlocks = numOfBlocks;
pBlockIdx->uid = value; pBlockIdx->uid = uid;
pBlockIdx->maxKey = (TSKEY)maxKey; pBlockIdx->maxKey = (TSKEY)maxKey;
return buf; return buf;
} }
// TODO: make it static FORCE_INLINE
void tsdbResetFGroupFd(SFileGroup *pFGroup) { void tsdbResetFGroupFd(SFileGroup *pFGroup) {
for (int type = 0; type < TSDB_FILE_TYPE_MAX; type++) { for (int type = 0; type < TSDB_FILE_TYPE_MAX; type++) {
pFGroup->files[type].fd = -1; TSDB_FILE_IN_FGROUP(pFGroup, type)->fd = -1;
} }
} }
int tsdbAllocBuf(void **ppBuf, uint32_t size) {
ASSERT(size > 0);
void *pBuf = *pBuf;
uint32_t tsize = taosTSizeof(pBuf);
if (tsize >= size) return 0;
if (tsize == 0) tsize = 1024;
while (tsize < size) {
tsize *= 2;
}
*ppBuf = taosTRealloc(pBuf, tsize);
if (*ppBuf == NULL) return -1;
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册