提交 ea86b6e8 编写于 作者: H Hongze Cheng

TD-353

上级 3041354e
......@@ -192,6 +192,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_TDB_SUBMIT_MSG_MSSED_UP, 0, 0x060C, "tsdb submi
TAOS_DEFINE_ERROR(TSDB_CODE_TDB_INVALID_ACTION, 0, 0x060D, "tsdb invalid action")
TAOS_DEFINE_ERROR(TSDB_CODE_TDB_INVALID_CREATE_TB_MSG, 0, 0x060E, "tsdb invalid create table message")
TAOS_DEFINE_ERROR(TSDB_CODE_TDB_NO_TABLE_DATA_IN_MEM, 0, 0x060F, "tsdb no table data in memory skiplist")
TAOS_DEFINE_ERROR(TSDB_CODE_TDB_FILE_ALREADY_EXISTS, 0, 0x0610, "tsdb file already exists")
// query
TAOS_DEFINE_ERROR(TSDB_CODE_QRY_INVALID_QHANDLE, 0, 0x0700, "query invalid handle")
......
......@@ -116,8 +116,7 @@ typedef enum {
TSDB_FILE_TYPE_HEAD = 0,
TSDB_FILE_TYPE_DATA,
TSDB_FILE_TYPE_LAST,
TSDB_FILE_TYPE_NHEAD,
TSDB_FILE_TYPE_NLAST
TSDB_FILE_TYPE_MAX,
} TSDB_FILE_TYPE;
typedef struct {
......@@ -137,10 +136,8 @@ typedef struct {
} SFile;
typedef struct {
int fileId;
SFile headF;
SFile dataF;
SFile lastF;
int fileId;
SFile files[TSDB_FILE_TYPE_MAX];
} SFileGroup;
typedef struct {
......@@ -313,11 +310,23 @@ int tsdbTakeMemSnapshot(STsdbRepo* pRepo, SMemTable** pMem, SMemTable** pIMem);
#define TSDB_MAX_FILE(keep, daysPerFile) ((keep) / (daysPerFile) + 3)
#define TSDB_MIN_FILE_ID(fh) (fh)->pFGroup[0].fileId
#define TSDB_MAX_FILE_ID(fh) (fh)->pFGroup[(fh)->nFGroups - 1].fileId
#define TSDB_IS_FILE_OPENED(f) ((f)->fd > 0)
#define TSDB_FGROUP_ITER_FORWARD TSDB_ORDER_ASC
#define TSDB_FGROUP_ITER_BACKWARD TSDB_ORDER_DESC
STsdbFileH* tsdbNewFileH(STsdbCfg* pCfg);
void tsdbFreeFileH(STsdbFileH* pFileH);
int* tsdbOpenFileH(STsdbRepo* pRepo);
void tsdbCloseFileH(STsdbRepo* pRepo);
SFileGroup* tsdbCreateFGroupIfNeed(STsdbFileH* pFileH, char* dataDir, int fid, int maxTables);
void tsdbInitFileGroupIter(STsdbFileH* pFileH, SFileGroupIter* pIter, int direction);
void tsdbSeekFileGroupIter(SFileGroupIter* pIter, int fid);
SFileGroup* tsdbGetFileGroupNext(SFileGroupIter* pIter);
int tsdbOpenFile(SFile* pFile, int oflag);
void tsdbCloseFile(SFile* pFile);
int tsdbCreateFile(SFile* pFile, STsdbRepo* pRepo, int fid, int type);
SFileGroup* tsdbSearchFGroup(STsdbFileH* pFileH, int fid, int flags);
void tsdbFitRetention(STsdbRepo* pRepo);
// ------------------ tsdbRWHelper.c
#define TSDB_HELPER_CLEAR_STATE 0x0 // Clear state
......@@ -349,7 +358,6 @@ int tsdbUnlockRepo(STsdbRepo* pRepo);
// --------- Helper state
int tsdbInitReadHelper(SRWHelper *pHelper, STsdbRepo *pRepo);
int tsdbInitWriteHelper(SRWHelper *pHelper, STsdbRepo *pRepo);
void tsdbDestroyHelper(SRWHelper *pHelper);
......
......@@ -29,7 +29,13 @@
#include "tutil.h"
#include "ttime.h"
const char *tsdbFileSuffix[] = {".head", ".data", ".last", ".h", ".l"};
const char *tsdbFileSuffix[] = {".head", ".data", ".last"};
static int tsdbInitFile(SFile *pFile, STsdbRepo *pRepo, int fid, int type);
static void tsdbDestroyFile(SFile *pFile);
static int compFGroup(const void *arg1, const void *arg2);
static int keyFGroupCompFunc(const void *key, const void *fgroup);
static void tsdbRemoveFileGroup(STsdbRepo *pRepo, SFileGroup *pFGroup);
// ---------------- INTERNAL FUNCTIONS ----------------
STsdbFileH *tsdbNewFileH(STsdbCfg *pCfg) {
......@@ -39,7 +45,7 @@ STsdbFileH *tsdbNewFileH(STsdbCfg *pCfg) {
goto _err;
}
int code = pthread_rwlock_init(&(pFileH->fhlock));
int code = pthread_rwlock_init(&(pFileH->fhlock), NULL);
if (code != 0) {
tsdbError("vgId:%d failed to init file handle lock since %s", pCfg->tsdbId, strerror(code));
terrno = TAOS_SYSTEM_ERROR(code);
......@@ -76,6 +82,7 @@ int *tsdbOpenFileH(STsdbRepo *pRepo) {
DIR * dir = NULL;
int fid = 0;
SFileGroup fileGroup = {0};
STsdbFileH pFileH = pRepo->tsdbFileH;
tDataDir = tsdbGetDataDirName(pRepo->rootDir);
......@@ -95,21 +102,22 @@ int *tsdbOpenFileH(STsdbRepo *pRepo) {
while ((dp = readdir(dir)) != NULL) {
if (strncmp(dp->d_name, ".", 1) == 0 || strncmp(dp->d_name, "..", 2) == 0) continue;
sscanf(dp->d_name, "f%d", &fid);
SFileGroup fileGroup = {0};
if (tsdbSearchFGroup(pFileH, fid, TD_EQ) != NULL) continue;
fileGroup.fileId = fid;
for (int type = TSDB_FILE_TYPE_HEAD; type <= TSDB_FILE_TYPE_LAST; type++) {
fileGroup.headF.fname = tsdbGetDataFileName(pRepo, fid, type);
if (fileGroup.headF.fname == NULL) goto _err;
if (tsdbInitFile(fileGroup.headF))
if (tsdbSearchFGroup(pRepo->tsdbFileH, fid, TD_EQ) != NULL) return 0;
fileGroup = {0};
fileGroup.fileId = fid;
for (int type = TSDB_FILE_TYPE_HEAD; type < TSDB_FILE_TYPE_MAX; type++) {
if (tsdbInitFile(&fileGroup.files[type], pRepo, fid, type) < 0) {
tsdbError("vgId:%d failed to init file fid %d type %d", REPO_ID(pRepo), fid, type);
goto _err;
}
}
for (int type = TSDB_FILE_TYPE_NHEAD; type <= TSDB_FILE_TYPE_NLAST; type++) {
}
tsdbTrace("vgId:%d file group %d init", REPO_ID(pRepo), fid);
pFileH->[pFileH->nFGroups++] = fileGroup;
qsort((void *)(pFileH->pFGroup), pFileH->nFGroups, sizeof(SFileGroup), compFGroup);
}
tfree(tDataDir);
......@@ -117,8 +125,11 @@ int *tsdbOpenFileH(STsdbRepo *pRepo) {
return 0;
_err:
for (int type = TSDB_FILE_TYPE_HEAD; type < TSDB_FILE_TYPE_MAX; type++) tsdbDestroyFile(&fileGroup.files[type]);
tfree(tDataDir);
if (dir != NULL) closedir(tDataDir);
tsdbCloseFileH(pRepo);
return -1;
}
......@@ -126,17 +137,14 @@ void tsdbCloseFileH(STsdbRepo *pRepo) {
STsdbFileH *pFileH = pRepo->tsdbFileH;
for (int i = 0; i < pFileH->nFGroups; i++) {
// TODO
SFileGroup *pFGroup = pFileH->pFGroup + i;
for (int type = TSDB_FILE_TYPE_HEAD; type < TSDB_FILE_TYPE_MAX; type++) {
tsdbDestroyFile(pFGroup->files[type]);
}
}
}
/**
* Create the file group if the file group not exists.
*
* @return A pointer to
*/
SFileGroup *tsdbCreateFGroup(STsdbFileH *pFileH, char *dataDir, int fid, int maxTables) {
SFileGroup *tsdbCreateFGroupIfNeed(STsdbFileH *pFileH, char *dataDir, int fid, int maxTables) {
if (pFileH->numOfFGroups >= pFileH->maxFGroups) return NULL;
SFileGroup fGroup;
......@@ -158,35 +166,15 @@ SFileGroup *tsdbCreateFGroup(STsdbFileH *pFileH, char *dataDir, int fid, int max
return pGroup;
_err:
// TODO: deal with the err here
for (int type = TSDB_FILE_TYPE_HEAD; type < TSDB_FILE_TYPE_MAX; type++) tsdbDestroyFile(&pGroup->files[type]);
return NULL;
}
int tsdbRemoveFileGroup(STsdbFileH *pFileH, int fid) {
SFileGroup *pGroup =
bsearch((void *)&fid, (void *)(pFileH->fGroup), pFileH->numOfFGroups, sizeof(SFileGroup), keyFGroupCompFunc);
if (pGroup == NULL) return -1;
// Remove from disk
for (int type = TSDB_FILE_TYPE_HEAD; type < TSDB_FILE_TYPE_MAX; type++) {
remove(pGroup->files[type].fname);
}
// Adjust the memory
int filesBehind = pFileH->numOfFGroups - (((char *)pGroup - (char *)(pFileH->fGroup)) / sizeof(SFileGroup) + 1);
if (filesBehind > 0) {
memmove((void *)pGroup, (void *)((char *)pGroup + sizeof(SFileGroup)), sizeof(SFileGroup) * filesBehind);
}
pFileH->numOfFGroups--;
return 0;
}
void tsdbInitFileGroupIter(STsdbFileH *pFileH, SFileGroupIter *pIter, int direction) {
void tsdbInitFileGroupIter(STsdbFileH *pFileH, SFileGroupIter *pIter, int direction) { // TODO
pIter->direction = direction;
pIter->base = pFileH->fGroup;
pIter->numOfFGroups = pFileH->numOfFGroups;
if (pFileH->numOfFGroups == 0){
if (pFileH->numOfFGroups == 0) {
pIter->pFileGroup = NULL;
} else {
if (direction == TSDB_FGROUP_ITER_FORWARD) {
......@@ -197,25 +185,13 @@ void tsdbInitFileGroupIter(STsdbFileH *pFileH, SFileGroupIter *pIter, int direct
}
}
void tsdbFitRetention(STsdbRepo *pRepo) {
STsdbFileH *pFileH = pRepo->tsdbFileH;
SFileGroup *pGroup = pFileH->fGroup;
int mfid =
tsdbGetKeyFileId(taosGetTimestamp(pRepo->config.precision), pRepo->config.daysPerFile, pRepo->config.precision) - pFileH->maxFGroups + 3;
while (pFileH->numOfFGroups > 0 && pGroup[0].fileId < mfid) {
tsdbRemoveFileGroup(pFileH, pGroup[0].fileId);
}
}
void tsdbSeekFileGroupIter(SFileGroupIter *pIter, int fid) {
void tsdbSeekFileGroupIter(SFileGroupIter *pIter, int fid) { // TODO
if (pIter->numOfFGroups == 0) {
assert(pIter->pFileGroup == NULL);
return;
}
int flags = (pIter->direction == TSDB_FGROUP_ITER_FORWARD) ? TD_GE : TD_LE;
int flags = (pIter->direction == TSDB_FGROUP_ITER_FORWARD) ? TD_GE : TD_LE;
void *ptr = taosbsearch(&fid, pIter->base, pIter->numOfFGroups, sizeof(SFileGroup), keyFGroupCompFunc, flags);
if (ptr == NULL) {
pIter->pFileGroup = NULL;
......@@ -224,7 +200,7 @@ void tsdbSeekFileGroupIter(SFileGroupIter *pIter, int fid) {
}
}
SFileGroup *tsdbGetFileGroupNext(SFileGroupIter *pIter) {
SFileGroup *tsdbGetFileGroupNext(SFileGroupIter *pIter) {//TODO
SFileGroup *ret = pIter->pFileGroup;
if (ret == NULL) return NULL;
......@@ -264,20 +240,21 @@ void tsdbCloseFile(SFile *pFile) {
}
}
int tsdbCreateFile(char *dataDir, int fileId, const char *suffix, SFile *pFile) {
int tsdbCreateFile(SFile *pFile, STsdbRepo *pRepo, int fid, int type) {
memset((void *)pFile, 0, sizeof(SFile));
pFile->fd = -1;
tsdbGetFileName(dataDir, fileId, suffix, pFile->fname);
pFile->fname = tsdbGetDataFileName(pRepo, fid, type);
if (pFile->fname == NULL) return -1;
if (access(pFile->fname, F_OK) == 0) {
// File already exists
return -1;
tsdbError("vgId:%d file %s already exists", REPO_ID(pRepo), fid);
terrno = TSDB_CODE_TDB_FILE_ALREADY_EXISTS;
goto _err;
}
if (tsdbOpenFile(pFile, O_RDWR | O_CREAT) < 0) {
// TODO: deal with the ERROR here
return -1;
goto _err;
}
pFile->info.size = TSDB_FILE_HEAD_SIZE;
......@@ -290,6 +267,9 @@ int tsdbCreateFile(char *dataDir, int fileId, const char *suffix, SFile *pFile)
tsdbCloseFile(pFile);
return 0;
_err:
return -1;
}
SFileGroup *tsdbSearchFGroup(STsdbFileH *pFileH, int fid, int flags) {
......@@ -299,18 +279,46 @@ SFileGroup *tsdbSearchFGroup(STsdbFileH *pFileH, int fid, int flags) {
return (SFileGroup *)ptr;
}
void tsdbFitRetention(STsdbRepo *pRepo) {
STsdbCfg *pCfg = &(pRepo->config);
STsdbFileH *pFileH = pRepo->tsdbFileH;
SFileGroup *pGroup = pFileH->pFGroup;
int mfid = TSDB_KEY_FILEID(taosGetTimestamp(pCfg->precision), pCfg->daysPerFile, pCfg->precision) -
TSDB_MAX_FILE(pCfg->keep, pCfg->daysPerFile);
pthread_rwlock_wrlock(&(pFileH->fhlock));
while (pFileH->numOfFGroups > 0 && pGroup[0].fileId < mfid) {
tsdbRemoveFileGroup(pFileH, pGroup);
}
pthread_rwlock_unlock(&(pFileH->fhlock))
}
// ---------------- LOCAL FUNCTIONS ----------------
static int tsdbInitFile(char *dataDir, int fid, const char *suffix, SFile *pFile) {
static int tsdbInitFile(SFile *pFile, STsdbRepo *pRepo, int fid, int type) {
uint32_t version;
char buf[512] = "\0";
char buf[512] = "\0";
pFile->fname = tsdbGetDataFileName(pRepo, fid, type);
if (pFile->fname == NULL) return -1;
tsdbGetFileName(dataDir, fid, suffix, pFile->fname);
if (access(pFile->fname, F_OK|R_OK|W_OK) < 0) return -1;
pFile->fd = -1;
if (tsdbOpenFile(pFile, O_RDONLY) < 0) return -1;
if (tsdbOpenFile(pFile, O_RDONLY) < 0) goto _err;
if (tread(pFile->fd, buf, TSDB_FILE_HEAD_SIZE) < TSDB_FILE_HEAD_SIZE) return -1;
if (!taosCheckChecksumWhole((uint8_t *)buf, TSDB_FILE_HEAD_SIZE)) return -1;
if (tread(pFile->fd, buf, TSDB_FILE_HEAD_SIZE) < TSDB_FILE_HEAD_SIZE) {
tsdbError("vgId:%d failed to read %d bytes from file %s since %s", REPO_ID(pRepo), TSDB_FILE_HEAD_SIZE,
pFile->fname, strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
if (!taosCheckChecksumWhole((uint8_t *)buf, TSDB_FILE_HEAD_SIZE)) {
tsdbError("vgId:%d file %s head part is corrupted", REPO_ID(pRepo), pFile->fname);
terrno = TSDB_CODE_TDB_FILE_CORRUPTED;
goto _err;
}
void *pBuf = buf;
pBuf = taosDecodeFixedU32(pBuf, &version);
......@@ -319,21 +327,15 @@ static int tsdbInitFile(char *dataDir, int fid, const char *suffix, SFile *pFile
tsdbCloseFile(pFile);
return 0;
_err:
tsdbDestroyFile(pFile);
return -1;
}
// static int tsdbOpenFGroup(STsdbFileH *pFileH, char *dataDir, int fid) {
// if (tsdbSearchFGroup(pFileH, fid) != NULL) return 0;
// SFileGroup fGroup = {0};
// fGroup.fileId = fid;
// for (int type = TSDB_FILE_TYPE_HEAD; type < TSDB_FILE_TYPE_MAX; type++) {
// if (tsdbInitFile(dataDir, fid, tsdbFileSuffix[type], &fGroup.files[type]) < 0) return -1;
// }
// pFileH->fGroup[pFileH->numOfFGroups++] = fGroup;
// qsort((void *)(pFileH->fGroup), pFileH->numOfFGroups, sizeof(SFileGroup), compFGroup);
// return 0;
// }
static void tsdbDestroyFile(SFile *pFile) {
tsdbCloseFile(pFile);
tfree(pFile->fname);
}
static int compFGroup(const void *arg1, const void *arg2) {
int val1 = ((SFileGroup *)arg1)->fileId;
......@@ -356,4 +358,24 @@ static int keyFGroupCompFunc(const void *key, const void *fgroup) {
} else {
return fid > pFGroup->fileId ? 1 : -1;
}
}
static void tsdbRemoveFileGroup(STsdbRepo *pRepo, SFileGroup *pFGroup) {
ASSERT(pFGroup != NULL);
STsdbFileH *pFileH = pRepo->tsdbFileH;
SFileGroup fileGroup = *pFGroup;
int nFilesLeft = pFileH->nFGroups - (POINTER_DISTANCE(pFGroup, pFileH->pFGroup) / sizeof(SFileGroup) + 1);
if (nFilesLeft > 0) {
memmove((void *)pFGroup, POINTER_SHIFT(pFGroup, sizeof(SFileGroup)), sizeof(SFileGroup) * nFilesLeft);
}
pFileH->nFGroups--;
ASSERT(pFileH->nFGroups >= 0);
for (int type = TSDB_FILE_TYPE_HEAD; type < TSDB_FILE_TYPE_MAX; type++) {
remove(fileGroup.files[type].fname);
tsdbDestroyFile(&fileGroup.files[type]);
}
}
\ No newline at end of file
......@@ -464,7 +464,7 @@ static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SCommitIter *iters, SRWHe
return -1;
}
if ((pGroup = tsdbCreateFGroup(pFileH, dataDir, fid, pCfg->maxTables)) == NULL) {
if ((pGroup = tsdbCreateFGroupIfNeed(pFileH, dataDir, fid, pCfg->maxTables)) == NULL) {
tsdbError("vgId:%d failed to create file group %d since %s", REPO_ID(pRepo), fid, tstrerror(terrno));
goto _err;
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册