提交 54e3a3d3 编写于 作者: C Cary Xu

code optimization

上级 ad2c58e8
...@@ -28,7 +28,8 @@ typedef enum { ...@@ -28,7 +28,8 @@ typedef enum {
} ETsdbFsVer; } ETsdbFsVer;
#define TSDB_FVER_TYPE uint32_t #define TSDB_FVER_TYPE uint32_t
#define TSDB_LATEST_FVER TSDB_FS_VER_1 #define TSDB_LATEST_FVER TSDB_FS_VER_1 // latest version for DFile
#define TSDB_LATEST_SFS_VER TSDB_FS_VER_1 // latest version for 'current' file
static FORCE_INLINE uint32_t tsdbGetDFSVersion(TSDB_FILE_T fType) { // latest version for DFile static FORCE_INLINE uint32_t tsdbGetDFSVersion(TSDB_FILE_T fType) { // latest version for DFile
switch (fType) { switch (fType) {
...@@ -39,8 +40,6 @@ static FORCE_INLINE uint32_t tsdbGetDFSVersion(TSDB_FILE_T fType) { // latest v ...@@ -39,8 +40,6 @@ static FORCE_INLINE uint32_t tsdbGetDFSVersion(TSDB_FILE_T fType) { // latest v
} }
} }
static FORCE_INLINE uint32_t tsdbGetSFSVersion() { return TSDB_FS_VER_1; } // latest version for current
int tsdbRefactorFS(STsdbRepo *pRepo); int tsdbRefactorFS(STsdbRepo *pRepo);
// ================== TSDB global config // ================== TSDB global config
......
...@@ -52,7 +52,7 @@ typedef enum { ...@@ -52,7 +52,7 @@ typedef enum {
TSDB_FILE_META TSDB_FILE_META
} TSDB_FILE_T; } TSDB_FILE_T;
#define TSDB_FILE_MIN 3U // min number of files in one DFileSet #define TSDB_FILE_MIN 3U // min valid number of files in one DFileSet(.head/.data/.last)
// =============== SMFile // =============== SMFile
typedef struct { typedef struct {
...@@ -321,17 +321,10 @@ static FORCE_INLINE uint8_t tsdbGetNFiles(SDFileSet* pSet) { ...@@ -321,17 +321,10 @@ static FORCE_INLINE uint8_t tsdbGetNFiles(SDFileSet* pSet) {
return TSDB_FILE_MAX; return TSDB_FILE_MAX;
} }
} }
#define TSDB_FSET_FID(s) ((s)->fid) #define TSDB_FSET_FID(s) ((s)->fid)
#define TSDB_DFILE_IN_SET(s, t) ((s)->files + (t)) #define TSDB_DFILE_IN_SET(s, t) ((s)->files + (t))
#define TSDB_FSET_LEVEL(s) TSDB_FILE_LEVEL(TSDB_DFILE_IN_SET(s, 0)) #define TSDB_FSET_LEVEL(s) TSDB_FILE_LEVEL(TSDB_DFILE_IN_SET(s, 0))
#define TSDB_FSET_ID(s) TSDB_FILE_ID(TSDB_DFILE_IN_SET(s, 0)) #define TSDB_FSET_ID(s) TSDB_FILE_ID(TSDB_DFILE_IN_SET(s, 0))
#define TSDB_FSET_SET_INIT(s) \
do { \
for (TSDB_FILE_T ftype = TSDB_FILE_HEAD; ftype < TSDB_FILE_MAX; ftype++) { \
TSDB_FILE_SET_CLOSED(TSDB_DFILE_IN_SET(s, ftype)); \
} \
} while (0);
#define TSDB_FSET_SET_CLOSED(s) \ #define TSDB_FSET_SET_CLOSED(s) \
do { \ do { \
for (TSDB_FILE_T ftype = TSDB_FILE_HEAD; ftype < tsdbGetNFiles(s); ftype++) { \ for (TSDB_FILE_T ftype = TSDB_FILE_HEAD; ftype < tsdbGetNFiles(s); ftype++) { \
...@@ -344,6 +337,12 @@ static FORCE_INLINE uint8_t tsdbGetNFiles(SDFileSet* pSet) { ...@@ -344,6 +337,12 @@ static FORCE_INLINE uint8_t tsdbGetNFiles(SDFileSet* pSet) {
TSDB_FILE_FSYNC(TSDB_DFILE_IN_SET(s, ftype)); \ TSDB_FILE_FSYNC(TSDB_DFILE_IN_SET(s, ftype)); \
} \ } \
} while (0); } while (0);
#define TSDB_FSET_SET_INIT(s) \
do { \
for (TSDB_FILE_T ftype = TSDB_FILE_HEAD; ftype < TSDB_FILE_MAX; ftype++) { \
TSDB_FILE_SET_CLOSED(TSDB_DFILE_IN_SET(s, ftype)); \
} \
} while (0);
void tsdbInitDFileSet(SDFileSet* pSet, SDiskID did, int vid, int fid, uint32_t ver, uint8_t fsetVer); void tsdbInitDFileSet(SDFileSet* pSet, SDiskID did, int vid, int fid, uint32_t ver, uint8_t fsetVer);
void tsdbInitDFileSetEx(SDFileSet* pSet, SDFileSet* pOSet); void tsdbInitDFileSetEx(SDFileSet* pSet, SDFileSet* pOSet);
...@@ -357,7 +356,7 @@ int tsdbUpdateDFileSetHeader(SDFileSet* pSet); ...@@ -357,7 +356,7 @@ int tsdbUpdateDFileSetHeader(SDFileSet* pSet);
int tsdbScanAndTryFixDFileSet(STsdbRepo *pRepo, SDFileSet* pSet); int tsdbScanAndTryFixDFileSet(STsdbRepo *pRepo, SDFileSet* pSet);
static FORCE_INLINE void tsdbCloseDFileSet(SDFileSet* pSet) { static FORCE_INLINE void tsdbCloseDFileSet(SDFileSet* pSet) {
ASSERT(tsdbGetNFiles(pSet) <= TSDB_FILE_MAX); ASSERT_TSDB_FSET_NFILES_VALID(pSet);
for (TSDB_FILE_T ftype = 0; ftype < tsdbGetNFiles(pSet); ftype++) { for (TSDB_FILE_T ftype = 0; ftype < tsdbGetNFiles(pSet); ftype++) {
tsdbCloseDFile(TSDB_DFILE_IN_SET(pSet, ftype)); tsdbCloseDFile(TSDB_DFILE_IN_SET(pSet, ftype));
} }
......
...@@ -112,9 +112,8 @@ static FORCE_INLINE STSchema* tsdbGetTableSchemaImpl(STable* pTable, bool lock, ...@@ -112,9 +112,8 @@ static FORCE_INLINE STSchema* tsdbGetTableSchemaImpl(STable* pTable, bool lock,
if (ptr == NULL) { if (ptr == NULL) {
terrno = TSDB_CODE_TDB_IVD_TB_SCHEMA_VERSION; terrno = TSDB_CODE_TDB_IVD_TB_SCHEMA_VERSION;
goto _exit; goto _exit;
} else {
pTSchema = *(STSchema**)ptr;
} }
pTSchema = *(STSchema**)ptr;
} }
ASSERT(pTSchema != NULL); ASSERT(pTSchema != NULL);
......
...@@ -71,11 +71,11 @@ typedef struct { ...@@ -71,11 +71,11 @@ typedef struct {
* blkVer; // 0 - original block, 1 - block since importing .smad/.smal * blkVer; // 0 - original block, 1 - block since importing .smad/.smal
* aggrOffset; // only valid when blkVer > 0 and aggrStat > 0 * aggrOffset; // only valid when blkVer > 0 and aggrStat > 0
*/ */
#define SBlockFieldsP1 \ #define SBlockFieldsP1 \
int64_t aggrStat : 3; \ uint64_t aggrStat : 3; \
int64_t blkVer : 5; \ uint64_t blkVer : 5; \
int64_t aggrOffset : 56; \ uint64_t aggrOffset : 56; \
int32_t aggrLen uint32_t aggrLen
typedef struct { typedef struct {
SBlockFieldsP0; SBlockFieldsP0;
...@@ -87,13 +87,12 @@ typedef struct { ...@@ -87,13 +87,12 @@ typedef struct {
} SBlockV1; } SBlockV1;
typedef enum { typedef enum {
TSDB_SBLK_VER_0, TSDB_SBLK_VER_0 = 0,
TSDB_SBLK_VER_1, TSDB_SBLK_VER_1,
} ESBlockVer; } ESBlockVer;
#define SBlockVerLatest TSDB_SBLK_VER_1 #define SBlockVerLatest TSDB_SBLK_VER_1
#define SBlockBase SBlockV0 // base SBlock definition
#define SBlock SBlockV1 // latest SBlock definition #define SBlock SBlockV1 // latest SBlock definition
// lastest SBlockInfo definition // lastest SBlockInfo definition
...@@ -162,9 +161,7 @@ typedef struct { ...@@ -162,9 +161,7 @@ typedef struct {
SBlockCol cols[]; SBlockCol cols[];
} SBlockData; } SBlockData;
typedef struct { typedef struct {
// int32_t delimiter; // For recovery usage
int32_t numOfCols; // For recovery usage int32_t numOfCols; // For recovery usage
// uint64_t uid; // For recovery usage
SAggrBlkCol cols[]; SAggrBlkCol cols[];
} SAggrBlkData; } SAggrBlkData;
...@@ -260,4 +257,21 @@ static FORCE_INLINE int tsdbMakeRoom(void **ppBuf, size_t size) { ...@@ -260,4 +257,21 @@ static FORCE_INLINE int tsdbMakeRoom(void **ppBuf, size_t size) {
return 0; return 0;
} }
static FORCE_INLINE SBlockCol *tsdbGetSBlockCol(SBlock *pBlock, SBlockCol **pDestBlkCol, SBlockCol *pBlkCols,
int colIdx) {
if (pBlock->blkVer == SBlockVerLatest) {
*pDestBlkCol = pBlkCols + colIdx;
return *pDestBlkCol;
}
if (pBlock->blkVer == TSDB_SBLK_VER_0) {
SBlockColV0 *pBlkCol = (SBlockColV0 *)pBlkCols + colIdx;
(*pDestBlkCol)->colId = pBlkCol->colId;
(*pDestBlkCol)->len = pBlkCol->len;
(*pDestBlkCol)->type = pBlkCol->type;
(*pDestBlkCol)->offset = pBlkCol->offset;
(*pDestBlkCol)->offsetH = pBlkCol->offsetH;
}
return *pDestBlkCol;
}
#endif /*_TD_TSDB_READ_IMPL_H_*/ #endif /*_TD_TSDB_READ_IMPL_H_*/
...@@ -1080,9 +1080,9 @@ int tsdbWriteBlockImpl(STsdbRepo *pRepo, STable *pTable, SDFile *pDFile, SDFile ...@@ -1080,9 +1080,9 @@ int tsdbWriteBlockImpl(STsdbRepo *pRepo, STable *pTable, SDFile *pDFile, SDFile
// Get # of cols not all NULL(not including key column) // Get # of cols not all NULL(not including key column)
int nColsNotAllNull = 0; int nColsNotAllNull = 0;
int aggrNum = 0; int nAggrCols = 0;
for (int ncol = 1; ncol < pDataCols->numOfCols; ncol++) { // ncol from 1, we skip the timestamp column for (int ncol = 1; ncol < pDataCols->numOfCols; ncol++) { // ncol from 1, we skip the timestamp column
SDataCol * pDataCol = pDataCols->cols + ncol; SDataCol * pDataCol = pDataCols->cols + ncol;
SBlockCol * pBlockCol = pBlockData->cols + nColsNotAllNull; SBlockCol * pBlockCol = pBlockData->cols + nColsNotAllNull;
SAggrBlkCol *pAggrBlkCol = pAggrBlkData->cols + nColsNotAllNull; SAggrBlkCol *pAggrBlkCol = pAggrBlkData->cols + nColsNotAllNull;
...@@ -1095,17 +1095,18 @@ int tsdbWriteBlockImpl(STsdbRepo *pRepo, STable *pTable, SDFile *pDFile, SDFile ...@@ -1095,17 +1095,18 @@ int tsdbWriteBlockImpl(STsdbRepo *pRepo, STable *pTable, SDFile *pDFile, SDFile
pBlockCol->colId = pDataCol->colId; pBlockCol->colId = pDataCol->colId;
pBlockCol->type = pDataCol->type; pBlockCol->type = pDataCol->type;
pAggrBlkCol->colId = pDataCol->colId; pAggrBlkCol->colId = pDataCol->colId;
if (tDataTypes[pDataCol->type].statisFunc) { if (tDataTypes[pDataCol->type].statisFunc) {
// (*tDataTypes[pDataCol->type].statisFunc)(pDataCol->pData, rowsToWrite, &(pBlockCol->min), &(pBlockCol->max), #if 0
// &(pBlockCol->sum), &(pBlockCol->minIndex), &(pBlockCol->maxIndex), (*tDataTypes[pDataCol->type].statisFunc)(pDataCol->pData, rowsToWrite, &(pBlockCol->min), &(pBlockCol->max),
// &(pBlockCol->numOfNull)); &(pBlockCol->sum), &(pBlockCol->minIndex), &(pBlockCol->maxIndex),
&(pBlockCol->numOfNull));
#endif
(*tDataTypes[pDataCol->type].statisFunc)(pDataCol->pData, rowsToWrite, &(pAggrBlkCol->min), &(pAggrBlkCol->max), (*tDataTypes[pDataCol->type].statisFunc)(pDataCol->pData, rowsToWrite, &(pAggrBlkCol->min), &(pAggrBlkCol->max),
&(pAggrBlkCol->sum), &(pAggrBlkCol->minIndex), &(pAggrBlkCol->maxIndex), &(pAggrBlkCol->sum), &(pAggrBlkCol->minIndex), &(pAggrBlkCol->maxIndex),
&(pAggrBlkCol->numOfNull)); &(pAggrBlkCol->numOfNull));
++aggrNum; ++nAggrCols;
} }
nColsNotAllNull++; nColsNotAllNull++;
} }
...@@ -1119,14 +1120,14 @@ int tsdbWriteBlockImpl(STsdbRepo *pRepo, STable *pTable, SDFile *pDFile, SDFile ...@@ -1119,14 +1120,14 @@ int tsdbWriteBlockImpl(STsdbRepo *pRepo, STable *pTable, SDFile *pDFile, SDFile
int32_t lsize = tsize; int32_t lsize = tsize;
int32_t keyLen = 0; int32_t keyLen = 0;
int32_t tsizeAggr = (int32_t)tsdbBlockAggrSize(nColsNotAllNull, SBlockVerLatest); uint32_t tsizeAggr = (uint32_t)tsdbBlockAggrSize(nColsNotAllNull, SBlockVerLatest);
for (int ncol = 0; ncol < pDataCols->numOfCols; ncol++) { for (int ncol = 0; ncol < pDataCols->numOfCols; ncol++) {
// All not NULL columns finish // All not NULL columns finish
if (ncol != 0 && tcol >= nColsNotAllNull) break; if (ncol != 0 && tcol >= nColsNotAllNull) break;
SDataCol * pDataCol = pDataCols->cols + ncol; SDataCol * pDataCol = pDataCols->cols + ncol;
SBlockCol * pBlockCol = pBlockData->cols + tcol; SBlockCol *pBlockCol = pBlockData->cols + tcol;
if (ncol != 0 && (pDataCol->colId != pBlockCol->colId)) continue; if (ncol != 0 && (pDataCol->colId != pBlockCol->colId)) continue;
...@@ -1181,14 +1182,13 @@ int tsdbWriteBlockImpl(STsdbRepo *pRepo, STable *pTable, SDFile *pDFile, SDFile ...@@ -1181,14 +1182,13 @@ int tsdbWriteBlockImpl(STsdbRepo *pRepo, STable *pTable, SDFile *pDFile, SDFile
taosCalcChecksumAppend(0, (uint8_t *)pBlockData, tsize); taosCalcChecksumAppend(0, (uint8_t *)pBlockData, tsize);
tsdbUpdateDFileMagic(pDFile, POINTER_SHIFT(pBlockData, tsize - sizeof(TSCKSUM))); tsdbUpdateDFileMagic(pDFile, POINTER_SHIFT(pBlockData, tsize - sizeof(TSCKSUM)));
// Write the whole block to file // Write the whole block to file
if (tsdbAppendDFile(pDFile, (void *)pBlockData, lsize, &offset) < lsize) { if (tsdbAppendDFile(pDFile, (void *)pBlockData, lsize, &offset) < lsize) {
return -1; return -1;
} }
// pAggrBlkData->delimiter = TSDB_FILE_DELIMITER; uint32_t aggrStatus = ((nAggrCols > 0) && (rowsToWrite > 10)) ? 1 : 0; // TODO: How to make the decision?
// pAggrBlkData->uid = TABLE_UID(pTable);
int aggrStatus = ((aggrNum > 0) && (rowsToWrite > 5)) ? 1 : 0; // TODO: How to make the decision?
if (aggrStatus > 0) { if (aggrStatus > 0) {
pAggrBlkData->numOfCols = nColsNotAllNull; pAggrBlkData->numOfCols = nColsNotAllNull;
...@@ -1201,7 +1201,7 @@ int tsdbWriteBlockImpl(STsdbRepo *pRepo, STable *pTable, SDFile *pDFile, SDFile ...@@ -1201,7 +1201,7 @@ int tsdbWriteBlockImpl(STsdbRepo *pRepo, STable *pTable, SDFile *pDFile, SDFile
} }
} }
// Update pBlock membership vairables // Update pBlock membership variables
pBlock->last = isLast; pBlock->last = isLast;
pBlock->offset = offset; pBlock->offset = offset;
pBlock->algorithm = pCfg->compression; pBlock->algorithm = pCfg->compression;
...@@ -1215,7 +1215,7 @@ int tsdbWriteBlockImpl(STsdbRepo *pRepo, STable *pTable, SDFile *pDFile, SDFile ...@@ -1215,7 +1215,7 @@ int tsdbWriteBlockImpl(STsdbRepo *pRepo, STable *pTable, SDFile *pDFile, SDFile
// since blkVer1 // since blkVer1
pBlock->aggrStat = aggrStatus; pBlock->aggrStat = aggrStatus;
pBlock->blkVer = SBlockVerLatest; pBlock->blkVer = SBlockVerLatest;
pBlock->aggrOffset = offsetAggr; pBlock->aggrOffset = (uint64_t)offsetAggr;
pBlock->aggrLen = tsizeAggr; pBlock->aggrLen = tsizeAggr;
tsdbDebug("vgId:%d tid:%d a block of data is written to file %s, offset %" PRId64 tsdbDebug("vgId:%d tid:%d a block of data is written to file %s, offset %" PRId64
...@@ -1573,6 +1573,7 @@ static int tsdbSetAndOpenCommitFile(SCommitH *pCommith, SDFileSet *pSet, int fid ...@@ -1573,6 +1573,7 @@ static int tsdbSetAndOpenCommitFile(SCommitH *pCommith, SDFileSet *pSet, int fid
pCommith->isDFileSame = false; pCommith->isDFileSame = false;
pCommith->isLFileSame = false; pCommith->isLFileSame = false;
tsdbDebug("vgId:%d FSET %d at level %d disk id %d is created to commit", REPO_ID(pRepo), TSDB_FSET_FID(pWSet), tsdbDebug("vgId:%d FSET %d at level %d disk id %d is created to commit", REPO_ID(pRepo), TSDB_FSET_FID(pWSet),
TSDB_FSET_LEVEL(pWSet), TSDB_FSET_ID(pWSet)); TSDB_FSET_LEVEL(pWSet), TSDB_FSET_ID(pWSet));
} else { } else {
...@@ -1679,14 +1680,14 @@ static int tsdbSetAndOpenCommitFile(SCommitH *pCommith, SDFileSet *pSet, int fid ...@@ -1679,14 +1680,14 @@ static int tsdbSetAndOpenCommitFile(SCommitH *pCommith, SDFileSet *pSet, int fid
tsdbCloseAndUnsetFSet(&(pCommith->readh)); tsdbCloseAndUnsetFSet(&(pCommith->readh));
return -1; return -1;
} }
} }
} }
// TSDB_FILE_SMAL // TSDB_FILE_SMAL
ASSERT(tsdbGetNFiles(pWSet) >= TSDB_FILE_SMAL); ASSERT(tsdbGetNFiles(pWSet) >= TSDB_FILE_SMAL);
SDFile *pRSmalF = TSDB_READ_SMAL_FILE(&(pCommith->readh)); SDFile *pRSmalF = TSDB_READ_SMAL_FILE(&(pCommith->readh));
SDFile *pWSmalF = TSDB_COMMIT_SMAL_FILE(pCommith); SDFile *pWSmalF = TSDB_COMMIT_SMAL_FILE(pCommith);
if ((pCommith->isLFileSame) && access(TSDB_FILE_FULL_NAME(pRSmalF), F_OK) == 0) { if ((pCommith->isLFileSame) && access(TSDB_FILE_FULL_NAME(pRSmalF), F_OK) == 0) {
tsdbInitDFileEx(pWSmalF, pRSmalF); tsdbInitDFileEx(pWSmalF, pRSmalF);
if (tsdbOpenDFile(pWSmalF, O_RDWR) < 0) { if (tsdbOpenDFile(pWSmalF, O_RDWR) < 0) {
...@@ -1699,7 +1700,7 @@ static int tsdbSetAndOpenCommitFile(SCommitH *pCommith, SDFileSet *pSet, int fid ...@@ -1699,7 +1700,7 @@ static int tsdbSetAndOpenCommitFile(SCommitH *pCommith, SDFileSet *pSet, int fid
tsdbCloseAndUnsetFSet(&(pCommith->readh)); tsdbCloseAndUnsetFSet(&(pCommith->readh));
return -1; return -1;
} }
} }
} else { } else {
tsdbDebug("vgId:%d create data file %s as not exist", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pRSmalF)); tsdbDebug("vgId:%d create data file %s as not exist", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pRSmalF));
tsdbInitDFile(pWSmalF, did, REPO_ID(pRepo), fid, FS_TXN_VERSION(REPO_FS(pRepo)), TSDB_FILE_SMAL); tsdbInitDFile(pWSmalF, did, REPO_ID(pRepo), fid, FS_TXN_VERSION(REPO_FS(pRepo)), TSDB_FILE_SMAL);
......
...@@ -391,9 +391,6 @@ static int tsdbCompactMeta(STsdbRepo *pRepo) { ...@@ -391,9 +391,6 @@ static int tsdbCompactMeta(STsdbRepo *pRepo) {
pTh->bindex = *(pReadH->pBlkIdx); pTh->bindex = *(pReadH->pBlkIdx);
pTh->pBlkIdx = &(pTh->bindex); pTh->pBlkIdx = &(pTh->bindex);
// if (tsdbMakeRoom((void **)(&(pTh->pInfo)), pTh->pBlkIdx->len) < 0) {
// return -1;
// }
uint32_t originLen = 0; uint32_t originLen = 0;
if (tsdbLoadBlockInfo(pReadH, (void **)(&(pTh->pInfo)), &originLen) < 0) { if (tsdbLoadBlockInfo(pReadH, (void **)(&(pTh->pInfo)), &originLen) < 0) {
return -1; return -1;
......
...@@ -342,6 +342,7 @@ int tsdbOpenFS(STsdbRepo *pRepo) { ...@@ -342,6 +342,7 @@ int tsdbOpenFS(STsdbRepo *pRepo) {
return -1; return -1;
} }
// TODO: seems not need, remove this logic. 2021-09-29 13:00
if (tsdbEnableUpgradeFile && tsdbRefactorFS(pRepo) < 0) { if (tsdbEnableUpgradeFile && tsdbRefactorFS(pRepo) < 0) {
tsdbError("vgId:%d failed to refactor FS since %s", REPO_ID(pRepo), tstrerror(terrno)); tsdbError("vgId:%d failed to refactor FS since %s", REPO_ID(pRepo), tstrerror(terrno));
return -1; return -1;
...@@ -432,7 +433,7 @@ static int tsdbSaveFSStatus(SFSStatus *pStatus, int vid) { ...@@ -432,7 +433,7 @@ static int tsdbSaveFSStatus(SFSStatus *pStatus, int vid) {
return -1; return -1;
} }
fsheader.version = tsdbGetSFSVersion(); fsheader.version = TSDB_LATEST_SFS_VER;
if (pStatus->pmf == NULL) { if (pStatus->pmf == NULL) {
ASSERT(taosArrayGetSize(pStatus->df) == 0); ASSERT(taosArrayGetSize(pStatus->df) == 0);
fsheader.len = 0; fsheader.len = 0;
......
...@@ -343,7 +343,7 @@ static int tsdbEncodeSDFileEx(void **buf, SDFile *pDFile) { ...@@ -343,7 +343,7 @@ static int tsdbEncodeSDFileEx(void **buf, SDFile *pDFile) {
static void *tsdbDecodeSDFileEx(void *buf, SDFile *pDFile) { static void *tsdbDecodeSDFileEx(void *buf, SDFile *pDFile) {
char *aname; char *aname;
// The sync module would send DFileSet with latest verion. // The sync module would send DFileSet with latest verion.
buf = tsdbDecodeDFInfo(buf, &(pDFile->info), tsdbGetSFSVersion()); buf = tsdbDecodeDFInfo(buf, &(pDFile->info), TSDB_LATEST_SFS_VER);
buf = taosDecodeString(buf, &aname); buf = taosDecodeString(buf, &aname);
strncpy(TSDB_FILE_FULL_NAME(pDFile), aname, TSDB_FILENAME_LEN); strncpy(TSDB_FILE_FULL_NAME(pDFile), aname, TSDB_FILENAME_LEN);
TSDB_FILE_SET_CLOSED(pDFile); TSDB_FILE_SET_CLOSED(pDFile);
...@@ -431,7 +431,7 @@ int tsdbLoadDFileHeader(SDFile *pDFile, SDFInfo *pInfo) { ...@@ -431,7 +431,7 @@ int tsdbLoadDFileHeader(SDFile *pDFile, SDFInfo *pInfo) {
} }
void *pBuf = buf; void *pBuf = buf;
pBuf = tsdbDecodeDFInfo(pBuf, pInfo, TSDB_LATEST_FVER); pBuf = tsdbDecodeDFInfo(pBuf, pInfo, TSDB_LATEST_FVER); // only makesure the paramter sfver > 0
return 0; return 0;
} }
...@@ -638,10 +638,11 @@ void *tsdbDecodeDFileSetEx(void *buf, SDFileSet *pSet) { ...@@ -638,10 +638,11 @@ void *tsdbDecodeDFileSetEx(void *buf, SDFileSet *pSet) {
} }
int tsdbApplyDFileSetChange(SDFileSet *from, SDFileSet *to) { int tsdbApplyDFileSetChange(SDFileSet *from, SDFileSet *to) {
uint8_t nDFiles = (from == NULL) ? TSDB_FILE_MAX : tsdbGetNFiles(from); uint8_t nFilesFrom = from ? tsdbGetNFiles(from) : 0;
for (TSDB_FILE_T ftype = 0; ftype < nDFiles; ftype++) { uint8_t nFilesTo = to ? tsdbGetNFiles(to) : 0;
SDFile *pDFileFrom = (from) ? TSDB_DFILE_IN_SET(from, ftype) : NULL; for (TSDB_FILE_T ftype = 0; ftype < MAX(nFilesFrom, nFilesTo); ftype++) {
SDFile *pDFileTo = (to) ? TSDB_DFILE_IN_SET(to, ftype) : NULL; SDFile *pDFileFrom = ftype < nFilesFrom ? TSDB_DFILE_IN_SET(from, ftype) : NULL;
SDFile *pDFileTo = ftype < nFilesTo ? TSDB_DFILE_IN_SET(to, ftype) : NULL;
if (tsdbApplyDFileChange(pDFileFrom, pDFileTo) < 0) { if (tsdbApplyDFileChange(pDFileFrom, pDFileTo) < 0) {
return -1; return -1;
} }
......
...@@ -1090,22 +1090,8 @@ static int32_t loadBlockInfo(STsdbQueryHandle * pQueryHandle, int32_t index, int ...@@ -1090,22 +1090,8 @@ static int32_t loadBlockInfo(STsdbQueryHandle * pQueryHandle, int32_t index, int
assert(compIndex->len > 0); assert(compIndex->len > 0);
// if (pCheckInfo->compSize < (int32_t)compIndex->len) { if (tsdbLoadBlockInfo(&(pQueryHandle->rhelper), (void**)(&pCheckInfo->pCompInfo),
// assert(compIndex->len > 0); (uint32_t*)(&pCheckInfo->compSize)) < 0) {
// char* t = realloc(pCheckInfo->pCompInfo, compIndex->len);
// if (t == NULL) {
// terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
// code = TSDB_CODE_TDB_OUT_OF_MEMORY;
// return code;
// }
// pCheckInfo->pCompInfo = (SBlockInfo*)t;
// pCheckInfo->compSize = compIndex->len;
// }
if (tsdbLoadBlockInfo(&(pQueryHandle->rhelper), (void**)(&pCheckInfo->pCompInfo), (uint32_t*)(&pCheckInfo->compSize)) <
0) {
return terrno; return terrno;
} }
SBlockInfo* pCompInfo = pCheckInfo->pCompInfo; SBlockInfo* pCompInfo = pCheckInfo->pCompInfo;
......
...@@ -252,8 +252,9 @@ static FORCE_INLINE int32_t tsdbGetSBlockVer(int32_t fver) { ...@@ -252,8 +252,9 @@ static FORCE_INLINE int32_t tsdbGetSBlockVer(int32_t fver) {
case TSDB_FS_VER_0: case TSDB_FS_VER_0:
return TSDB_SBLK_VER_0; return TSDB_SBLK_VER_0;
case TSDB_FS_VER_1: case TSDB_FS_VER_1:
default:
return TSDB_SBLK_VER_1; return TSDB_SBLK_VER_1;
default:
return SBlockVerLatest;
} }
} }
...@@ -268,9 +269,9 @@ static FORCE_INLINE size_t tsdbSizeOfSBlock(int32_t sBlkVer) { ...@@ -268,9 +269,9 @@ static FORCE_INLINE size_t tsdbSizeOfSBlock(int32_t sBlkVer) {
} }
} }
static int tsdbHeadRefactor(SDFile *pHeadf, SBlockInfo **pDstBlkInfo, SBlockIdx *pBlkIdx, uint32_t *dstBlkInfoLen) { static int tsdbSBlkInfoRefactor(SDFile *pHeadf, SBlockInfo **pDstBlkInfo, SBlockIdx *pBlkIdx, uint32_t *dstBlkInfoLen) {
int sBlkVer = tsdbGetSBlockVer(pHeadf->info.fver); int sBlkVer = tsdbGetSBlockVer(pHeadf->info.fver);
if (sBlkVer == SBlockVerLatest) { if (sBlkVer > TSDB_SBLK_VER_0) {
*dstBlkInfoLen = pBlkIdx->len; *dstBlkInfoLen = pBlkIdx->len;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -346,7 +347,7 @@ int tsdbLoadBlockInfo(SReadH *pReadh, void **pTarget, uint32_t *extendedLen) { ...@@ -346,7 +347,7 @@ int tsdbLoadBlockInfo(SReadH *pReadh, void **pTarget, uint32_t *extendedLen) {
ASSERT(pBlkIdx->tid == pReadh->pBlkInfo->tid && pBlkIdx->uid == pReadh->pBlkInfo->uid); ASSERT(pBlkIdx->tid == pReadh->pBlkInfo->tid && pBlkIdx->uid == pReadh->pBlkInfo->uid);
uint32_t dstBlkInfoLen = 0; uint32_t dstBlkInfoLen = 0;
if (tsdbHeadRefactor(pHeadf, &(pReadh->pBlkInfo), pBlkIdx, &dstBlkInfoLen) < 0) { if (tsdbSBlkInfoRefactor(pHeadf, &(pReadh->pBlkInfo), pBlkIdx, &dstBlkInfoLen) < 0) {
return -1; return -1;
} }
...@@ -425,7 +426,7 @@ int tsdbLoadBlockDataCols(SReadH *pReadh, SBlock *pBlock, SBlockInfo *pBlkInfo, ...@@ -425,7 +426,7 @@ int tsdbLoadBlockDataCols(SReadH *pReadh, SBlock *pBlock, SBlockInfo *pBlkInfo,
static int tsdbLoadBlockStatisFromDFile(SReadH *pReadh, SBlock *pBlock) { static int tsdbLoadBlockStatisFromDFile(SReadH *pReadh, SBlock *pBlock) {
SDFile *pDFile = (pBlock->last) ? TSDB_READ_LAST_FILE(pReadh) : TSDB_READ_DATA_FILE(pReadh); SDFile *pDFile = (pBlock->last) ? TSDB_READ_LAST_FILE(pReadh) : TSDB_READ_DATA_FILE(pReadh);
if (tsdbSeekDFile(pDFile, pBlock->offset, SEEK_SET) < 0) { if (tsdbSeekDFile(pDFile, pBlock->offset, SEEK_SET) < 0) {
tsdbError("vgId:%d failed to load block aggr part while seek file %s to offset %" PRId64 " since %s", tsdbError("vgId:%d failed to load block statis part while seek file %s to offset %" PRId64 " since %s",
TSDB_READ_REPO_ID(pReadh), TSDB_FILE_FULL_NAME(pDFile), (int64_t)pBlock->offset, tstrerror(terrno)); TSDB_READ_REPO_ID(pReadh), TSDB_FILE_FULL_NAME(pDFile), (int64_t)pBlock->offset, tstrerror(terrno));
return -1; return -1;
} }
...@@ -435,14 +436,14 @@ static int tsdbLoadBlockStatisFromDFile(SReadH *pReadh, SBlock *pBlock) { ...@@ -435,14 +436,14 @@ static int tsdbLoadBlockStatisFromDFile(SReadH *pReadh, SBlock *pBlock) {
int64_t nread = tsdbReadDFile(pDFile, (void *)(pReadh->pBlkData), size); int64_t nread = tsdbReadDFile(pDFile, (void *)(pReadh->pBlkData), size);
if (nread < 0) { if (nread < 0) {
tsdbError("vgId:%d failed to load block aggr part while read file %s since %s, offset:%" PRId64 " len :%" PRIzu, tsdbError("vgId:%d failed to load block statis part while read file %s since %s, offset:%" PRId64 " len :%" PRIzu,
TSDB_READ_REPO_ID(pReadh), TSDB_FILE_FULL_NAME(pDFile), tstrerror(terrno), (int64_t)pBlock->offset, size); TSDB_READ_REPO_ID(pReadh), TSDB_FILE_FULL_NAME(pDFile), tstrerror(terrno), (int64_t)pBlock->offset, size);
return -1; return -1;
} }
if (nread < size) { if (nread < size) {
terrno = TSDB_CODE_TDB_FILE_CORRUPTED; terrno = TSDB_CODE_TDB_FILE_CORRUPTED;
tsdbError("vgId:%d block aggr part in file %s is corrupted, offset:%" PRId64 " expected bytes:%" PRIzu tsdbError("vgId:%d block statis part in file %s is corrupted, offset:%" PRId64 " expected bytes:%" PRIzu
" read bytes: %" PRId64, " read bytes: %" PRId64,
TSDB_READ_REPO_ID(pReadh), TSDB_FILE_FULL_NAME(pDFile), (int64_t)pBlock->offset, size, nread); TSDB_READ_REPO_ID(pReadh), TSDB_FILE_FULL_NAME(pDFile), (int64_t)pBlock->offset, size, nread);
return -1; return -1;
...@@ -450,7 +451,7 @@ static int tsdbLoadBlockStatisFromDFile(SReadH *pReadh, SBlock *pBlock) { ...@@ -450,7 +451,7 @@ static int tsdbLoadBlockStatisFromDFile(SReadH *pReadh, SBlock *pBlock) {
if (!taosCheckChecksumWhole((uint8_t *)(pReadh->pBlkData), (uint32_t)size)) { if (!taosCheckChecksumWhole((uint8_t *)(pReadh->pBlkData), (uint32_t)size)) {
terrno = TSDB_CODE_TDB_FILE_CORRUPTED; terrno = TSDB_CODE_TDB_FILE_CORRUPTED;
tsdbError("vgId:%d block aggr part in file %s is corrupted since wrong checksum, offset:%" PRId64 " len :%" PRIzu, tsdbError("vgId:%d block statis part in file %s is corrupted since wrong checksum, offset:%" PRId64 " len :%" PRIzu,
TSDB_READ_REPO_ID(pReadh), TSDB_FILE_FULL_NAME(pDFile), (int64_t)pBlock->offset, size); TSDB_READ_REPO_ID(pReadh), TSDB_FILE_FULL_NAME(pDFile), (int64_t)pBlock->offset, size);
return -1; return -1;
} }
...@@ -462,8 +463,8 @@ static int tsdbLoadBlockStatisFromAggr(SReadH *pReadh, SBlock *pBlock) { ...@@ -462,8 +463,8 @@ static int tsdbLoadBlockStatisFromAggr(SReadH *pReadh, SBlock *pBlock) {
SDFile *pDFileAggr = pBlock->last ? TSDB_READ_SMAL_FILE(pReadh) : TSDB_READ_SMAD_FILE(pReadh); SDFile *pDFileAggr = pBlock->last ? TSDB_READ_SMAL_FILE(pReadh) : TSDB_READ_SMAD_FILE(pReadh);
if (tsdbSeekDFile(pDFileAggr, pBlock->aggrOffset, SEEK_SET) < 0) { if (tsdbSeekDFile(pDFileAggr, pBlock->aggrOffset, SEEK_SET) < 0) {
tsdbError("vgId:%d failed to load block aggr part while seek file %s to offset %" PRId64 " since %s", tsdbError("vgId:%d failed to load block aggr part while seek file %s to offset %" PRIu64 " since %s",
TSDB_READ_REPO_ID(pReadh), TSDB_FILE_FULL_NAME(pDFileAggr), (int64_t)pBlock->aggrOffset, TSDB_READ_REPO_ID(pReadh), TSDB_FILE_FULL_NAME(pDFileAggr), (uint64_t)pBlock->aggrOffset,
tstrerror(terrno)); tstrerror(terrno));
return -1; return -1;
} }
...@@ -473,25 +474,25 @@ static int tsdbLoadBlockStatisFromAggr(SReadH *pReadh, SBlock *pBlock) { ...@@ -473,25 +474,25 @@ static int tsdbLoadBlockStatisFromAggr(SReadH *pReadh, SBlock *pBlock) {
int64_t nreadAggr = tsdbReadDFile(pDFileAggr, (void *)(pReadh->pAggrBlkData), sizeAggr); int64_t nreadAggr = tsdbReadDFile(pDFileAggr, (void *)(pReadh->pAggrBlkData), sizeAggr);
if (nreadAggr < 0) { if (nreadAggr < 0) {
tsdbError("vgId:%d failed to load block aggr part while read file %s since %s, offset:%" PRId64 " len :%" PRIzu, tsdbError("vgId:%d failed to load block aggr part while read file %s since %s, offset:%" PRIu64 " len :%" PRIzu,
TSDB_READ_REPO_ID(pReadh), TSDB_FILE_FULL_NAME(pDFileAggr), tstrerror(terrno), TSDB_READ_REPO_ID(pReadh), TSDB_FILE_FULL_NAME(pDFileAggr), tstrerror(terrno),
(int64_t)pBlock->aggrOffset, sizeAggr); (uint64_t)pBlock->aggrOffset, sizeAggr);
return -1; return -1;
} }
if (nreadAggr < sizeAggr) { if (nreadAggr < sizeAggr) {
terrno = TSDB_CODE_TDB_FILE_CORRUPTED; terrno = TSDB_CODE_TDB_FILE_CORRUPTED;
tsdbError("vgId:%d block aggr part in file %s is corrupted, offset:%" PRId64 " expected bytes:%" PRIzu tsdbError("vgId:%d block aggr part in file %s is corrupted, offset:%" PRIu64 " expected bytes:%" PRIzu
" read bytes: %" PRId64, " read bytes: %" PRId64,
TSDB_READ_REPO_ID(pReadh), TSDB_FILE_FULL_NAME(pDFileAggr), (int64_t)pBlock->aggrOffset, sizeAggr, TSDB_READ_REPO_ID(pReadh), TSDB_FILE_FULL_NAME(pDFileAggr), (uint64_t)pBlock->aggrOffset, sizeAggr,
nreadAggr); nreadAggr);
return -1; return -1;
} }
if (!taosCheckChecksumWhole((uint8_t *)(pReadh->pAggrBlkData), (uint32_t)sizeAggr)) { if (!taosCheckChecksumWhole((uint8_t *)(pReadh->pAggrBlkData), (uint32_t)sizeAggr)) {
terrno = TSDB_CODE_TDB_FILE_CORRUPTED; terrno = TSDB_CODE_TDB_FILE_CORRUPTED;
tsdbError("vgId:%d block aggr part in file %s is corrupted since wrong checksum, offset:%" PRId64 " len :%" PRIzu, tsdbError("vgId:%d block aggr part in file %s is corrupted since wrong checksum, offset:%" PRIu64 " len :%" PRIzu,
TSDB_READ_REPO_ID(pReadh), TSDB_FILE_FULL_NAME(pDFileAggr), (int64_t)pBlock->aggrOffset, sizeAggr); TSDB_READ_REPO_ID(pReadh), TSDB_FILE_FULL_NAME(pDFileAggr), (uint64_t)pBlock->aggrOffset, sizeAggr);
return -1; return -1;
} }
return 0; return 0;
...@@ -667,6 +668,7 @@ static int tsdbLoadBlockDataImpl(SReadH *pReadh, SBlock *pBlock, SDataCols *pDat ...@@ -667,6 +668,7 @@ static int tsdbLoadBlockDataImpl(SReadH *pReadh, SBlock *pBlock, SDataCols *pDat
int ccol = 0; // loop iter for SBlockCol object int ccol = 0; // loop iter for SBlockCol object
int dcol = 0; // loop iter for SDataCols object int dcol = 0; // loop iter for SDataCols object
SBlockCol blockCol = {0}; SBlockCol blockCol = {0};
SBlockCol *pBlockCol = &blockCol;
while (dcol < pDataCols->numOfCols) { while (dcol < pDataCols->numOfCols) {
SDataCol *pDataCol = &(pDataCols->cols[dcol]); SDataCol *pDataCol = &(pDataCols->cols[dcol]);
if (dcol != 0 && ccol >= pBlockData->numOfCols) { if (dcol != 0 && ccol >= pBlockData->numOfCols) {
...@@ -680,20 +682,12 @@ static int tsdbLoadBlockDataImpl(SReadH *pReadh, SBlock *pBlock, SDataCols *pDat ...@@ -680,20 +682,12 @@ static int tsdbLoadBlockDataImpl(SReadH *pReadh, SBlock *pBlock, SDataCols *pDat
uint32_t toffset = TSDB_KEY_COL_OFFSET; uint32_t toffset = TSDB_KEY_COL_OFFSET;
int32_t tlen = pBlock->keyLen; int32_t tlen = pBlock->keyLen;
if (dcol != 0) { if (dcol != 0) {
if (pBlock->blkVer == SBlockVerLatest) { tsdbGetSBlockCol(pBlock, &pBlockCol, pBlockData->cols, ccol);
SBlockCol *pBlockCol = &(pBlockData->cols[ccol]); tcolId = pBlockCol->colId;
tcolId = pBlockCol->colId; toffset = tsdbGetBlockColOffset(pBlockCol);
toffset = tsdbGetBlockColOffset(pBlockCol); tlen = pBlockCol->len;
tlen = pBlockCol->len;
} else {
SBlockColV0 *pBlockCol = (SBlockColV0 *)(pBlockData->cols) + ccol;
tcolId = pBlockCol->colId;
blockCol.offset = pBlockCol->offset;
blockCol.offsetH = pBlockCol->offsetH;
toffset = tsdbGetBlockColOffset(&blockCol);
tlen = pBlockCol->len;
}
} else { } else {
ASSERT(pDataCol->colId == tcolId); ASSERT(pDataCol->colId == tcolId);
} }
...@@ -814,17 +808,8 @@ static int tsdbLoadBlockDataColsImpl(SReadH *pReadh, SBlock *pBlock, SDataCols * ...@@ -814,17 +808,8 @@ static int tsdbLoadBlockDataColsImpl(SReadH *pReadh, SBlock *pBlock, SDataCols *
break; break;
} }
if (pBlock->blkVer == SBlockVerLatest) { pBlockCol = &blockCol;
pBlockCol = &(pReadh->pBlkData->cols[ccol]); tsdbGetSBlockCol(pBlock, &pBlockCol, pReadh->pBlkData->cols, ccol);
} else {
SBlockColV0 *pBlkCol = ((SBlockColV0 *)(pReadh->pBlkData->cols)) + ccol;
blockCol.colId = pBlkCol->colId;
blockCol.len = pBlkCol->len;
blockCol.type = pBlkCol->type;
blockCol.offset = pBlkCol->offset;
blockCol.offsetH = pBlkCol->offsetH;
pBlockCol = &blockCol;
}
if (pBlockCol->colId > colId) { if (pBlockCol->colId > colId) {
pBlockCol = NULL; pBlockCol = NULL;
......
...@@ -411,7 +411,6 @@ static int walSMemRowCheck(SWalHead *pHead) { ...@@ -411,7 +411,6 @@ static int walSMemRowCheck(SWalHead *pHead) {
pWalHead->len = pWalHead->len + lenExpand; pWalHead->len = pWalHead->len + lenExpand;
} }
ASSERT((sizeof(SWalHead) + pWalHead->len) <= WAL_MAX_SIZE);
memcpy(pHead, pWalHead, sizeof(SWalHead) + pWalHead->len); memcpy(pHead, pWalHead, sizeof(SWalHead) + pWalHead->len);
tfree(pWalHead); tfree(pWalHead);
} }
......
...@@ -408,4 +408,4 @@ sql select count(*) from $stb ...@@ -408,4 +408,4 @@ sql select count(*) from $stb
print data00 $data00 print data00 $data00
if $data00 != $totalRows then if $data00 != $totalRows then
return -1 return -1
endi endi
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册