diff --git a/src/tsdb/inc/tsdbMain.h b/src/tsdb/inc/tsdbMain.h
index 7d3f1c530d84853a9c0abea9ab4641f1bfc5723d..da58b9e83d5b9c1f648f3b113a52dd6534b4dfb2 100644
--- a/src/tsdb/inc/tsdbMain.h
+++ b/src/tsdb/inc/tsdbMain.h
@@ -272,48 +272,6 @@ typedef struct {
SBlockCol cols[];
} SBlockData;
-// typedef enum { TSDB_WRITE_HELPER, TSDB_READ_HELPER } tsdb_rw_helper_t;
-
-// typedef struct {
-// TSKEY minKey;
-// TSKEY maxKey;
-// SFileGroup fGroup;
-// SFile nHeadF;
-// SFile nLastF;
-// } SHelperFile;
-
-// typedef struct {
-// uint64_t uid;
-// int32_t tid;
-// } SHelperTable;
-
-// typedef struct {
-// SBlockIdx* pIdxArray;
-// int numOfIdx;
-// int curIdx;
-// } SIdxH;
-
-// typedef struct {
-// tsdb_rw_helper_t type;
-
-// STsdbRepo* pRepo;
-// int8_t state;
-// // For file set usage
-// SHelperFile files;
-// SIdxH idxH;
-// SBlockIdx curCompIdx;
-// void* pWIdx;
-// // For table set usage
-// SHelperTable tableInfo;
-// SBlockInfo* pCompInfo;
-// bool hasOldLastBlock;
-// // For block set usage
-// SBlockData* pCompData;
-// SDataCols* pDataCols[2];
-// void* pBuffer; // Buffer to hold the whole data block
-// void* compBuffer; // Buffer for temperary compress/decompress purpose
-// } SRWHelper;
-
// ------------------ tsdbScan.c
typedef struct {
SFileGroup fGroup;
@@ -342,7 +300,6 @@ typedef struct {
void* pCBuf;
} SReadHandle;
-#define TSDB_READ_FILE(pReadH, type) (&((pReadH)->fGroup.files[(type)]))
#define TSDB_BLOCK_DATA_LEN(nCols) (sizeof(SBlockData) + sizeof(SBlockCol) * (nCols) + sizeof(TSCKSUM))
// Operations
@@ -506,62 +463,6 @@ int tsdbLoadFileHeader(SFile* pFile, uint32_t* version);
void tsdbGetFileInfoImpl(char* fname, uint32_t* magic, int64_t* size);
void tsdbGetFidKeyRange(int daysPerFile, int8_t precision, int fileId, TSKEY *minKey, TSKEY *maxKey);
-// ------------------ tsdbRWHelper.c
-// #define TSDB_HELPER_CLEAR_STATE 0x0 // Clear state
-// #define TSDB_HELPER_FILE_SET_AND_OPEN 0x1 // File is set
-// #define TSDB_HELPER_IDX_LOAD 0x2 // SCompIdx part is loaded
-// #define TSDB_HELPER_TABLE_SET 0x4 // Table is set
-// #define TSDB_HELPER_INFO_LOAD 0x8 // SCompInfo part is loaded
-// #define TSDB_HELPER_FILE_DATA_LOAD 0x10 // SCompData part is loaded
-// #define helperSetState(h, s) (((h)->state) |= (s))
-// #define helperClearState(h, s) ((h)->state &= (~(s)))
-// #define helperHasState(h, s) ((((h)->state) & (s)) == (s))
-// #define blockAtIdx(h, idx) ((h)->pCompInfo->blocks + idx)
-// #define TSDB_MAX_SUBBLOCKS 8
-// #define IS_SUB_BLOCK(pBlock) ((pBlock)->numOfSubBlocks == 0)
-// #define helperType(h) (h)->type
-// #define helperRepo(h) (h)->pRepo
-// #define helperState(h) (h)->state
-// #define TSDB_NLAST_FILE_OPENED(h) ((h)->files.nLastF.fd > 0)
-// #define helperFileId(h) ((h)->files.fGroup.fileId)
-// #define helperHeadF(h) (&((h)->files.fGroup.files[TSDB_FILE_TYPE_HEAD]))
-// #define helperDataF(h) (&((h)->files.fGroup.files[TSDB_FILE_TYPE_DATA]))
-// #define helperLastF(h) (&((h)->files.fGroup.files[TSDB_FILE_TYPE_LAST]))
-// #define helperNewHeadF(h) (&((h)->files.nHeadF))
-// #define helperNewLastF(h) (&((h)->files.nLastF))
-
-// int tsdbInitReadHelper(SRWHelper* pHelper, STsdbRepo* pRepo);
-// int tsdbInitWriteHelper(SRWHelper* pHelper, STsdbRepo* pRepo);
-// void tsdbDestroyHelper(SRWHelper* pHelper);
-// void tsdbResetHelper(SRWHelper* pHelper);
-// int tsdbSetAndOpenHelperFile(SRWHelper* pHelper, SFileGroup* pGroup);
-// int tsdbCloseHelperFile(SRWHelper* pHelper, bool hasError, SFileGroup* pGroup);
-// int tsdbSetHelperTable(SRWHelper* pHelper, STable* pTable, STsdbRepo* pRepo);
-// int tsdbCommitTableData(SRWHelper* pHelper, SCommitIter* pCommitIter, SDataCols* pDataCols, TSKEY maxKey);
-// int tsdbMoveLastBlockIfNeccessary(SRWHelper* pHelper);
-// int tsdbWriteCompInfo(SRWHelper* pHelper);
-// int tsdbWriteCompIdx(SRWHelper* pHelper);
-// int tsdbLoadCompIdxImpl(SFile* pFile, uint32_t offset, uint32_t len, void* buffer);
-// int tsdbDecodeSCompIdxImpl(void* buffer, uint32_t len, SBlockIdx** ppCompIdx, int* numOfIdx);
-// int tsdbLoadCompIdx(SRWHelper* pHelper, void* target);
-// int tsdbLoadCompInfoImpl(SFile* pFile, SBlockIdx* pIdx, SBlockInfo** ppCompInfo);
-// int tsdbLoadCompInfo(SRWHelper* pHelper, void* target);
-// int tsdbLoadCompData(SRWHelper* phelper, SBlock* pcompblock, void* target);
-// void tsdbGetDataStatis(SRWHelper* pHelper, SDataStatis* pStatis, int numOfCols);
-// int tsdbLoadBlockDataCols(SRWHelper* pHelper, SBlock* pCompBlock, SBlockInfo* pCompInfo, int16_t* colIds,
-// int numOfColIds);
-// int tsdbLoadBlockData(SRWHelper* pHelper, SBlock* pCompBlock, SBlockInfo* pCompInfo);
-
-// static FORCE_INLINE int compTSKEY(const void* key1, const void* key2) {
-// if (*(TSKEY*)key1 > *(TSKEY*)key2) {
-// return 1;
-// } else if (*(TSKEY*)key1 == *(TSKEY*)key2) {
-// return 0;
-// } else {
-// return -1;
-// }
-// }
-
// ------------------ tsdbMain.c
#define REPO_ID(r) (r)->config.tsdbId
#define IS_REPO_LOCKED(r) (r)->repoLocked
@@ -609,7 +510,7 @@ int tsdbLoadBlockDataInfo(SReadHandle* pReadH, SBlock* pBlock);
#define TSDB_FILE_IN_FGROUP(pGroup, type) (&((pGroup)->files[(type)]))
-int tsdbAllocBuf(void** ppBuf, int size);
+int tsdbAllocBuf(void **ppBuf, uint32_t size);
#ifdef __cplusplus
}
diff --git a/src/tsdb/src/tsdbReadUtil.c b/src/tsdb/src/tsdbReadUtil.c
index e8f88ff0bef1183608cd19abccec1e49785606fe..0f2c20fcbfa57acae6d45078942567a0e1f273dd 100644
--- a/src/tsdb/src/tsdbReadUtil.c
+++ b/src/tsdb/src/tsdbReadUtil.c
@@ -12,6 +12,7 @@
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see .
*/
+#include
#include
#include
#include
@@ -23,6 +24,15 @@
#define TSDB_KEY_COL_OFFSET 0
+static int tsdbLoadBlockDataImpl(SReadHandle *pReadH, SBlock *pBlock, SDataCols *pDataCols);
+static int tsdbLoadBlockDataColsImpl(SReadHandle *pReadH, SBlock *pBlock, SDataCols *pDataCols, int16_t *colIds,
+ int numOfColIds);
+static int tsdbDecodeBlockIdxArray(SReadHandle *pReadH);
+static int tsdbVerifyBlockInfo(SBlockInfo *pBlockInfo, SBlockIdx *pBlockIdx);
+static int tsdbCheckAndDecodeColumnData(SDataCol *pDataCol, void *content, int32_t len, int8_t comp, int numOfRows,
+ int maxPoints, char *buffer, int bsize);
+static int tsdbLoadColData(SReadHandle *pReadH, SFile *pFile, SBlock *pBlock, SBlockCol *pBlockCol, SDataCol *pDataCol);
+
SReadHandle *tsdbNewReadHandle(STsdbRepo *pRepo) {
SReadHandle *pReadH = (SReadHandle *)calloc(1, sizeof(*pReadH));
if (pReadH == NULL) {
@@ -35,6 +45,7 @@ SReadHandle *tsdbNewReadHandle(STsdbRepo *pRepo) {
STsdbMeta *pMeta = pRepo->tsdbMeta;
STsdbCfg * pCfg = &(pRepo->config);
+ // TODO: make the memory allocation on demand
if ((pReadH->pDataCols[0] = tdNewDataCols(pMeta->maxRowBytes, pMeta->maxCols, pCfg->maxRowsPerFileBlock)) == NULL ||
(pReadH->pDataCols[1] = tdNewDataCols(pMeta->maxRowBytes, pMeta->maxCols, pCfg->maxRowsPerFileBlock)) == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
@@ -61,6 +72,8 @@ void tsdbFreeReadHandle(SReadHandle *pReadH) {
}
int tsdbSetAndOpenReadFGroup(SReadHandle *pReadH, SFileGroup *pFGroup) {
+ ASSERT(pReadH != NULL && pFGroup != NULL);
+
STsdbRepo *pRepo = pReadH->pRepo;
STsdbCfg * pCfg = &(pRepo->config);
@@ -69,7 +82,7 @@ int tsdbSetAndOpenReadFGroup(SReadHandle *pReadH, SFileGroup *pFGroup) {
tsdbResetFGroupFd(&(pReadH->fGroup));
for (int type = 0; type < TSDB_FILE_TYPE_MAX; type++) {
- SFile *pFile = TSDB_READ_FILE(pReadH, type);
+ SFile *pFile = TSDB_FILE_IN_FGROUP(&(pReadH->fGroup), type);
if (pFile->fname[0] != '\0') { // pFile->fname[0] == '\0' is for commit usage
pFile->fd = open(pFile->fname, O_RDONLY);
@@ -89,7 +102,7 @@ int tsdbSetAndOpenReadFGroup(SReadHandle *pReadH, SFileGroup *pFGroup) {
void tsdbCloseAndUnsetReadFile(SReadHandle *pReadH) {
for (int type = 0; type < TSDB_FILE_TYPE_MAX; type++) {
- SFile *pFile = TSDB_READ_FILE(pReadH, type);
+ SFile *pFile = TSDB_FILE_IN_FGROUP(&(pReadH->fGroup), type);
if (pFile->fd >= 0) {
(void)close(pFile->fd);
@@ -99,15 +112,19 @@ void tsdbCloseAndUnsetReadFile(SReadHandle *pReadH) {
}
int tsdbLoadBlockIdx(SReadHandle *pReadH) {
+ ASSERT(pReadH != NULL);
+
STsdbRepo *pRepo = pReadH->pRepo;
- SFile * pFile = TSDB_READ_FILE(pReadH, TSDB_FILE_TYPE_HEAD);
+ SFile * pFile = TSDB_FILE_IN_FGROUP(&(pReadH->fGroup), TSDB_FILE_TYPE_HEAD);
- if (pFile->fd < 0 || pFile->info.len == 0) {
+ if (pFile->fd < 0 || pFile->info.len == 0) { // for backward compatibility
pReadH->nBlockIdx = 0;
pReadH->pCurBlockIdx = NULL;
return 0;
}
+ ASSERT(pFile->info.size == pFile->info.offset + pFile->info.len);
+
if (tsdbAllocBuf(&(pReadH->pBuf), pFile->info.len) < 0) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
return -1;
@@ -129,15 +146,15 @@ int tsdbLoadBlockIdx(SReadHandle *pReadH) {
}
if (ret < pFile->info.len || !taosCheckChecksumWhole((uint8_t *)(pReadH->pBuf), pFile->info.len)) {
- tsdbError("vgId:%d block idx part is corrupted in file %s, offset %u len %u", REPO_ID(pRepo), pFile->fname,
- pFile->info.offset, pFile->info.len);
+ tsdbError("vgId:%d block idx part is corrupted in file %s, offset %u len %u file size %" PRIu64, REPO_ID(pRepo),
+ pFile->fname, pFile->info.offset, pFile->info.len, pFile->info.size);
terrno = TSDB_CODE_TDB_FILE_CORRUPTED;
return -1;
}
if (tsdbDecodeBlockIdxArray(pReadH) < 0) {
- tsdbError("vgId:%d error occurs while decoding block idx part from file %s", REPO_ID(pRepo), pFile->fname);
- terrno = TSDB_CODE_TDB_FILE_CORRUPTED;
+ tsdbError("vgId:%d error occurs while decoding block idx part from file %s since %s", REPO_ID(pRepo), pFile->fname,
+ tstrerror(terrno));
return -1;
}
@@ -147,7 +164,7 @@ int tsdbLoadBlockIdx(SReadHandle *pReadH) {
}
int tsdbSetReadTable(SReadHandle *pReadH, STable *pTable) {
- ASSERT(pTable != NULL);
+ ASSERT(pReadH != NULL && pTable != NULL);
pReadH->pTable = pTable;
@@ -162,6 +179,7 @@ int tsdbSetReadTable(SReadHandle *pReadH, STable *pTable) {
if (pReadH->nBlockIdx > 0) {
ASSERT(pReadH->cBlockIdx <= pReadH->nBlockIdx);
+ // linear search TABLE_TID(pTable)
while (true) {
if (pReadH->cBlockIdx >= pReadH->nBlockIdx) {
pReadH->pCurBlockIdx = NULL;
@@ -192,15 +210,17 @@ int tsdbSetReadTable(SReadHandle *pReadH, STable *pTable) {
}
int tsdbLoadBlockInfo(SReadHandle *pReadH) {
+ ASSERT(pReadH != NULL);
+
if (pReadH->pCurBlockIdx == NULL) return 0;
- SFile * pFile = TSDB_READ_FILE(pReadH, TSDB_FILE_TYPE_HEAD);
+ SFile * pFile = TSDB_FILE_IN_FGROUP(&(pReadH->fGroup), TSDB_FILE_TYPE_HEAD);
SBlockIdx *pBlockIdx = pReadH->pCurBlockIdx;
STsdbRepo *pRepo = pReadH->pRepo;
- ASSERT(pFile->fd > 0);
+ ASSERT(pFile->fd > 0 && pBlockIdx->len > 0);
- if (tsdbAllocBuf(&((void *)pReadH->pBlockInfo), pBlockIdx->len) < 0) {
+ if (tsdbAllocBuf(&((void *)(pReadH->pBlockInfo)), pBlockIdx->len) < 0) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
return -1;
}
@@ -221,7 +241,7 @@ int tsdbLoadBlockInfo(SReadHandle *pReadH) {
}
if (ret < pBlockIdx->len || tsdbVerifyBlockInfo(pReadH->pBlockInfo, pBlockIdx) < 0) {
- tsdbError("vgId:%d table %s block info part is corrupted from file %s", REPO_ID(pRepo),
+ tsdbError("vgId:%d table %s block info part is corrupted in file %s", REPO_ID(pRepo),
TABLE_CHAR_NAME(pReadH->pTable), pFile->fname);
terrno = TSDB_CODE_TDB_FILE_CORRUPTED;
return -1;
@@ -270,7 +290,7 @@ int tsdbLoadBlockDataCols(SReadHandle *pReadH, SBlock *pBlock, SBlockInfo *pBloc
for (int i = 1; i < nSubBlock; i++) {
pSubBlock++;
if (tsdbLoadBlockDataColsImpl(pReadH, pSubBlock, pReadH->pDataCols[1], colIds, numOfCols) < 0) return -1;
- if (tdMergeDataCols(pReadH->pDataCols[0], pReadH->pDataCols[1], pReadH->pDataCols[1]->numOfRows) < 0) goto _err;
+ if (tdMergeDataCols(pReadH->pDataCols[0], pReadH->pDataCols[1], pReadH->pDataCols[1]->numOfRows) < 0) return -1;
}
ASSERT(pReadH->pDataCols[0]->numOfRows == pBlock->numOfRows);
@@ -284,8 +304,12 @@ int tsdbLoadBlockDataInfo(SReadHandle *pReadH, SBlock *pBlock) {
ASSERT(pBlock->numOfSubBlocks <= 1);
STsdbRepo *pRepo = pReadH->pRepo;
- SFile * pFile =
- (pBlock->last) ? TSDB_READ_FILE(pReadH, TSDB_FILE_TYPE_LAST) : TSDB_READ_FILE(pReadH, TSDB_FILE_TYPE_DATA);
+ SFile * pFile = NULL;
+ if (pBlock->last) {
+ pFile = TSDB_FILE_IN_FGROUP(&(pReadH->fGroup), TSDB_FILE_TYPE_LAST);
+ } else {
+ pFile = TSDB_FILE_IN_FGROUP(&(pReadH->fGroup), TSDB_FILE_TYPE_DATA);
+ }
if (lseek(pFile->fd, pBlock->offset, SEEK_SET) < 0) {
tsdbError("vgId:%d failed to lseek file %s to offset %" PRId64 " since %s", REPO_ID(pRepo), pFile->fname,
@@ -316,6 +340,9 @@ int tsdbLoadBlockDataInfo(SReadHandle *pReadH, SBlock *pBlock) {
}
ASSERT(pReadH->pBlockData->numOfCols == pBlock->numOfCols);
+ ASSERT(pReadH->pBlockData->delimiter == TSDB_FILE_DELIMITER);
+ ASSERT(pReadH->pBlockData->numOfCols == pBlock->numOfCols);
+
return 0;
}
@@ -323,8 +350,12 @@ static int tsdbLoadBlockDataImpl(SReadHandle *pReadH, SBlock *pBlock, SDataCols
ASSERT(pBlock->numOfSubBlocks <= 1);
STsdbRepo *pRepo = pReadH->pRepo;
- SFile *pFile =
- (pBlock->last) ? TSDB_READ_FILE(pReadH, TSDB_FILE_TYPE_LAST) : TSDB_READ_FILE(pReadH, TSDB_FILE_TYPE_DATA);
+ SFile * pFile = NULL;
+ if (pBlock->last) {
+ pFile = TSDB_FILE_IN_FGROUP(&(pReadH->fGroup), TSDB_FILE_TYPE_LAST);
+ } else {
+ pFile = TSDB_FILE_IN_FGROUP(&(pReadH->fGroup), TSDB_FILE_TYPE_DATA);
+ }
if (tsdbAllocBuf(&(pReadH->pBuf), pBlock->len) < 0) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
@@ -347,8 +378,9 @@ static int tsdbLoadBlockDataImpl(SReadHandle *pReadH, SBlock *pBlock, SDataCols
}
int tsize = TSDB_BLOCK_DATA_LEN(pBlock->numOfCols);
- if (ret < pBlock->len || !taosCheckChecksumWhole((uint8_t *)pReadH->pBuf, tsize)) {
- tsdbError("vgId:%d block data part from file %s is corrupted", REPO_ID(pRepo), pFile->fname);
+ if (ret < pBlock->len || !taosCheckChecksumWhole((uint8_t *)(pReadH->pBuf), tsize)) {
+ tsdbError("vgId:%d block data part from file %s at offset %" PRId64 " len %d is corrupted", REPO_ID(pRepo),
+ pFile->fname, pBlock->offset, pBlock->len);
terrno = TSDB_CODE_TDB_FILE_CORRUPTED;
return -1;
}
@@ -362,8 +394,8 @@ static int tsdbLoadBlockDataImpl(SReadHandle *pReadH, SBlock *pBlock, SDataCols
ASSERT(pBlock->numOfRows <= pDataCols->maxPoints);
pDataCols->numOfRows = pBlock->numOfRows;
- int ccol = 0;
- int dcol = 0;
+ int ccol = 0; // loop iter over SBlockCols
+ int dcol = 0; // loop iter over pDataCols
while (dcol < pDataCols->numOfCols) {
SDataCol *pDataCol = &(pDataCols->cols[dcol]);
if (dcol != 0 && ccol >= pBlockData->numOfCols) {
@@ -422,24 +454,32 @@ static int tsdbLoadBlockDataColsImpl(SReadHandle *pReadH, SBlock *pBlock, SDataC
int numOfColIds) {
ASSERT(pBlock->numOfSubBlocks <= 1);
ASSERT(colIds[0] == 0);
+ ASSERT(pBlock->numOfRows <= pDataCols->maxPoints);
- SFile *pFile =
- pBlock->last ? TSDB_READ_FILE(pReadH, TSDB_FILE_TYPE_LAST) : TSDB_READ_FILE(pReadH, TSDB_FILE_TYPE_DATA);
+ SFile *pFile = NULL;
+ if (pBlock->last) {
+ pFile = TSDB_FILE_IN_FGROUP(&(pReadH->fGroup), TSDB_FILE_TYPE_LAST);
+ } else {
+ pFile = TSDB_FILE_IN_FGROUP(&(pReadH->fGroup), TSDB_FILE_TYPE_DATA);
+ }
SBlockCol blockCol = {0};
- // If only load timestamp column, no need to load SBlockData part
- if (numOfColIds > 1 && tsdbLoadBlockDataInfo(pReadH, pBlock) < 0) return -1;
+ // if only load the key timestamp column, no need to load SBlockData part
+ if (numOfColIds > 1) {
+ if (tsdbLoadBlockDataInfo(pReadH, pBlock) < 0) return -1;
+ }
tdResetDataCols(pDataCols);
pDataCols->numOfRows = pBlock->numOfRows;
- int dcol = 0;
- int ccol = 0;
+ int dcol = 0; // loop iter over pDataCols
+ int ccol = 0; // loop iter over SBlockCol
for (int i = 0; i < numOfColIds; i++) {
int16_t colId = colIds[i];
SDataCol * pDataCol = NULL;
SBlockCol *pBlockCol = NULL;
+ // linear search over pDataCols of colId
while (true) {
if (dcol >= pDataCols->numOfCols) {
pDataCol = NULL;
@@ -458,6 +498,7 @@ static int tsdbLoadBlockDataColsImpl(SReadHandle *pReadH, SBlock *pBlock, SDataC
if (pDataCol == NULL) continue;
ASSERT(pDataCol->colId == colId);
+ // linear search over SBlockCols
if (colId == 0) { // load the key row
blockCol.colId = colId;
blockCol.len = pBlock->keyLen;
@@ -497,7 +538,7 @@ static int tsdbLoadBlockDataColsImpl(SReadHandle *pReadH, SBlock *pBlock, SDataC
static int tsdbDecodeBlockIdxArray(SReadHandle *pReadH) {
void * pBuf = pReadH->pBuf;
- SFile *pFile = TSDB_READ_FILE(pReadH, TSDB_FILE_TYPE_HEAD);
+ SFile *pFile = TSDB_FILE_IN_FGROUP(&(pReadH->fGroup), TSDB_FILE_TYPE_HEAD);
pReadH->nBlockIdx = 0;
while (POINTER_DISTANCE(pBuf, pReadH->pBuf) < (int)(pFile->info.len - sizeof(TSCKSUM))) {
@@ -508,7 +549,8 @@ static int tsdbDecodeBlockIdxArray(SReadHandle *pReadH) {
pBuf = tsdbDecodeBlockIdx(pBuf, &(pReadH->pBlockIdx[pReadH->nBlockIdx]));
if (pBuf == NULL) {
- tsdbError("vgId:%d failed to decode block idx part from file %s", REPO_ID(pRepo), pFile->fname);
+ tsdbError("vgId:%d failed to decode block idx part from file %s at idx %d", REPO_ID(pRepo), pFile->fname,
+ pReadH->nBlockIdx);
terrno = TSDB_CODE_TDB_FILE_CORRUPTED;
return -1;
}
@@ -516,21 +558,9 @@ static int tsdbDecodeBlockIdxArray(SReadHandle *pReadH) {
pReadH->nBlockIdx++;
ASSERT(pReadH->nBlockIdx == 1 || (pReadH->pBlockIdx[pReadH->nBlockIdx-1].tid < (pReadH->pBlockIdx[pReadH->nBlockIdx-2].tid));
}
- return 0;
-}
-
-int tsdbAllocBuf(void **ppBuf, int size) {
- void *pBuf = *pBuf;
- int tsize = taosTSizeof(pBuf);
- if (tsize == 0) tsize = 1024;
-
- while (tsize < size) {
- tsize *= 2;
- }
-
- *ppBuf = taosTRealloc(pBuf, tsize);
- if (*ppBuf == NULL) return -1;
+ ASSERT(pReadH->nBlockIdx > 0);
+ return 0;
}
static int tsdbVerifyBlockInfo(SBlockInfo *pBlockInfo, SBlockIdx *pBlockIdx) {
@@ -561,21 +591,18 @@ static int tsdbCheckAndDecodeColumnData(SDataCol *pDataCol, void *content, int32
return -1;
}
pDataCol->len = tlen;
- if (pDataCol->type == TSDB_DATA_TYPE_BINARY || pDataCol->type == TSDB_DATA_TYPE_NCHAR) {
- dataColSetOffset(pDataCol, numOfRows);
- } else {
- ASSERT(pDataCol->len == pDataCol->bytes * numOfRows);
- }
} else {
// No need to decompress, just memcpy it
pDataCol->len = len - sizeof(TSCKSUM);
memcpy(pDataCol->pData, content, pDataCol->len);
- if (pDataCol->type == TSDB_DATA_TYPE_BINARY || pDataCol->type == TSDB_DATA_TYPE_NCHAR) {
- dataColSetOffset(pDataCol, numOfRows);
- } else {
- ASSERT(pDataCol->len == pDataCol->bytes * numOfRows);
- }
}
+
+ if (pDataCol->type == TSDB_DATA_TYPE_BINARY || pDataCol->type == TSDB_DATA_TYPE_NCHAR) {
+ dataColSetOffset(pDataCol, numOfRows);
+ } else {
+ ASSERT(pDataCol->len == pDataCol->bytes * numOfRows);
+ }
+
return 0;
}
@@ -591,9 +618,10 @@ static int tsdbLoadColData(SReadHandle *pReadH, SFile *pFile, SBlock *pBlock, SB
return -1;
}
- int64_t offset = pBlock->offset + TSDB_GET_COMPCOL_LEN(pBlock->numOfCols) + pBlockCol->offset;
+ int64_t offset = pBlock->offset + TSDB_BLOCK_DATA_LEN(pBlock->numOfCols) + pBlockCol->offset;
if (lseek(pFile->fd, (off_t)offset, SEEK_SET) < 0) {
- tsdbError("vgId:%d failed to lseek file %s since %s", REPO_ID(pRepo), pFile->fname, strerror(errno));
+ tsdbError("vgId:%d failed to lseek file %s to offset %" PRId64 " since %s", REPO_ID(pRepo), pFile->fname, offset,
+ strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}
diff --git a/src/tsdb/src/tsdbUtil.c b/src/tsdb/src/tsdbUtil.c
index 18a23284df2e23d01026f6af21d80226290c535f..d2a1bd53aa61efc9983dacb978a0060f4a4a82cc 100644
--- a/src/tsdb/src/tsdbUtil.c
+++ b/src/tsdb/src/tsdbUtil.c
@@ -100,14 +100,32 @@ void *tsdbDecodeBlockIdx(void *buf, SBlockIdx *pBlockIdx) {
pBlockIdx->hasLast = hasLast;
pBlockIdx->numOfBlocks = numOfBlocks;
- pBlockIdx->uid = value;
+ pBlockIdx->uid = uid;
pBlockIdx->maxKey = (TSKEY)maxKey;
return buf;
}
+// TODO: make it static FORCE_INLINE
void tsdbResetFGroupFd(SFileGroup *pFGroup) {
for (int type = 0; type < TSDB_FILE_TYPE_MAX; type++) {
- pFGroup->files[type].fd = -1;
+ TSDB_FILE_IN_FGROUP(pFGroup, type)->fd = -1;
}
-}
\ No newline at end of file
+}
+
+int tsdbAllocBuf(void **ppBuf, uint32_t size) {
+ ASSERT(size > 0);
+
+ void *pBuf = *pBuf;
+
+ uint32_t tsize = taosTSizeof(pBuf);
+ if (tsize >= size) return 0;
+
+ if (tsize == 0) tsize = 1024;
+ while (tsize < size) {
+ tsize *= 2;
+ }
+
+ *ppBuf = taosTRealloc(pBuf, tsize);
+ if (*ppBuf == NULL) return -1;
+}