提交 f87c6f79 编写于 作者: H Hongze Cheng

more code

上级 e19a3eaf
......@@ -482,7 +482,7 @@ int tsdbOpenFile(SFile* pFile, int oflag);
void tsdbCloseFile(SFile* pFile);
int tsdbCreateFile(SFile* pFile, STsdbRepo* pRepo, int fid, int type);
SFileGroup* tsdbSearchFGroup(STsdbFileH* pFileH, int fid, int flags);
void tsdbFitRetention(STsdbRepo* pRepo);
void tsdbRemoveFilesBeyondRetention(STsdbRepo* pRepo, int mfid);
int tsdbUpdateFileHeader(SFile* pFile);
int tsdbEncodeSFileInfo(void** buf, const STsdbFileInfo* pInfo);
void* tsdbDecodeSFileInfo(void* buf, STsdbFileInfo* pInfo);
......@@ -490,6 +490,9 @@ void tsdbRemoveFileGroup(STsdbRepo* pRepo, SFileGroup* pFGroup);
int tsdbLoadFileHeader(SFile* pFile, uint32_t* version);
void tsdbGetFileInfoImpl(char* fname, uint32_t* magic, int64_t* size);
void tsdbGetFidKeyRange(int daysPerFile, int8_t precision, int fileId, TSKEY *minKey, TSKEY *maxKey);
int tsdbGetCurrMinFid(int8_t precision, int32_t keep, int32_t days);
int tsdbGetBaseDirFromFile(char* fname, char* baseDir);
int tsdbApplyRetention(STsdbRepo* pRepo);
// ------------------ tsdbRWHelper.c
#define TSDB_HELPER_CLEAR_STATE 0x0 // Clear state
......
......@@ -33,7 +33,6 @@ static void tsdbDestroyFile(SFile *pFile);
static int compFGroup(const void *arg1, const void *arg2);
static int keyFGroupCompFunc(const void *key, const void *fgroup);
static TSKEY tsdbGetCurrMinKey(int8_t precision, int32_t keep);
static int tsdbGetCurrMinFid(int8_t precision, int32_t keep, int32_t days);
static int tsdbLoadFilesFromDisk(STsdbRepo *pRepo, SDisk *pDisk);
static SHashObj *tsdbGetAllFids(STsdbRepo *pRepo, char *dirName);
static int tsdbRestoreFileGroup(STsdbRepo *pRepo, SDisk *pDisk, int fid, SFileGroup *pFileGroup);
......@@ -280,13 +279,10 @@ SFileGroup *tsdbSearchFGroup(STsdbFileH *pFileH, int fid, int flags) {
return (SFileGroup *)ptr;
}
void tsdbFitRetention(STsdbRepo *pRepo) {
STsdbCfg *pCfg = &(pRepo->config);
void tsdbRemoveFilesBeyondRetention(STsdbRepo *pRepo, int mfid) {
STsdbFileH *pFileH = pRepo->tsdbFileH;
SFileGroup *pGroup = pFileH->pFGroup;
int mfid = tsdbGetCurrMinFid(pCfg->precision, pCfg->keep, pCfg->daysPerFile);
pthread_rwlock_wrlock(&(pFileH->fhlock));
while (pFileH->nFGroups > 0 && pGroup[0].fileId < mfid) {
......@@ -347,8 +343,13 @@ void *tsdbDecodeSFileInfo(void *buf, STsdbFileInfo *pInfo) {
void tsdbRemoveFileGroup(STsdbRepo *pRepo, SFileGroup *pFGroup) {
ASSERT(pFGroup != NULL);
STsdbFileH *pFileH = pRepo->tsdbFileH;
SDisk * pDisk = NULL;
char baseDir[TSDB_FILENAME_LEN] = "\0";
SFileGroup fileGroup = *pFGroup;
tsdbGetBaseDirFromFile(fileGroup.files[0].fname, baseDir);
pDisk = dnodeGetDiskByName(baseDir);
ASSERT(pDisk != NULL);
int nFilesLeft = pFileH->nFGroups - (int)(POINTER_DISTANCE(pFGroup, pFileH->pFGroup) / sizeof(SFileGroup) + 1);
if (nFilesLeft > 0) {
......@@ -364,6 +365,8 @@ void tsdbRemoveFileGroup(STsdbRepo *pRepo, SFileGroup *pFGroup) {
}
tsdbDestroyFile(&fileGroup.files[type]);
}
pDisk->dmeta.nfiles--;
}
int tsdbLoadFileHeader(SFile *pFile, uint32_t *version) {
......@@ -421,6 +424,31 @@ _err:
*size = 0;
}
int tsdbGetCurrMinFid(int8_t precision, int32_t keep, int32_t days) {
return (int)(TSDB_KEY_FILEID(tsdbGetCurrMinKey(precision, keep), days, precision));
}
int tsdbGetBaseDirFromFile(char *fname, char *baseDir) {
char *fdup = strdup(fname);
if (fdup == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
return -1;
}
for (size_t i = 0; i < 5; i++) {
dirname(fdup);
}
strncpy(baseDir, fdup, TSDB_FILENAME_LEN);
free(fdup);
return 0;
}
int tsdbApplyRetention(STsdbRepo *pRepo) {
// TODO
return 0;
}
// ---------------- LOCAL FUNCTIONS ----------------
static void tsdbDestroyFile(SFile *pFile) { tsdbCloseFile(pFile); }
......@@ -451,10 +479,6 @@ static TSKEY tsdbGetCurrMinKey(int8_t precision, int32_t keep) {
return (TSKEY)(taosGetTimestamp(precision) - keep * tsMsPerDay[precision]);
}
static int tsdbGetCurrMinFid(int8_t precision, int32_t keep, int32_t days) {
return (int)(TSDB_KEY_FILEID(tsdbGetCurrMinKey(precision, keep), days, precision));
}
static int tsdbLoadFilesFromDisk(STsdbRepo *pRepo, SDisk *pDisk) {
char tsdbDataDir[TSDB_FILENAME_LEN] = "\0";
char tsdbRootDir[TSDB_FILENAME_LEN] = "\0";
......
......@@ -32,6 +32,7 @@ static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SCommitIter *iters, SRWHe
static SCommitIter *tsdbCreateCommitIters(STsdbRepo *pRepo);
static void tsdbDestroyCommitIters(SCommitIter *iters, int maxTables);
static int tsdbAdjustMemMaxTables(SMemTable *pMemTable, int maxTables);
static void tsdbSeekCommitIter(SCommitIter *pIters, int nIters, TSKEY minKey);
// ---------------- INTERNAL FUNCTIONS ----------------
int tsdbInsertRowToMem(STsdbRepo *pRepo, SDataRow row, STable *pTable) {
......@@ -471,12 +472,18 @@ static void *tsdbCommitData(void *arg) {
STsdbMeta * pMeta = pRepo->tsdbMeta;
SCommitIter *iters = NULL;
SRWHelper whelper = {0};
STsdbFileH * pFileH = pRepo->tsdbFileH;
TSKEY minKey = 0, maxKey = 0;
ASSERT(pRepo->commit == 1);
ASSERT(pMem != NULL);
tsdbInfo("vgId:%d start to commit! keyFirst %" PRId64 " keyLast %" PRId64 " numOfRows %" PRId64, REPO_ID(pRepo),
pMem->keyFirst, pMem->keyLast, pMem->numOfRows);
int mfid = tsdbGetCurrMinFid(pCfg->precision, pCfg->keep, pCfg->daysPerFile);
tsdbGetFidKeyRange(pCfg->daysPerFile, pCfg->precision, mfid, &minKey, &maxKey);
tsdbRemoveFilesBeyondRetention(pRepo, mfid);
// Create the iterator to read from cache
if (pMem->numOfRows > 0) {
iters = tsdbCreateCommitIters(pRepo);
......@@ -500,8 +507,12 @@ static void *tsdbCommitData(void *arg) {
int sfid = (int)(TSDB_KEY_FILEID(pMem->keyFirst, pCfg->daysPerFile, pCfg->precision));
int efid = (int)(TSDB_KEY_FILEID(pMem->keyLast, pCfg->daysPerFile, pCfg->precision));
tsdbSeekCommitIter(iters, pMem->maxTables, minKey);
// Loop to commit to each file
for (int fid = sfid; fid <= efid; fid++) {
if (fid < mfid) continue;
if (tsdbCommitToFile(pRepo, fid, iters, &whelper, pDataCols) < 0) {
tsdbError("vgId:%d failed to commit to file %d since %s", REPO_ID(pRepo), fid, tstrerror(terrno));
goto _exit;
......@@ -509,14 +520,14 @@ static void *tsdbCommitData(void *arg) {
}
}
tsdbApplyRetention(pRepo);
// Commit to update meta file
if (tsdbCommitMeta(pRepo) < 0) {
tsdbError("vgId:%d failed to commit data while committing meta data since %s", REPO_ID(pRepo), tstrerror(terrno));
goto _exit;
}
tsdbFitRetention(pRepo);
_exit:
tdFreeDataCols(pDataCols);
tsdbDestroyCommitIters(iters, pMem->maxTables);
......@@ -611,6 +622,10 @@ static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SCommitIter *iters, SRWHe
return 0;
}
if ((pGroup = tsdbSearchFGroup(pFileH, fid, TD_EQ)) == NULL) {
// file group not exists
}
// Create and open files for commit
dataDir = tsdbGetDataDirName(pRepo->rootDir);
if (dataDir == NULL) {
......@@ -780,4 +795,14 @@ static int tsdbAdjustMemMaxTables(SMemTable *pMemTable, int maxTables) {
taosTFree(tData);
return 0;
}
static void tsdbSeekCommitIter(SCommitIter *pIters, int nIters, TSKEY key) {
for (int i = 0; i < nIters; i++) {
SCommitIter *pIter = pIters + i;
if (pIter->pTable == NULL) continue;
if (pIter->pIter == NULL) continue;
tsdbLoadDataFromCache(pIter->pTable, pIter->pIter, key-1, INT32_MAX, NULL, NULL, 0);
}
}
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册