提交 91d7c774 编写于 作者: H Hongze Cheng

refactor part of code

上级 ddc67482
......@@ -45,6 +45,8 @@ extern int tsdbDebugFlag;
#define TSDB_FILE_DELIMITER 0xF00AFA0F
#define TSDB_FILE_INIT_MAGIC 0xFFFFFFFF
#define TAOS_IN_RANGE(key, keyMin, keyLast) (((key) >= (keyMin)) && ((key) <= (keyMax)))
// NOTE: Any file format change must increase this version number by 1
// Also, implement the convert function
#define TSDB_FILE_VERSION ((uint32_t)0)
......@@ -475,6 +477,7 @@ int tsdbUpdateFileHeader(SFile* pFile);
int tsdbEncodeSFileInfo(void** buf, const STsdbFileInfo* pInfo);
void* tsdbDecodeSFileInfo(void* buf, STsdbFileInfo* pInfo);
void tsdbRemoveFileGroup(STsdbRepo* pRepo, SFileGroup* pFGroup);
int tsdbLoadFileHeader(SFile* pFile, uint32_t* version);
void tsdbGetFileInfoImpl(char* fname, uint32_t* magic, int64_t* size);
void tsdbGetFidKeyRange(int daysPerFile, int8_t precision, int fileId, TSKEY *minKey, TSKEY *maxKey);
......@@ -513,7 +516,10 @@ int tsdbCommitTableData(SRWHelper* pHelper, SCommitIter* pCommitIter, SDataCols
int tsdbMoveLastBlockIfNeccessary(SRWHelper* pHelper);
int tsdbWriteCompInfo(SRWHelper* pHelper);
int tsdbWriteCompIdx(SRWHelper* pHelper);
int tsdbLoadCompIdxImpl(SFile* pFile, uint32_t offset, uint32_t len, void* buffer);
int tsdbDecodeSCompIdxImpl(void* buffer, uint32_t len, SCompIdx** ppCompIdx, int* numOfIdx);
int tsdbLoadCompIdx(SRWHelper* pHelper, void* target);
int tsdbLoadCompInfoImpl(SFile* pFile, SCompIdx* pIdx, SCompInfo** ppCompInfo);
int tsdbLoadCompInfo(SRWHelper* pHelper, void* target);
int tsdbLoadCompData(SRWHelper* phelper, SCompBlock* pcompblock, void* target);
void tsdbGetDataStatis(SRWHelper* pHelper, SDataStatis* pStatis, int numOfCols);
......@@ -537,7 +543,7 @@ static FORCE_INLINE int compTSKEY(const void* key1, const void* key2) {
#define TSDB_SUBMIT_MSG_HEAD_SIZE sizeof(SSubmitMsg)
char* tsdbGetMetaFileName(char* rootDir);
void tsdbGetDataFileName(STsdbRepo* pRepo, int fid, int type, char* fname);
void tsdbGetDataFileName(char* rootDir, int vid, int fid, int type, char* fname);
int tsdbLockRepo(STsdbRepo* pRepo);
int tsdbUnlockRepo(STsdbRepo* pRepo);
char* tsdbGetDataDirName(char* rootDir);
......
......@@ -302,7 +302,7 @@ int tsdbCreateFile(SFile *pFile, STsdbRepo *pRepo, int fid, int type) {
memset((void *)pFile, 0, sizeof(SFile));
pFile->fd = -1;
tsdbGetDataFileName(pRepo, fid, type, pFile->fname);
tsdbGetDataFileName(pRepo->rootDir, REPO_ID(pRepo), fid, type, pFile->fname);
if (access(pFile->fname, F_OK) == 0) {
tsdbError("vgId:%d file %s already exists", REPO_ID(pRepo), pFile->fname);
......@@ -424,33 +424,57 @@ void tsdbRemoveFileGroup(STsdbRepo *pRepo, SFileGroup *pFGroup) {
}
}
void tsdbGetFileInfoImpl(char *fname, uint32_t *magic, int64_t *size) {
char buf[TSDB_FILE_HEAD_SIZE] = "\0";
uint32_t version = 0;
STsdbFileInfo info = {0};
int tsdbLoadFileHeader(SFile *pFile, uint32_t *version) {
char buf[TSDB_FILE_HEAD_SIZE] = "\0";
int fd = open(fname, O_RDONLY);
if (fd < 0) goto _err;
if (lseek(pFile->fd, 0, SEEK_SET) < 0) {
tsdbError("failed to lseek file %s to start since %s", pFile->fname, strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}
if (taosTRead(fd, buf, TSDB_FILE_HEAD_SIZE) < TSDB_FILE_HEAD_SIZE) goto _err;
if (taosTRead(pFile->fd, buf, TSDB_FILE_HEAD_SIZE) < TSDB_FILE_HEAD_SIZE) {
tsdbError("failed to read file %s header part with %d bytes, reason:%s", pFile->fname, TSDB_FILE_HEAD_SIZE,
strerror(errno));
terrno = TSDB_CODE_TDB_FILE_CORRUPTED;
return -1;
}
if (!taosCheckChecksumWhole((uint8_t *)buf, TSDB_FILE_HEAD_SIZE)) goto _err;
if (!taosCheckChecksumWhole((uint8_t *)buf, TSDB_FILE_HEAD_SIZE)) {
tsdbError("file %s header part is corrupted with failed checksum", pFile->fname);
terrno = TSDB_CODE_TDB_FILE_CORRUPTED;
return -1;
}
void *pBuf = (void *)buf;
pBuf = taosDecodeFixedU32(pBuf, &version);
pBuf = tsdbDecodeSFileInfo(pBuf, &info);
pBuf = taosDecodeFixedU32(pBuf, version);
pBuf = tsdbDecodeSFileInfo(pBuf, &(pFile->info));
return 0;
}
off_t offset = lseek(fd, 0, SEEK_END);
void tsdbGetFileInfoImpl(char *fname, uint32_t *magic, int64_t *size) {
uint32_t version = 0;
SFile file = {0};
SFile * pFile = &file;
strncpy(pFile->fname, fname, TSDB_FILENAME_LEN);
pFile->fd = -1;
if (tsdbOpenFile(pFile, O_RDONLY) < 0) goto _err;
if (tsdbLoadFileHeader(pFile, &version) < 0) goto _err;
off_t offset = lseek(pFile->fd, 0, SEEK_END);
if (offset < 0) goto _err;
close(fd);
tsdbCloseFile(pFile);
*magic = info.magic;
*magic = pFile->info.magic;
*size = offset;
return;
_err:
if (fd >= 0) close(fd);
tsdbCloseFile(pFile);
*magic = TSDB_FILE_INIT_MAGIC;
*size = 0;
}
......@@ -458,34 +482,23 @@ _err:
// ---------------- LOCAL FUNCTIONS ----------------
static int tsdbInitFile(SFile *pFile, STsdbRepo *pRepo, int fid, int type) {
uint32_t version;
char buf[512] = "\0";
tsdbGetDataFileName(pRepo, fid, type, pFile->fname);
tsdbGetDataFileName(pRepo->rootDir, REPO_ID(pRepo), fid, type, pFile->fname);
pFile->fd = -1;
if (tsdbOpenFile(pFile, O_RDONLY) < 0) goto _err;
if (taosTRead(pFile->fd, buf, TSDB_FILE_HEAD_SIZE) < TSDB_FILE_HEAD_SIZE) {
tsdbError("vgId:%d failed to read %d bytes from file %s since %s", REPO_ID(pRepo), TSDB_FILE_HEAD_SIZE,
pFile->fname, strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
if (tsdbLoadFileHeader(pFile, &version) < 0) {
tsdbError("vgId:%d failed to load file %s header part since %s", REPO_ID(pRepo), pFile->fname, tstrerror(terrno));
goto _err;
}
if (!taosCheckChecksumWhole((uint8_t *)buf, TSDB_FILE_HEAD_SIZE)) {
tsdbError("vgId:%d file %s head part is corrupted", REPO_ID(pRepo), pFile->fname);
terrno = TSDB_CODE_TDB_FILE_CORRUPTED;
goto _err;
}
void *pBuf = buf;
pBuf = taosDecodeFixedU32(pBuf, &version);
pBuf = tsdbDecodeSFileInfo(pBuf, &(pFile->info));
if (pFile->info.size == TSDB_FILE_HEAD_SIZE) {
pFile->info.size = lseek(pFile->fd, 0, SEEK_END);
}
if (version != TSDB_FILE_VERSION) {
// TODO: deal with error
tsdbError("vgId:%d file %s version %u is not the same as program version %u which may cause problem",
REPO_ID(pRepo), pFile->fname, version, TSDB_FILE_VERSION);
}
......
......@@ -354,8 +354,8 @@ char *tsdbGetMetaFileName(char *rootDir) {
return fname;
}
void tsdbGetDataFileName(STsdbRepo *pRepo, int fid, int type, char *fname) {
snprintf(fname, TSDB_FILENAME_LEN, "%s/%s/v%df%d%s", pRepo->rootDir, TSDB_DATA_DIR_NAME, REPO_ID(pRepo), fid, tsdbFileSuffix[type]);
void tsdbGetDataFileName(char *rootDir, int vid, int fid, int type, char *fname) {
snprintf(fname, TSDB_FILENAME_LEN, "%s/%s/v%df%d%s", rootDir, TSDB_DATA_DIR_NAME, vid, fid, tsdbFileSuffix[type]);
}
int tsdbLockRepo(STsdbRepo *pRepo) {
......
......@@ -102,7 +102,8 @@ void tsdbResetHelper(SRWHelper *pHelper) {
int tsdbSetAndOpenHelperFile(SRWHelper *pHelper, SFileGroup *pGroup) {
ASSERT(pHelper != NULL && pGroup != NULL);
SFile *pFile = NULL;
SFile * pFile = NULL;
STsdbRepo *pRepo = pHelper->pRepo;
// Clear the helper object
tsdbResetHelper(pHelper);
......@@ -112,8 +113,10 @@ int tsdbSetAndOpenHelperFile(SRWHelper *pHelper, SFileGroup *pGroup) {
// Set the files
pHelper->files.fGroup = *pGroup;
if (helperType(pHelper) == TSDB_WRITE_HELPER) {
tsdbGetDataFileName(pHelper->pRepo, pGroup->fileId, TSDB_FILE_TYPE_NHEAD, helperNewHeadF(pHelper)->fname);
tsdbGetDataFileName(pHelper->pRepo, pGroup->fileId, TSDB_FILE_TYPE_NLAST, helperNewLastF(pHelper)->fname);
tsdbGetDataFileName(pRepo->rootDir, REPO_ID(pRepo), pGroup->fileId, TSDB_FILE_TYPE_NHEAD,
helperNewHeadF(pHelper)->fname);
tsdbGetDataFileName(pRepo->rootDir, REPO_ID(pRepo), pGroup->fileId, TSDB_FILE_TYPE_NLAST,
helperNewLastF(pHelper)->fname);
}
// Open the files
......@@ -443,10 +446,64 @@ int tsdbWriteCompIdx(SRWHelper *pHelper) {
return 0;
}
int tsdbLoadCompIdxImpl(SFile *pFile, uint32_t offset, uint32_t len, void *buffer) {
const char *prefixMsg = "failed to load SCompIdx part";
if (lseek(pFile->fd, offset, SEEK_SET) < 0) {
tsdbError("%s: seek to file %s offset %u failed since %s", prefixMsg, pFile->fname, offset, strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}
if (taosTRead(pFile->fd, buffer, len) < len) {
tsdbError("%s: read file %s offset %u len %u failed since %s", prefixMsg, pFile->fname, offset, len,
strerror(errno));
terrno = TSDB_CODE_TDB_FILE_CORRUPTED;
return -1;
}
if (!taosCheckChecksumWhole((uint8_t *)buffer, len)) {
tsdbError("%s: file %s corrupted, offset %u len %u", prefixMsg, pFile->fname, offset, len);
terrno = TSDB_CODE_TDB_FILE_CORRUPTED;
return -1;
}
return 0;
}
int tsdbDecodeSCompIdxImpl(void *buffer, uint32_t len, SCompIdx **ppCompIdx, int *numOfIdx) {
int nIdx = 0;
void *pPtr = buffer;
while (POINTER_DISTANCE(pPtr, buffer) < (int)(len - sizeof(TSCKSUM))) {
size_t tlen = taosTSizeof(*ppCompIdx);
if (tlen < sizeof(SCompIdx) * (nIdx + 1)) {
*ppCompIdx = (SCompIdx *)taosTRealloc(*ppCompIdx, (tlen == 0) ? 1024 : tlen * 2);
if (*ppCompIdx == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
return -1;
}
}
pPtr = tsdbDecodeSCompIdx(pPtr, &((*ppCompIdx)[nIdx]));
if (pPtr == NULL) {
tsdbError("failed to decode SCompIdx part, idx:%d", nIdx);
terrno = TSDB_CODE_TDB_FILE_CORRUPTED;
return -1;
}
nIdx++;
ASSERT(nIdx == 1 || (*ppCompIdx)[nIdx - 1].tid > (*ppCompIdx)[nIdx - 2].tid);
ASSERT(POINTER_DISTANCE(pPtr, buffer) <= (int)(len - sizeof(TSCKSUM)));
}
*numOfIdx = nIdx;
return 0;
}
int tsdbLoadCompIdx(SRWHelper *pHelper, void *target) {
ASSERT(pHelper->state == TSDB_HELPER_FILE_SET_AND_OPEN);
SFile *pFile = helperHeadF(pHelper);
int fd = pFile->fd;
if (!helperHasState(pHelper, TSDB_HELPER_IDX_LOAD)) {
// If not load from file, just load it in object
......@@ -456,54 +513,18 @@ int tsdbLoadCompIdx(SRWHelper *pHelper, void *target) {
return -1;
}
if (lseek(fd, pFile->info.offset, SEEK_SET) < 0) {
tsdbError("vgId:%d failed to lseek file %s since %s", REPO_ID(pHelper->pRepo), pFile->fname, strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}
if (taosTRead(fd, (void *)(pHelper->pBuffer), pFile->info.len) < (int)pFile->info.len) {
tsdbError("vgId:%d failed to read %d bytes from file %s since %s", REPO_ID(pHelper->pRepo), pFile->info.len,
pFile->fname, strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
// Load SCompIdx binary from file
if (tsdbLoadCompIdxImpl(pFile, pFile->info.offset, pFile->info.len, (void *)(pHelper->pBuffer)) < 0) {
return -1;
}
if (!taosCheckChecksumWhole((uint8_t *)(pHelper->pBuffer), pFile->info.len)) {
tsdbError("vgId:%d file %s SCompIdx part is corrupted. len %u", REPO_ID(pHelper->pRepo), pFile->fname,
pFile->info.len);
terrno = TSDB_CODE_TDB_FILE_CORRUPTED;
// Decode the SCompIdx part
if (tsdbDecodeSCompIdxImpl(pHelper->pBuffer, pFile->info.len, &(pHelper->idxH.pIdxArray),
&(pHelper->idxH.numOfIdx)) < 0) {
tsdbError("vgId:%d failed to decode SCompIdx part from file %s since %s", REPO_ID(pHelper->pRepo), pFile->fname,
tstrerror(errno));
return -1;
}
// Decode it
pHelper->idxH.numOfIdx = 0;
void *ptr = pHelper->pBuffer;
while (POINTER_DISTANCE(ptr, pHelper->pBuffer) < (int)(pFile->info.len - sizeof(TSCKSUM))) {
size_t tlen = taosTSizeof(pHelper->idxH.pIdxArray);
pHelper->idxH.numOfIdx++;
if (tlen < pHelper->idxH.numOfIdx * sizeof(SCompIdx)) {
pHelper->idxH.pIdxArray = (SCompIdx *)taosTRealloc(pHelper->idxH.pIdxArray, (tlen == 0) ? 1024 : tlen * 2);
if (pHelper->idxH.pIdxArray == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
return -1;
}
}
ptr = tsdbDecodeSCompIdx(ptr, &(pHelper->idxH.pIdxArray[pHelper->idxH.numOfIdx - 1]));
if (ptr == NULL) {
tsdbError("vgId:%d file %s SCompIdx part is corrupted. len %u", REPO_ID(pHelper->pRepo), pFile->fname,
pFile->info.len);
terrno = TSDB_CODE_TDB_FILE_CORRUPTED;
return -1;
}
ASSERT(pHelper->idxH.numOfIdx == 1 || pHelper->idxH.pIdxArray[pHelper->idxH.numOfIdx - 1].tid >
pHelper->idxH.pIdxArray[pHelper->idxH.numOfIdx - 2].tid);
ASSERT(POINTER_DISTANCE(ptr, pHelper->pBuffer) <= (int)(pFile->info.len - sizeof(TSCKSUM)));
}
}
}
helperSetState(pHelper, TSDB_HELPER_IDX_LOAD);
......@@ -515,36 +536,49 @@ int tsdbLoadCompIdx(SRWHelper *pHelper, void *target) {
return 0;
}
int tsdbLoadCompInfoImpl(SFile *pFile, SCompIdx *pIdx, SCompInfo **ppCompInfo) {
const char *prefixMsg = "failed to load SCompInfo/SCompBlock part";
if (lseek(pFile->fd, pIdx->offset, SEEK_SET) < 0) {
tsdbError("%s: seek to file %s offset %u failed since %s", prefixMsg, pFile->fname, pIdx->offset, strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}
*ppCompInfo = taosTRealloc((void *)(*ppCompInfo), pIdx->len);
if (*ppCompInfo == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
return -1;
}
if (taosTRead(pFile->fd, (void *)(*ppCompInfo), pIdx->len) < (int)pIdx->len) {
tsdbError("%s: read file %s offset %u len %u failed since %s", prefixMsg, pFile->fname, pIdx->offset, pIdx->len,
strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}
if (!taosCheckChecksumWhole((uint8_t *)(*ppCompInfo), pIdx->len)) {
tsdbError("%s: file %s corrupted, offset %u len %u", prefixMsg, pFile->fname, pIdx->offset, pIdx->len);
terrno = TSDB_CODE_TDB_FILE_CORRUPTED;
return -1;
}
return 0;
}
int tsdbLoadCompInfo(SRWHelper *pHelper, void *target) {
ASSERT(helperHasState(pHelper, TSDB_HELPER_TABLE_SET));
SCompIdx *pIdx = &(pHelper->curCompIdx);
int fd = helperHeadF(pHelper)->fd;
SFile *pFile = helperHeadF(pHelper);
if (!helperHasState(pHelper, TSDB_HELPER_INFO_LOAD)) {
if (pIdx->offset > 0) {
ASSERT(pIdx->uid == pHelper->tableInfo.uid);
if (lseek(fd, pIdx->offset, SEEK_SET) < 0) {
tsdbError("vgId:%d failed to lseek file %s since %s", REPO_ID(pHelper->pRepo), helperHeadF(pHelper)->fname,
strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}
pHelper->pCompInfo = taosTRealloc((void *)pHelper->pCompInfo, pIdx->len);
if (taosTRead(fd, (void *)(pHelper->pCompInfo), pIdx->len) < (int)pIdx->len) {
tsdbError("vgId:%d failed to read %d bytes from file %s since %s", REPO_ID(pHelper->pRepo), pIdx->len,
helperHeadF(pHelper)->fname, strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}
if (!taosCheckChecksumWhole((uint8_t *)pHelper->pCompInfo, pIdx->len)) {
tsdbError("vgId:%d file %s SCompInfo part is corrupted, tid %d uid %" PRIu64, REPO_ID(pHelper->pRepo),
helperHeadF(pHelper)->fname, pHelper->tableInfo.tid, pHelper->tableInfo.uid);
terrno = TSDB_CODE_TDB_FILE_CORRUPTED;
return -1;
}
if (tsdbLoadCompInfoImpl(pFile, pIdx, &(pHelper->pCompInfo)) < 0) return -1;
ASSERT(pIdx->uid == pHelper->pCompInfo->uid && pIdx->tid == pHelper->pCompInfo->tid);
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册