Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
22b01362
TDengine
项目概览
taosdata
/
TDengine
大约 2 年 前同步成功
通知
1192
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
22b01362
编写于
6月 20, 2022
作者:
H
Hongze Cheng
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
more work
上级
e0995ac1
变更
2
隐藏空白更改
内联
并排
Showing
2 changed file
with
80 addition
and
1126 deletion
+80
-1126
source/dnode/vnode/src/tsdb/tsdbFS.c
source/dnode/vnode/src/tsdb/tsdbFS.c
+0
-1067
source/dnode/vnode/src/tsdb/tsdbUtil.c
source/dnode/vnode/src/tsdb/tsdbUtil.c
+80
-59
未找到文件。
source/dnode/vnode/src/tsdb/tsdbFS.c
浏览文件 @
22b01362
...
...
@@ -48,1070 +48,3 @@ int32_t tsdbFSEnd(STsdbFS *pFS, int8_t rollback) {
// TODO
return
code
;
}
#if 0
extern const char *TSDB_LEVEL_DNAME[];
typedef enum { TSDB_TXN_TEMP_FILE = 0, TSDB_TXN_CURR_FILE } TSDB_TXN_FILE_T;
static const char *tsdbTxnFname[] = {"current.t", "current"};
#define TSDB_MAX_FSETS(keep, days) ((keep) / (days) + 3)
#define TSDB_MAX_INIT_FSETS (365000)
static int tsdbComparFidFSet(const void *arg1, const void *arg2);
static void tsdbResetFSStatus(SFSStatus *pStatus);
static int tsdbSaveFSStatus(STsdb *pRepo, SFSStatus *pStatus);
static void tsdbApplyFSTxnOnDisk(SFSStatus *pFrom, SFSStatus *pTo);
static void tsdbGetTxnFname(STsdb *pRepo, TSDB_TXN_FILE_T ftype, char fname[]);
static int tsdbOpenFSFromCurrent(STsdb *pRepo);
static int tsdbScanAndTryFixFS(STsdb *pRepo);
static int tsdbScanRootDir(STsdb *pRepo);
static int tsdbScanDataDir(STsdb *pRepo);
static bool tsdbIsTFileInFS(STsdbFS *pfs, const STfsFile *pf);
static int tsdbRestoreCurrent(STsdb *pRepo);
static int tsdbComparTFILE(const void *arg1, const void *arg2);
static void tsdbScanAndTryFixDFilesHeader(STsdb *pRepo, int32_t *nExpired);
// static int tsdbProcessExpiredFS(STsdb *pRepo);
// static int tsdbCreateMeta(STsdb *pRepo);
static void tsdbGetRootDir(int repoid, const char *dir, char dirName[]) {
snprintf(dirName, TSDB_FILENAME_LEN, "vnode/vnode%d/%s", repoid, dir);
}
static void tsdbGetDataDir(int repoid, const char *dir, char dirName[]) {
snprintf(dirName, TSDB_FILENAME_LEN, "vnode/vnode%d/%s/data", repoid, dir);
}
// For backward compatibility
// ================== CURRENT file header info
static int tsdbEncodeFSHeader(void **buf, SFSHeader *pHeader) {
int tlen = 0;
tlen += taosEncodeFixedU32(buf, pHeader->version);
tlen += taosEncodeFixedU32(buf, pHeader->len);
return tlen;
}
static void *tsdbDecodeFSHeader(void *buf, SFSHeader *pHeader) {
buf = taosDecodeFixedU32(buf, &(pHeader->version));
buf = taosDecodeFixedU32(buf, &(pHeader->len));
return buf;
}
// ================== STsdbFSMeta
static int tsdbEncodeFSMeta(void **buf, STsdbFSMeta *pMeta) {
int tlen = 0;
tlen += taosEncodeFixedU32(buf, pMeta->version);
tlen += taosEncodeFixedI64(buf, pMeta->totalPoints);
tlen += taosEncodeFixedI64(buf, pMeta->totalStorage);
return tlen;
}
static void *tsdbDecodeFSMeta(void *buf, STsdbFSMeta *pMeta) {
buf = taosDecodeFixedU32(buf, &(pMeta->version));
buf = taosDecodeFixedI64(buf, &(pMeta->totalPoints));
buf = taosDecodeFixedI64(buf, &(pMeta->totalStorage));
return buf;
}
// ================== SFSStatus
static int tsdbEncodeDFileSetArray(void **buf, SArray *pArray) {
int tlen = 0;
uint64_t nset = taosArrayGetSize(pArray);
tlen += taosEncodeFixedU64(buf, nset);
for (size_t i = 0; i < nset; i++) {
SDFileSet *pSet = taosArrayGet(pArray, i);
tlen += tsdbEncodeDFileSet(buf, pSet);
}
return tlen;
}
static void *tsdbDecodeDFileSetArray(STsdb *pRepo, void *buf, SArray *pArray) {
uint64_t nset = 0;
taosArrayClear(pArray);
buf = taosDecodeFixedU64(buf, &nset);
for (size_t i = 0; i < nset; i++) {
SDFileSet dset = {0};
buf = tsdbDecodeDFileSet(pRepo, buf, &dset);
taosArrayPush(pArray, (void *)(&dset));
}
return buf;
}
static int tsdbEncodeFSStatus(void **buf, SFSStatus *pStatus) {
// ASSERT(pStatus->pmf);
int tlen = 0;
// tlen += tsdbEncodeSMFile(buf, pStatus->pmf);
tlen += tsdbEncodeDFileSetArray(buf, pStatus->df);
return tlen;
}
static void *tsdbDecodeFSStatus(STsdb *pRepo, void *buf, SFSStatus *pStatus) {
tsdbResetFSStatus(pStatus);
// pStatus->pmf = &(pStatus->mf);
// buf = tsdbDecodeSMFile(buf, pStatus->pmf);
buf = tsdbDecodeDFileSetArray(pRepo, buf, pStatus->df);
return buf;
}
static SFSStatus *tsdbNewFSStatus(int maxFSet) {
SFSStatus *pStatus = (SFSStatus *)taosMemoryCalloc(1, sizeof(*pStatus));
if (pStatus == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
return NULL;
}
// TSDB_FILE_SET_CLOSED(&(pStatus->mf));
pStatus->df = taosArrayInit(maxFSet, sizeof(SDFileSet));
if (pStatus->df == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
taosMemoryFree(pStatus);
return NULL;
}
return pStatus;
}
static SFSStatus *tsdbFreeFSStatus(SFSStatus *pStatus) {
if (pStatus) {
pStatus->df = taosArrayDestroy(pStatus->df);
taosMemoryFree(pStatus);
}
return NULL;
}
static void tsdbResetFSStatus(SFSStatus *pStatus) {
if (pStatus == NULL) {
return;
}
// TSDB_FILE_SET_CLOSED(&(pStatus->mf));
// pStatus->pmf = NULL;
taosArrayClear(pStatus->df);
}
// static void tsdbSetStatusMFile(SFSStatus *pStatus, const SMFile *pMFile) {
// ASSERT(pStatus->pmf == NULL);
// pStatus->pmf = &(pStatus->mf);
// tsdbInitMFileEx(pStatus->pmf, (SMFile *)pMFile);
// }
static int tsdbAddDFileSetToStatus(SFSStatus *pStatus, const SDFileSet *pSet) {
if (taosArrayPush(pStatus->df, (void *)pSet) == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
return -1;
}
TSDB_FSET_SET_CLOSED(((SDFileSet *)taosArrayGetLast(pStatus->df)));
return 0;
}
// ================== STsdbFS
STsdbFS *tsdbNewFS(const STsdbKeepCfg *pCfg) {
int keep = pCfg->keep2;
int days = pCfg->days;
int maxFSet = TSDB_MAX_FSETS(keep, days);
STsdbFS *pfs;
pfs = (STsdbFS *)taosMemoryCalloc(1, sizeof(*pfs));
if (pfs == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
return NULL;
}
int code = taosThreadRwlockInit(&(pfs->lock), NULL);
if (code) {
terrno = TAOS_SYSTEM_ERROR(code);
taosMemoryFree(pfs);
return NULL;
}
if (maxFSet > TSDB_MAX_INIT_FSETS) {
maxFSet = TSDB_MAX_INIT_FSETS;
}
pfs->cstatus = tsdbNewFSStatus(maxFSet);
if (pfs->cstatus == NULL) {
tsdbFreeFS(pfs);
return NULL;
}
pfs->intxn = false;
pfs->nstatus = tsdbNewFSStatus(maxFSet);
if (pfs->nstatus == NULL) {
tsdbFreeFS(pfs);
return NULL;
}
return pfs;
}
void *tsdbFreeFS(STsdbFS *pfs) {
if (pfs) {
pfs->nstatus = tsdbFreeFSStatus(pfs->nstatus);
pfs->cstatus = tsdbFreeFSStatus(pfs->cstatus);
taosThreadRwlockDestroy(&(pfs->lock));
taosMemoryFree(pfs);
}
return NULL;
}
int tsdbOpenFS(STsdb *pRepo) {
STsdbFS *pfs = REPO_FS(pRepo);
char current[TSDB_FILENAME_LEN] = "\0";
int nExpired = 0;
ASSERT(pfs != NULL);
tsdbGetTxnFname(pRepo, TSDB_TXN_CURR_FILE, current);
tsdbGetRtnSnap(pRepo, &pRepo->rtn);
if (taosCheckExistFile(current)) {
if (tsdbOpenFSFromCurrent(pRepo) < 0) {
tsdbError("vgId:%d, failed to open FS since %s", REPO_ID(pRepo), tstrerror(terrno));
return -1;
}
tsdbScanAndTryFixDFilesHeader(pRepo, &nExpired);
// if (nExpired > 0) {
// tsdbProcessExpiredFS(pRepo);
// }
} else {
// should skip expired fileset inside of the function
if (tsdbRestoreCurrent(pRepo) < 0) {
tsdbError("vgId:%d, failed to restore current file since %s", REPO_ID(pRepo), tstrerror(terrno));
return -1;
}
}
if (tsdbScanAndTryFixFS(pRepo) < 0) {
tsdbError("vgId:%d, failed to scan and fix FS since %s", REPO_ID(pRepo), tstrerror(terrno));
return -1;
}
// // Load meta cache if has meta file
// if ((!(pRepo->state & TSDB_STATE_BAD_META)) && tsdbLoadMetaCache(pRepo, true) < 0) {
// tsdbError("vgId:%d, failed to open FS while loading meta cache since %s", REPO_ID(pRepo), tstrerror(terrno));
// return -1;
// }
return 0;
}
void tsdbCloseFS(STsdb *pRepo) {
// Do nothing
}
// Start a new transaction to modify the file system
void tsdbStartFSTxn(STsdb *pRepo, int64_t pointsAdd, int64_t storageAdd) {
STsdbFS *pfs = REPO_FS(pRepo);
ASSERT(pfs->intxn == false);
pfs->intxn = true;
tsdbResetFSStatus(pfs->nstatus);
pfs->nstatus->meta = pfs->cstatus->meta;
// if (pfs->cstatus->pmf == NULL) {
pfs->nstatus->meta.version += 1;
// } else {
// pfs->nstatus->meta.version = pfs->cstatus->meta.version + 1;
// }
pfs->nstatus->meta.totalPoints = pfs->cstatus->meta.totalPoints + pointsAdd;
pfs->nstatus->meta.totalStorage = pfs->cstatus->meta.totalStorage += storageAdd;
}
int tsdbEndFSTxn(STsdb *pRepo) {
STsdbFS *pfs = REPO_FS(pRepo);
ASSERT(FS_IN_TXN(pfs));
SFSStatus *pStatus;
// Write current file system snapshot
if (tsdbSaveFSStatus(pRepo, pfs->nstatus) < 0) {
tsdbEndFSTxnWithError(pfs);
return -1;
}
// Make new
tsdbWLockFS(pfs);
pStatus = pfs->cstatus;
pfs->cstatus = pfs->nstatus;
pfs->nstatus = pStatus;
tsdbUnLockFS(pfs);
// Apply actual change to each file and SDFileSet
tsdbApplyFSTxnOnDisk(pfs->nstatus, pfs->cstatus);
pfs->intxn = false;
return 0;
}
int tsdbEndFSTxnWithError(STsdbFS *pfs) {
tsdbApplyFSTxnOnDisk(pfs->nstatus, pfs->cstatus);
// TODO: if mf change, reload pfs->metaCache
pfs->intxn = false;
return 0;
}
int tsdbUpdateDFileSet(STsdbFS *pfs, const SDFileSet *pSet) { return tsdbAddDFileSetToStatus(pfs->nstatus, pSet); }
static int tsdbSaveFSStatus(STsdb *pRepo, SFSStatus *pStatus) {
SFSHeader fsheader;
void *pBuf = NULL;
void *ptr;
char hbuf[TSDB_FILE_HEAD_SIZE] = "\0";
char tfname[TSDB_FILENAME_LEN] = "\0";
char cfname[TSDB_FILENAME_LEN] = "\0";
tsdbGetTxnFname(pRepo, TSDB_TXN_TEMP_FILE, tfname);
tsdbGetTxnFname(pRepo, TSDB_TXN_CURR_FILE, cfname);
TdFilePtr pFile = taosOpenFile(tfname, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC);
if (pFile == NULL) {
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}
fsheader.version = TSDB_LATEST_SFS_VER;
if (taosArrayGetSize(pStatus->df) == 0) {
fsheader.len = 0;
} else {
fsheader.len = tsdbEncodeFSStatus(NULL, pStatus) + sizeof(TSCKSUM);
}
// Encode header part and write
ptr = hbuf;
tsdbEncodeFSHeader(&ptr, &fsheader);
tsdbEncodeFSMeta(&ptr, &(pStatus->meta));
taosCalcChecksumAppend(0, (uint8_t *)hbuf, TSDB_FILE_HEAD_SIZE);
if (taosWriteFile(pFile, hbuf, TSDB_FILE_HEAD_SIZE) < TSDB_FILE_HEAD_SIZE) {
terrno = TAOS_SYSTEM_ERROR(errno);
taosCloseFile(&pFile);
taosRemoveFile(tfname);
return -1;
}
// Encode file status and write to file
if (fsheader.len > 0) {
if (tsdbMakeRoom(&(pBuf), fsheader.len) < 0) {
taosCloseFile(&pFile);
taosRemoveFile(tfname);
return -1;
}
ptr = pBuf;
tsdbEncodeFSStatus(&ptr, pStatus);
taosCalcChecksumAppend(0, (uint8_t *)pBuf, fsheader.len);
if (taosWriteFile(pFile, pBuf, fsheader.len) < fsheader.len) {
terrno = TAOS_SYSTEM_ERROR(errno);
taosCloseFile(&pFile);
(void)taosRemoveFile(tfname);
taosTZfree(pBuf);
return -1;
}
}
// fsync, close and rename
if (taosFsyncFile(pFile) < 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
taosCloseFile(&pFile);
taosRemoveFile(tfname);
taosTZfree(pBuf);
return -1;
}
(void)taosCloseFile(&pFile);
(void)taosRenameFile(tfname, cfname);
taosTZfree(pBuf);
return 0;
}
static void tsdbApplyFSTxnOnDisk(SFSStatus *pFrom, SFSStatus *pTo) {
int ifrom = 0;
int ito = 0;
size_t sizeFrom, sizeTo;
SDFileSet *pSetFrom;
SDFileSet *pSetTo;
sizeFrom = taosArrayGetSize(pFrom->df);
sizeTo = taosArrayGetSize(pTo->df);
// Apply meta file change
// (void)tsdbApplyMFileChange(pFrom->pmf, pTo->pmf);
// Apply SDFileSet change
if (ifrom >= sizeFrom) {
pSetFrom = NULL;
} else {
pSetFrom = taosArrayGet(pFrom->df, ifrom);
}
if (ito >= sizeTo) {
pSetTo = NULL;
} else {
pSetTo = taosArrayGet(pTo->df, ito);
}
while (true) {
if ((pSetTo == NULL) && (pSetFrom == NULL)) break;
if (pSetTo == NULL || (pSetFrom && pSetFrom->fid < pSetTo->fid)) {
tsdbApplyDFileSetChange(pSetFrom, NULL);
ifrom++;
if (ifrom >= sizeFrom) {
pSetFrom = NULL;
} else {
pSetFrom = taosArrayGet(pFrom->df, ifrom);
}
} else if (pSetFrom == NULL || pSetFrom->fid > pSetTo->fid) {
// Do nothing
ito++;
if (ito >= sizeTo) {
pSetTo = NULL;
} else {
pSetTo = taosArrayGet(pTo->df, ito);
}
} else {
tsdbApplyDFileSetChange(pSetFrom, pSetTo);
ifrom++;
if (ifrom >= sizeFrom) {
pSetFrom = NULL;
} else {
pSetFrom = taosArrayGet(pFrom->df, ifrom);
}
ito++;
if (ito >= sizeTo) {
pSetTo = NULL;
} else {
pSetTo = taosArrayGet(pTo->df, ito);
}
}
}
}
// ================== SFSIter
// ASSUMPTIONS: the FS Should be read locked when calling these functions
void tsdbFSIterInit(SFSIter *pIter, STsdbFS *pfs, int direction) {
pIter->pfs = pfs;
pIter->direction = direction;
size_t size = taosArrayGetSize(pfs->cstatus->df);
pIter->version = pfs->cstatus->meta.version;
if (size == 0) {
pIter->index = -1;
pIter->fid = TSDB_IVLD_FID;
} else {
if (direction == TSDB_FS_ITER_FORWARD) {
pIter->index = 0;
} else {
pIter->index = (int)(size - 1);
}
pIter->fid = ((SDFileSet *)taosArrayGet(pfs->cstatus->df, pIter->index))->fid;
}
}
void tsdbFSIterSeek(SFSIter *pIter, int fid) {
STsdbFS *pfs = pIter->pfs;
size_t size = taosArrayGetSize(pfs->cstatus->df);
int flags;
if (pIter->direction == TSDB_FS_ITER_FORWARD) {
flags = TD_GE;
} else {
flags = TD_LE;
}
void *ptr = taosbsearch(&fid, pfs->cstatus->df->pData, size, sizeof(SDFileSet), tsdbComparFidFSet, flags);
if (ptr == NULL) {
pIter->index = -1;
pIter->fid = TSDB_IVLD_FID;
} else {
pIter->index = (int)(TARRAY_ELEM_IDX(pfs->cstatus->df, ptr));
pIter->fid = ((SDFileSet *)ptr)->fid;
}
}
SDFileSet *tsdbFSIterNext(SFSIter *pIter) {
STsdbFS *pfs = pIter->pfs;
SDFileSet *pSet;
if (pIter->index < 0) {
ASSERT(pIter->fid == TSDB_IVLD_FID);
return NULL;
}
ASSERT(pIter->fid != TSDB_IVLD_FID);
if (pIter->version != pfs->cstatus->meta.version) {
pIter->version = pfs->cstatus->meta.version;
tsdbFSIterSeek(pIter, pIter->fid);
}
if (pIter->index < 0) {
return NULL;
}
pSet = (SDFileSet *)taosArrayGet(pfs->cstatus->df, pIter->index);
ASSERT(pSet->fid == pIter->fid);
if (pIter->direction == TSDB_FS_ITER_FORWARD) {
pIter->index++;
if (pIter->index >= taosArrayGetSize(pfs->cstatus->df)) {
pIter->index = -1;
}
} else {
pIter->index--;
}
if (pIter->index >= 0) {
pIter->fid = ((SDFileSet *)taosArrayGet(pfs->cstatus->df, pIter->index))->fid;
} else {
pIter->fid = TSDB_IVLD_FID;
}
return pSet;
}
static int tsdbComparFidFSet(const void *arg1, const void *arg2) {
int fid = *(int *)arg1;
SDFileSet *pSet = (SDFileSet *)arg2;
if (fid < pSet->fid) {
return -1;
} else if (fid == pSet->fid) {
return 0;
} else {
return 1;
}
}
static void tsdbGetTxnFname(STsdb *pRepo, TSDB_TXN_FILE_T ftype, char fname[]) {
snprintf(fname, TSDB_FILENAME_LEN, "%s/vnode/vnode%d/%s/%s", tfsGetPrimaryPath(REPO_TFS(pRepo)), REPO_ID(pRepo),
pRepo->dir, tsdbTxnFname[ftype]);
}
static int tsdbOpenFSFromCurrent(STsdb *pRepo) {
STsdbFS *pfs = REPO_FS(pRepo);
TdFilePtr pFile = NULL;
void *buffer = NULL;
SFSHeader fsheader;
char current[TSDB_FILENAME_LEN] = "\0";
void *ptr;
tsdbGetTxnFname(pRepo, TSDB_TXN_CURR_FILE, current);
// current file exists, try to recover
pFile = taosOpenFile(current, TD_FILE_READ);
if (pFile == NULL) {
tsdbError("vgId:%d, failed to open file %s since %s", REPO_ID(pRepo), current, strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
if (tsdbMakeRoom(&buffer, TSDB_FILE_HEAD_SIZE) < 0) {
goto _err;
}
int nread = (int)taosReadFile(pFile, buffer, TSDB_FILE_HEAD_SIZE);
if (nread < 0) {
tsdbError("vgId:%d, failed to read %d bytes from file %s since %s", REPO_ID(pRepo), TSDB_FILENAME_LEN, current,
strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
if (nread < TSDB_FILE_HEAD_SIZE) {
tsdbError("vgId:%d, failed to read header of file %s, read bytes:%d", REPO_ID(pRepo), current, nread);
terrno = TSDB_CODE_TDB_FILE_CORRUPTED;
goto _err;
}
if (!taosCheckChecksumWhole((uint8_t *)buffer, TSDB_FILE_HEAD_SIZE)) {
tsdbError("vgId:%d, header of file %s failed checksum check", REPO_ID(pRepo), current);
terrno = TSDB_CODE_TDB_FILE_CORRUPTED;
goto _err;
}
SFSStatus *pStatus = pfs->cstatus;
ptr = buffer;
ptr = tsdbDecodeFSHeader(ptr, &fsheader);
ptr = tsdbDecodeFSMeta(ptr, &(pStatus->meta));
if (fsheader.version != TSDB_LATEST_SFS_VER) {
// TODO: handle file version change
}
if (fsheader.len > 0) {
if (tsdbMakeRoom(&buffer, fsheader.len) < 0) {
goto _err;
}
nread = (int)taosReadFile(pFile, buffer, fsheader.len);
if (nread < 0) {
tsdbError("vgId:%d, failed to read file %s since %s", REPO_ID(pRepo), current, strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
if (nread < fsheader.len) {
tsdbError("vgId:%d, failed to read %d bytes from file %s", REPO_ID(pRepo), fsheader.len, current);
terrno = TSDB_CODE_TDB_FILE_CORRUPTED;
goto _err;
}
if (!taosCheckChecksumWhole((uint8_t *)buffer, fsheader.len)) {
tsdbError("vgId:%d, file %s is corrupted since wrong checksum", REPO_ID(pRepo), current);
terrno = TSDB_CODE_TDB_FILE_CORRUPTED;
goto _err;
}
ptr = buffer;
ptr = tsdbDecodeFSStatus(pRepo, ptr, pStatus);
} else {
tsdbResetFSStatus(pStatus);
}
taosTZfree(buffer);
taosCloseFile(&pFile);
return 0;
_err:
if (pFile != NULL) {
taosCloseFile(&pFile);
}
taosTZfree(buffer);
return -1;
}
// Scan and try to fix incorrect files
static int tsdbScanAndTryFixFS(STsdb *pRepo) {
STsdbFS *pfs = REPO_FS(pRepo);
SFSStatus *pStatus = pfs->cstatus;
// if (tsdbScanAndTryFixMFile(pRepo) < 0) {
// tsdbError("vgId:%d, failed to fix MFile since %s", REPO_ID(pRepo), tstrerror(terrno));
// return -1;
// }
size_t size = taosArrayGetSize(pStatus->df);
for (size_t i = 0; i < size; i++) {
SDFileSet *pSet = (SDFileSet *)taosArrayGet(pStatus->df, i);
if (tsdbScanAndTryFixDFileSet(pRepo, pSet) < 0) {
tsdbError("vgId:%d, failed to fix MFile since %s", REPO_ID(pRepo), tstrerror(terrno));
return -1;
}
}
// remove those unused files
tsdbScanRootDir(pRepo);
tsdbScanDataDir(pRepo);
return 0;
}
static int tsdbScanRootDir(STsdb *pRepo) {
char rootDir[TSDB_FILENAME_LEN];
char bname[TSDB_FILENAME_LEN];
STsdbFS *pfs = REPO_FS(pRepo);
const STfsFile *pf;
tsdbGetRootDir(REPO_ID(pRepo), pRepo->dir, rootDir);
STfsDir *tdir = tfsOpendir(REPO_TFS(pRepo), rootDir);
if (tdir == NULL) {
tsdbError("vgId:%d, failed to open directory %s since %s", REPO_ID(pRepo), rootDir, tstrerror(terrno));
return -1;
}
while ((pf = tfsReaddir(tdir))) {
tfsBasename(pf, bname);
if (strcmp(bname, tsdbTxnFname[TSDB_TXN_CURR_FILE]) == 0 || strcmp(bname, "data") == 0) {
// Skip current file and data directory
continue;
}
// if (/*pfs->cstatus->pmf && */ tfsIsSameFile(pf, &(pfs->cstatus->pmf->f))) {
// continue;
// }
(void)tfsRemoveFile(pf);
tsdbDebug("vgId:%d, invalid file %s is removed", REPO_ID(pRepo), pf->aname);
}
tfsClosedir(tdir);
return 0;
}
static int tsdbScanDataDir(STsdb *pRepo) {
char dataDir[TSDB_FILENAME_LEN];
char bname[TSDB_FILENAME_LEN];
STsdbFS *pfs = REPO_FS(pRepo);
const STfsFile *pf;
tsdbGetDataDir(REPO_ID(pRepo), pRepo->dir, dataDir);
STfsDir *tdir = tfsOpendir(REPO_TFS(pRepo), dataDir);
if (tdir == NULL) {
tsdbError("vgId:%d, failed to open directory %s since %s", REPO_ID(pRepo), dataDir, tstrerror(terrno));
return -1;
}
while ((pf = tfsReaddir(tdir))) {
tfsBasename(pf, bname);
if (!tsdbIsTFileInFS(pfs, pf)) {
(void)tfsRemoveFile(pf);
tsdbDebug("vgId:%d, invalid file %s is removed", REPO_ID(pRepo), pf->aname);
}
}
tfsClosedir(tdir);
return 0;
}
static bool tsdbIsTFileInFS(STsdbFS *pfs, const STfsFile *pf) {
SFSIter fsiter;
tsdbFSIterInit(&fsiter, pfs, TSDB_FS_ITER_FORWARD);
SDFileSet *pSet;
while ((pSet = tsdbFSIterNext(&fsiter))) {
for (TSDB_FILE_T ftype = 0; ftype < TSDB_FILE_MAX; ftype++) {
SDFile *pDFile = TSDB_DFILE_IN_SET(pSet, ftype);
if (tfsIsSameFile(pf, TSDB_FILE_F(pDFile))) {
return true;
}
}
}
return false;
}
static int tsdbRestoreDFileSet(STsdb *pRepo) {
char dataDir[TSDB_FILENAME_LEN];
char bname[TSDB_FILENAME_LEN];
STfsDir *tdir = NULL;
const STfsFile *pf = NULL;
const char *pattern = "^v[0-9]+f[0-9]+\\.(head|data|last|smad|smal)(-ver[0-9]+)?$";
SArray *fArray = NULL;
regex_t regex;
STsdbFS *pfs = REPO_FS(pRepo);
tsdbGetDataDir(REPO_ID(pRepo), pRepo->dir, dataDir);
// Resource allocation and init
regcomp(®ex, pattern, REG_EXTENDED);
fArray = taosArrayInit(1024, sizeof(STfsFile));
if (fArray == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
tsdbError("vgId:%d, failed to restore DFileSet while open directory %s since %s", REPO_ID(pRepo), dataDir,
tstrerror(terrno));
regfree(®ex);
return -1;
}
tdir = tfsOpendir(REPO_TFS(pRepo), dataDir);
if (tdir == NULL) {
tsdbError("vgId:%d, failed to restore DFileSet while open directory %s since %s", REPO_ID(pRepo), dataDir,
tstrerror(terrno));
taosArrayDestroy(fArray);
regfree(®ex);
return -1;
}
while ((pf = tfsReaddir(tdir))) {
tfsBasename(pf, bname);
int code = regexec(®ex, bname, 0, NULL, 0);
if (code == 0) {
if (taosArrayPush(fArray, (void *)pf) == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
tfsClosedir(tdir);
taosArrayDestroy(fArray);
regfree(®ex);
return -1;
}
} else if (code == REG_NOMATCH) {
// Not match
tsdbInfo("vgId:%d, invalid file %s exists, remove it", REPO_ID(pRepo), pf->aname);
(void)tfsRemoveFile(pf);
continue;
} else {
// Has other error
tsdbError("vgId:%d, failed to restore DFileSet Array while run regexec since %s", REPO_ID(pRepo), strerror(code));
terrno = TAOS_SYSTEM_ERROR(code);
tfsClosedir(tdir);
taosArrayDestroy(fArray);
regfree(®ex);
return -1;
}
}
tfsClosedir(tdir);
regfree(®ex);
// Sort the array according to file name
taosArraySort(fArray, tsdbComparTFILE);
size_t index = 0;
// Loop to recover each file set
for (;;) {
if (index >= taosArrayGetSize(fArray)) {
break;
}
SDFileSet fset = {0};
TSDB_FSET_SET_CLOSED(&fset);
// Loop to recover ONE fset
for (TSDB_FILE_T ftype = 0; ftype < TSDB_FILE_MAX; ftype++) {
SDFile *pDFile = TSDB_DFILE_IN_SET(&fset, ftype);
if (index >= taosArrayGetSize(fArray)) {
tsdbError("vgId:%d, incomplete DFileSet, fid:%d", REPO_ID(pRepo), fset.fid);
taosArrayDestroy(fArray);
return -1;
}
pf = taosArrayGet(fArray, index);
int tvid, tfid;
TSDB_FILE_T ttype;
uint32_t tversion;
char _bname[TSDB_FILENAME_LEN];
tfsBasename(pf, _bname);
tsdbParseDFilename(_bname, &tvid, &tfid, &ttype, &tversion);
ASSERT(tvid == REPO_ID(pRepo));
if (tfid < pRepo->rtn.minFid) { // skip file expired
++index;
continue;
}
if (ftype == 0) {
fset.fid = tfid;
} else {
if (tfid != fset.fid) {
tsdbError("vgId:%d, incomplete dFileSet, fid:%d", REPO_ID(pRepo), fset.fid);
taosArrayDestroy(fArray);
return -1;
}
}
if (ttype != ftype) {
tsdbError("vgId:%d, incomplete dFileSet, fid:%d", REPO_ID(pRepo), fset.fid);
taosArrayDestroy(fArray);
return -1;
}
pDFile->f = *pf;
// if (tsdbOpenDFile(pDFile, O_RDONLY) < 0) {
if (tsdbOpenDFile(pDFile, TD_FILE_READ) < 0) {
tsdbError("vgId:%d, failed to open DFile %s since %s", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pDFile),
tstrerror(terrno));
taosArrayDestroy(fArray);
return -1;
}
if (tsdbLoadDFileHeader(pDFile, &(pDFile->info)) < 0) {
tsdbError("vgId:%d, failed to load DFile %s header since %s", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pDFile),
tstrerror(terrno));
taosArrayDestroy(fArray);
return -1;
}
if (tsdbForceKeepFile) {
int64_t file_size;
// Get real file size
if (taosFStatFile(pDFile->pFile, &file_size, NULL) < 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
taosArrayDestroy(fArray);
return -1;
}
if (pDFile->info.size != file_size) {
int64_t tfsize = pDFile->info.size;
pDFile->info.size = file_size;
tsdbInfo("vgId:%d, file %s header size is changed from %" PRId64 " to %" PRId64, REPO_ID(pRepo),
TSDB_FILE_FULL_NAME(pDFile), tfsize, pDFile->info.size);
}
}
tsdbCloseDFile(pDFile);
index++;
}
tsdbInfo("vgId:%d, FSET %d is restored", REPO_ID(pRepo), fset.fid);
taosArrayPush(pfs->cstatus->df, &fset);
}
// Resource release
taosArrayDestroy(fArray);
return 0;
}
static int tsdbRestoreCurrent(STsdb *pRepo) {
if (tsdbRestoreDFileSet(pRepo) < 0) {
tsdbError("vgId:%d, failed to restore DFileSet since %s", REPO_ID(pRepo), tstrerror(terrno));
return -1;
}
if (tsdbSaveFSStatus(pRepo, pRepo->fs->cstatus) < 0) {
tsdbError("vgId:%d, failed to restore current since %s", REPO_ID(pRepo), tstrerror(terrno));
return -1;
}
return 0;
}
static int tsdbComparTFILE(const void *arg1, const void *arg2) {
STfsFile *pf1 = (STfsFile *)arg1;
STfsFile *pf2 = (STfsFile *)arg2;
int vid1, fid1, vid2, fid2;
TSDB_FILE_T ftype1, ftype2;
uint32_t version1, version2;
char bname1[TSDB_FILENAME_LEN];
char bname2[TSDB_FILENAME_LEN];
tfsBasename(pf1, bname1);
tfsBasename(pf2, bname2);
tsdbParseDFilename(bname1, &vid1, &fid1, &ftype1, &version1);
tsdbParseDFilename(bname2, &vid2, &fid2, &ftype2, &version2);
if (fid1 < fid2) {
return -1;
} else if (fid1 > fid2) {
return 1;
} else {
if (ftype1 < ftype2) {
return -1;
} else if (ftype1 > ftype2) {
return 1;
} else {
return 0;
}
}
}
static void tsdbScanAndTryFixDFilesHeader(STsdb *pRepo, int32_t *nExpired) {
STsdbFS *pfs = REPO_FS(pRepo);
SFSStatus *pStatus = pfs->cstatus;
SDFInfo info;
for (size_t i = 0; i < taosArrayGetSize(pStatus->df); i++) {
SDFileSet fset;
tsdbInitDFileSetEx(&fset, (SDFileSet *)taosArrayGet(pStatus->df, i));
if (fset.fid < pRepo->rtn.minFid) {
++*nExpired;
}
tsdbDebug("vgId:%d, scan DFileSet %d header", REPO_ID(pRepo), fset.fid);
// if (tsdbOpenDFileSet(&fset, O_RDWR) < 0) {
if (tsdbOpenDFileSet(&fset, TD_FILE_WRITE | TD_FILE_READ) < 0) {
tsdbError("vgId:%d, failed to open DFileSet %d since %s, continue", REPO_ID(pRepo), fset.fid, tstrerror(terrno));
continue;
}
for (TSDB_FILE_T ftype = 0; ftype < TSDB_FILE_MAX; ftype++) {
SDFile *pDFile = TSDB_DFILE_IN_SET(&fset, ftype);
if ((tsdbLoadDFileHeader(pDFile, &info) < 0) || pDFile->info.size != info.size ||
pDFile->info.magic != info.magic) {
if (tsdbUpdateDFileHeader(pDFile) < 0) {
tsdbError("vgId:%d, failed to update DFile header of %s since %s, continue", REPO_ID(pRepo),
TSDB_FILE_FULL_NAME(pDFile), tstrerror(terrno));
} else {
tsdbInfo("vgId:%d, DFile header of %s is updated", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pDFile));
TSDB_FILE_FSYNC(pDFile);
}
} else {
tsdbDebug("vgId:%d, DFile header of %s is correct", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pDFile));
}
}
tsdbCloseDFileSet(&fset);
}
}
int tsdbRLockFS(STsdbFS *pFs) {
int code = taosThreadRwlockRdlock(&(pFs->lock));
if (code != 0) {
terrno = TAOS_SYSTEM_ERROR(code);
return -1;
}
return 0;
}
int tsdbWLockFS(STsdbFS *pFs) {
int code = taosThreadRwlockWrlock(&(pFs->lock));
if (code != 0) {
terrno = TAOS_SYSTEM_ERROR(code);
return -1;
}
return 0;
}
int tsdbUnLockFS(STsdbFS *pFs) {
int code = taosThreadRwlockUnlock(&(pFs->lock));
if (code != 0) {
terrno = TAOS_SYSTEM_ERROR(code);
return -1;
}
return 0;
}
void tsdbGetRtnSnap(STsdb *pRepo, SRtn *pRtn) {
STsdbKeepCfg *pCfg = REPO_KEEP_CFG(pRepo);
TSKEY minKey, midKey, maxKey, now;
now = taosGetTimestamp(pCfg->precision);
minKey = now - pCfg->keep2 * tsTickPerMin[pCfg->precision];
midKey = now - pCfg->keep1 * tsTickPerMin[pCfg->precision];
maxKey = now - pCfg->keep0 * tsTickPerMin[pCfg->precision];
pRtn->minKey = minKey;
pRtn->minFid = (int)(TSDB_KEY_FID(minKey, pCfg->days, pCfg->precision));
pRtn->midFid = (int)(TSDB_KEY_FID(midKey, pCfg->days, pCfg->precision));
pRtn->maxFid = (int)(TSDB_KEY_FID(maxKey, pCfg->days, pCfg->precision));
tsdbDebug("vgId:%d, now:%" PRId64 " minKey:%" PRId64 " minFid:%d, midFid:%d, maxFid:%d", REPO_ID(pRepo), now, minKey,
pRtn->minFid, pRtn->midFid, pRtn->maxFid);
}
#endif
\ No newline at end of file
source/dnode/vnode/src/tsdb/tsdbUtil.c
浏览文件 @
22b01362
...
...
@@ -451,7 +451,60 @@ TSDBKEY tsdbRowKey(TSDBROW *pRow) {
}
void
tsdbRowGetColVal
(
TSDBROW
*
pRow
,
STSchema
*
pTSchema
,
int32_t
iCol
,
SColVal
*
pColVal
)
{
// TODO
STColumn
*
pTColumn
=
&
pTSchema
->
columns
[
iCol
];
SValue
value
;
if
(
pRow
->
type
==
0
)
{
// get from row (todo);
}
else
if
(
pRow
->
type
==
1
)
{
SColData
*
pColData
;
void
*
p
;
p
=
taosbsearch
(
&
(
SColData
){.
cid
=
pTColumn
->
colId
},
pRow
->
pBlockData
->
aColData
,
pRow
->
pBlockData
->
nCol
,
sizeof
(
SBlockCol
),
tColDataCmprFn
,
TD_EQ
);
if
(
p
)
{
pColData
=
(
SBlockCol
*
)
p
;
ASSERT
(
pColData
->
flags
);
if
(
pColData
->
flags
==
HAS_NONE
)
{
goto
_return_none
;
}
else
if
(
pColData
->
flags
==
HAS_NULL
)
{
goto
_return_null
;
}
else
{
uint8_t
v
=
GET_BIT2
(
pColData
->
pBitMap
,
pRow
->
iRow
);
if
(
v
==
0
)
{
goto
_return_none
;
}
else
if
(
v
==
1
)
{
goto
_return_null
;
}
else
{
int32_t
offset
;
if
(
IS_VAR_DATA_TYPE
(
pTColumn
->
type
))
{
// offset = ; (todo)
ASSERT
(
0
);
}
else
{
offset
=
tDataTypes
[
pTColumn
->
type
].
bytes
*
pRow
->
iRow
;
}
tGetValue
(
pColData
->
pData
+
offset
,
&
value
,
pTColumn
->
type
);
}
}
}
else
{
goto
_return_none
;
}
}
else
{
ASSERT
(
0
);
}
_return_none:
*
pColVal
=
COL_VAL_NONE
(
pTColumn
->
colId
);
return
;
_return_null:
*
pColVal
=
COL_VAL_NULL
(
pTColumn
->
colId
);
return
;
_return_value:
*
pColVal
=
COL_VAL_VALUE
(
pTColumn
->
colId
,
value
);
return
;
}
// delete skyline ======================================================
...
...
@@ -586,13 +639,21 @@ int32_t tGetKEYINFO(uint8_t *p, KEYINFO *pKeyInfo) {
// SBlockData ======================================================
static
int32_t
tsdbBlockDataAppendRow0
(
SBlockData
*
pBlockData
,
TSDBROW
*
pRow
,
STSchema
*
pTSchema
)
{
int32_t
code
=
0
;
// TODO
// aKey
// other cols
return
code
;
}
static
int32_t
tsdbBlockDataAppendRow1
(
SBlockData
*
pBlockData
,
TSDBROW
*
pRow
)
{
int32_t
code
=
0
;
// TODO
// aKey
// other cols
return
code
;
}
...
...
@@ -604,66 +665,15 @@ void tsdbBlockDataClear(SBlockData *pBlockData) {
}
int32_t
tsdbBlockDataAppendRow
(
SBlockData
*
pBlockData
,
TSDBROW
*
pRow
,
STSchema
*
pTSchema
)
{
int32_t
code
=
0
;
TSDBKEY
key
=
tsdbRowKey
(
pRow
);
int32_t
nRow
=
pBlockData
->
nRow
;
SColVal
colVal
;
SColVal
*
pColVal
=
&
colVal
;
STColumn
*
pTColumn
;
SColData
*
pColData
;
pBlockData
->
nRow
++
;
// aKey
if
(
nRow
>=
pBlockData
->
maxRow
)
{
if
(
pBlockData
->
maxRow
==
0
)
{
pBlockData
->
maxRow
=
1024
;
}
else
{
pBlockData
->
maxRow
=
pBlockData
->
maxRow
*
2
;
}
ASSERT
(
pBlockData
->
maxRow
>
pBlockData
->
nRow
);
// code = tsdbRealloc((uint8_t **)&pBlockData->aKey, sizeof(TSDBKEY) * pBlockData->maxRow);
if
(
code
)
goto
_err
;
}
// pBlockData->aKey[nRow] = key;
// other cols
int16_t
iColData
=
0
;
int16_t
iCol
=
1
;
while
(
iCol
<
pTSchema
->
numOfCols
)
{
pTColumn
=
&
pTSchema
->
columns
[
iCol
];
if
(
iColData
<
pBlockData
->
nCol
)
{
pColData
=
&
pBlockData
->
aColData
[
iColData
];
if
(
pColData
->
cid
<
pTColumn
->
colId
)
{
iColData
++
;
}
else
if
(
pColData
->
cid
>
pTColumn
->
colId
)
{
// add a new SColData
iCol
++
;
}
else
{
iCol
++
;
iColData
++
;
}
}
else
{
// add a new column data
}
int32_t
code
=
0
;
if
(
pColVal
->
isNone
)
{
// set bit
}
else
if
(
pColVal
->
isNull
)
{
// set bit
}
else
{
// set bit
// put data
}
if
(
pRow
->
type
==
0
)
{
code
=
tsdbBlockDataAppendRow0
(
pBlockData
,
pRow
,
pTSchema
);
}
else
if
(
pRow
->
type
==
1
)
{
code
=
tsdbBlockDataAppendRow1
(
pBlockData
,
pRow
);
}
return
code
;
_err:
tsdbError
(
"block data append row failed since %s"
,
tstrerror
(
code
));
return
code
;
}
void
tsdbBlockDataDestroy
(
SBlockData
*
pBlockData
)
{
...
...
@@ -674,3 +684,14 @@ void tsdbBlockDataDestroy(SBlockData *pBlockData) {
tsdbFree
(
pBlockData
->
aColData
[
iCol
].
pData
);
}
}
// SColData ========================================
int32_t
tColDataCmprFn
(
const
void
*
p1
,
const
void
*
p2
)
{
if
(((
SColData
*
)
p1
)
->
cid
<
((
SColData
*
)
p2
)
->
cid
)
{
return
-
1
;
}
else
if
(((
SColData
*
)
p1
)
->
cid
>
((
SColData
*
)
p2
)
->
cid
)
{
return
1
;
}
return
0
;
}
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录