Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
c96dcb7e
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
c96dcb7e
编写于
11月 23, 2020
作者:
H
Hongze Cheng
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
refactor
上级
3117ea56
变更
4
隐藏空白更改
内联
并排
Showing
4 changed file
with
32 addition
and
269 deletion
+32
-269
src/inc/tfs.h
src/inc/tfs.h
+1
-1
src/tfs/src/tfcntl.c
src/tfs/src/tfcntl.c
+1
-1
src/tsdb/src/tsdbCommit.c
src/tsdb/src/tsdbCommit.c
+7
-7
src/tsdb/src/tsdbFile.c
src/tsdb/src/tsdbFile.c
+23
-260
未找到文件。
src/inc/tfs.h
浏览文件 @
c96dcb7e
...
@@ -44,7 +44,7 @@ void tfsDirName(TFSFILE *pfile, char dest[]);
...
@@ -44,7 +44,7 @@ void tfsDirName(TFSFILE *pfile, char dest[]);
void
tfsBaseName
(
TFSFILE
*
pfile
,
char
dest
[]);
void
tfsBaseName
(
TFSFILE
*
pfile
,
char
dest
[]);
int
tfsopen
(
TFSFILE
*
pfile
,
int
flags
);
int
tfsopen
(
TFSFILE
*
pfile
,
int
flags
);
int
tfsclose
(
int
fd
);
int
tfsclose
(
int
fd
);
TFSFILE
*
tfsCreateFiles
(
int
level
,
int
nfile
,
...
);
TFSFILE
*
tfsCreateFiles
(
int
level
,
int
nfile
,
char
*
fnames
[]
);
int
tfsRemoveFiles
(
int
nfile
,
...);
int
tfsRemoveFiles
(
int
nfile
,
...);
SDiskID
tfsFileID
(
TFSFILE
*
pfile
);
SDiskID
tfsFileID
(
TFSFILE
*
pfile
);
...
...
src/tfs/src/tfcntl.c
浏览文件 @
c96dcb7e
...
@@ -130,7 +130,7 @@ int tfsclose(int fd) {
...
@@ -130,7 +130,7 @@ int tfsclose(int fd) {
return
0
;
return
0
;
}
}
TFSFILE
*
tfsCreateFiles
(
int
level
,
int
nfile
,
...
)
{
TFSFILE
*
tfsCreateFiles
(
int
level
,
int
nfile
,
char
*
fnames
[]
)
{
// TODO
// TODO
return
NULL
;
return
NULL
;
}
}
...
...
src/tsdb/src/tsdbCommit.c
浏览文件 @
c96dcb7e
...
@@ -17,7 +17,7 @@
...
@@ -17,7 +17,7 @@
static
int
tsdbCommitTSData
(
STsdbRepo
*
pRepo
);
static
int
tsdbCommitTSData
(
STsdbRepo
*
pRepo
);
static
int
tsdbCommitMeta
(
STsdbRepo
*
pRepo
);
static
int
tsdbCommitMeta
(
STsdbRepo
*
pRepo
);
static
void
tsdbEndCommit
(
STsdbRepo
*
pRepo
,
int
eno
);
static
void
tsdbEndCommit
(
STsdbRepo
*
pRepo
,
int
eno
);
static
int
tsdbHasDataToCommit
(
SCommitIter
*
iters
,
int
nIters
,
TSKEY
minKey
,
TSKEY
maxKey
);
static
bool
tsdbHasDataToCommit
(
SCommitIter
*
iters
,
int
nIters
,
TSKEY
minKey
,
TSKEY
maxKey
);
static
int
tsdbCommitToFile
(
STsdbRepo
*
pRepo
,
int
fid
,
SCommitIter
*
iters
,
SRWHelper
*
pHelper
,
SDataCols
*
pDataCols
);
static
int
tsdbCommitToFile
(
STsdbRepo
*
pRepo
,
int
fid
,
SCommitIter
*
iters
,
SRWHelper
*
pHelper
,
SDataCols
*
pDataCols
);
static
SCommitIter
*
tsdbCreateCommitIters
(
STsdbRepo
*
pRepo
);
static
SCommitIter
*
tsdbCreateCommitIters
(
STsdbRepo
*
pRepo
);
static
void
tsdbDestroyCommitIters
(
SCommitIter
*
iters
,
int
maxTables
);
static
void
tsdbDestroyCommitIters
(
SCommitIter
*
iters
,
int
maxTables
);
...
@@ -176,12 +176,12 @@ static void tsdbEndCommit(STsdbRepo *pRepo, int eno) {
...
@@ -176,12 +176,12 @@ static void tsdbEndCommit(STsdbRepo *pRepo, int eno) {
sem_post
(
&
(
pRepo
->
readyToCommit
));
sem_post
(
&
(
pRepo
->
readyToCommit
));
}
}
static
int
tsdbHasDataToCommit
(
SCommitIter
*
iters
,
int
nIters
,
TSKEY
minKey
,
TSKEY
maxKey
)
{
static
bool
tsdbHasDataToCommit
(
SCommitIter
*
iters
,
int
nIters
,
TSKEY
minKey
,
TSKEY
maxKey
)
{
for
(
int
i
=
0
;
i
<
nIters
;
i
++
)
{
for
(
int
i
=
0
;
i
<
nIters
;
i
++
)
{
TSKEY
nextKey
=
tsdbNextIterKey
((
iters
+
i
)
->
pIter
);
TSKEY
nextKey
=
tsdbNextIterKey
((
iters
+
i
)
->
pIter
);
if
(
nextKey
!=
TSDB_DATA_TIMESTAMP_NULL
&&
(
nextKey
>=
minKey
&&
nextKey
<=
maxKey
))
return
1
;
if
(
nextKey
!=
TSDB_DATA_TIMESTAMP_NULL
&&
(
nextKey
>=
minKey
&&
nextKey
<=
maxKey
))
return
true
;
}
}
return
0
;
return
false
;
}
}
static
int
tsdbCommitToFile
(
STsdbRepo
*
pRepo
,
int
fid
,
SCommitIter
*
iters
,
SRWHelper
*
pHelper
,
SDataCols
*
pDataCols
)
{
static
int
tsdbCommitToFile
(
STsdbRepo
*
pRepo
,
int
fid
,
SCommitIter
*
iters
,
SRWHelper
*
pHelper
,
SDataCols
*
pDataCols
)
{
...
@@ -190,13 +190,13 @@ static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SCommitIter *iters, SRWHe
...
@@ -190,13 +190,13 @@ static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SCommitIter *iters, SRWHe
SFileGroup
*
pGroup
=
NULL
;
SFileGroup
*
pGroup
=
NULL
;
SMemTable
*
pMem
=
pRepo
->
imem
;
SMemTable
*
pMem
=
pRepo
->
imem
;
bool
newLast
=
false
;
bool
newLast
=
false
;
TSKEY
minKey
=
0
;
TSKEY
maxKey
=
0
;
TSKEY
minKey
=
0
,
maxKey
=
0
;
tsdbGetFidKeyRange
(
pCfg
->
daysPerFile
,
pCfg
->
precision
,
fid
,
&
minKey
,
&
maxKey
);
tsdbGetFidKeyRange
(
pCfg
->
daysPerFile
,
pCfg
->
precision
,
fid
,
&
minKey
,
&
maxKey
);
// Check if there are data to commit to this file
// Check if there are data to commit to this file
int
hasDataToCommit
=
tsdbHasDataToCommit
(
iters
,
pMem
->
maxTables
,
minKey
,
maxKey
);
if
(
!
tsdbHasDataToCommit
(
iters
,
pMem
->
maxTables
,
minKey
,
maxKey
))
{
if
(
!
hasDataToCommit
)
{
tsdbDebug
(
"vgId:%d no data to commit to file %d"
,
REPO_ID
(
pRepo
),
fid
);
tsdbDebug
(
"vgId:%d no data to commit to file %d"
,
REPO_ID
(
pRepo
),
fid
);
return
0
;
return
0
;
}
}
...
...
src/tsdb/src/tsdbFile.c
浏览文件 @
c96dcb7e
...
@@ -138,25 +138,40 @@ void tsdbCloseFileH(STsdbRepo *pRepo) {
...
@@ -138,25 +138,40 @@ void tsdbCloseFileH(STsdbRepo *pRepo) {
}
}
SFileGroup
*
tsdbCreateFGroup
(
STsdbRepo
*
pRepo
,
int
fid
)
{
SFileGroup
*
tsdbCreateFGroup
(
STsdbRepo
*
pRepo
,
int
fid
)
{
// TODO
STsdbFileH
*
pFileH
=
pRepo
->
tsdbFileH
;
STsdbFileH
*
pFileH
=
pRepo
->
tsdbFileH
;
SFileGroup
fGroup
=
{
0
};
SFileGroup
fGroup
=
{
0
};
char
fnames
[
TSDB_FILE_TYPE_MAX
][
TSDB_FILENAME_LEN
]
=
{
0
};
ASSERT
(
tsdbSearchFGroup
(
pFileH
,
fid
,
TD_EQ
)
==
NULL
);
ASSERT
(
tsdbSearchFGroup
(
pFileH
,
fid
,
TD_EQ
)
==
NULL
);
// TODO: think about if (level == 0) is correct
// Create files
SDisk
*
pDisk
=
tdAssignDisk
(
tsDnodeTier
,
0
);
for
(
int
type
=
0
;
type
<
TSDB_FILE_TYPE_MAX
;
type
++
)
{
if
(
pDisk
==
NULL
)
{
tsdbGetDataFileName
(
pRepo
->
rootDir
,
REPO_ID
(
pRepo
),
fid
,
type
,
fnames
[
type
]);
tsdbError
(
"vgId:%d failed to create file group %d since %s"
,
REPO_ID
(
pRepo
),
fid
,
tstrerror
(
terrno
));
}
return
NULL
;
int
level
=
tsdbGetFidLevel
();
// TODO
TFSFILE
*
pfiles
=
tfsCreateFiles
(
level
,
TSDB_FILE_TYPE_MAX
,
fnames
);
if
(
pfiles
==
NULL
)
{
// TODO: deal the error
}
}
// Write file headers to file
for
(
int
type
=
0
;
type
<
TSDB_FILE_TYPE_MAX
;
type
++
)
{
int
fd
=
tfsopen
(
pfiles
+
type
,
O_RDONLY
);
if
(
fd
<
0
)
{
// TODO: deal the error
}
}
// Construct file group
fGroup
.
fileId
=
fid
;
fGroup
.
fileId
=
fid
;
fGroup
.
level
=
pDisk
->
level
;
fGroup
.
did
=
pDisk
->
did
;
for
(
int
type
=
0
;
type
<
TSDB_FILE_TYPE_MAX
;
type
++
)
{
for
(
int
type
=
0
;
type
<
TSDB_FILE_TYPE_MAX
;
type
++
)
{
if
(
tsdbCreateFile
(
&
(
fGroup
.
files
[
type
]),
pRepo
,
fid
,
type
,
pDisk
)
<
0
)
goto
_err
;
if
(
tsdbCreateFile
(
&
(
fGroup
.
files
[
type
]),
pRepo
,
fid
,
type
,
pDisk
)
<
0
)
goto
_err
;
}
}
// Register fgroup to the repo
pthread_rwlock_wrlock
(
&
pFileH
->
fhlock
);
pthread_rwlock_wrlock
(
&
pFileH
->
fhlock
);
pFileH
->
pFGroup
[
pFileH
->
nFGroups
++
]
=
fGroup
;
pFileH
->
pFGroup
[
pFileH
->
nFGroups
++
]
=
fGroup
;
qsort
((
void
*
)(
pFileH
->
pFGroup
),
pFileH
->
nFGroups
,
sizeof
(
SFileGroup
),
compFGroup
);
qsort
((
void
*
)(
pFileH
->
pFGroup
),
pFileH
->
nFGroups
,
sizeof
(
SFileGroup
),
compFGroup
);
...
@@ -540,256 +555,4 @@ static int keyFGroupCompFunc(const void *key, const void *fgroup) {
...
@@ -540,256 +555,4 @@ static int keyFGroupCompFunc(const void *key, const void *fgroup) {
}
else
{
}
else
{
return
fid
>
pFGroup
->
fileId
?
1
:
-
1
;
return
fid
>
pFGroup
->
fileId
?
1
:
-
1
;
}
}
}
}
\ No newline at end of file
// static int tsdbLoadFilesFromDisk(STsdbRepo *pRepo, SDisk *pDisk) {
// char tsdbDataDir[TSDB_FILENAME_LEN] = "\0";
// char tsdbRootDir[TSDB_FILENAME_LEN] = "\0";
// char fname[TSDB_FILENAME_LEN] = "\0";
// SHashObj * pFids = NULL;
// SHashMutableIterator *pIter = NULL;
// STsdbFileH * pFileH = pRepo->tsdbFileH;
// SFileGroup fgroup = {0};
// STsdbCfg * pCfg = &(pRepo->config);
// SFidGroup fidGroup = {0};
// int mfid = 0;
// tdGetTsdbRootDir(pDisk->dir, REPO_ID(pRepo), tsdbRootDir);
// tdGetTsdbDataDir(pDisk->dir, REPO_ID(pRepo), tsdbDataDir);
// pFids = tsdbGetAllFids(pRepo, tsdbDataDir);
// if (pFids == NULL) {
// goto _err;
// }
// pIter = taosHashCreateIter(pFids);
// if (pIter == NULL) {
// goto _err;
// }
// tsdbGetFidGroup(pCfg, &fidGroup);
// mfid = fidGroup.minFid;
// while (taosHashIterNext(pIter)) {
// int32_t fid = *(int32_t *)taosHashIterGet(pIter);
// if (fid < mfid) {
// for (int type = 0; type < TSDB_FILE_TYPE_MAX; type++) {
// tsdbGetDataFileName(tsdbRootDir, REPO_ID(pRepo), fid, type, fname);
// (void)remove(fname);
// }
// tsdbGetDataFileName(tsdbRootDir, REPO_ID(pRepo), fid, TSDB_FILE_TYPE_NHEAD, fname);
// (void)remove(fname);
// tsdbGetDataFileName(tsdbRootDir, REPO_ID(pRepo), fid, TSDB_FILE_TYPE_NLAST, fname);
// (void)remove(fname);
// continue;
// }
// tsdbRestoreFileGroup(pRepo, pDisk, fid, &fgroup);
// pFileH->pFGroup[pFileH->nFGroups++] = fgroup;
// qsort((void *)(pFileH->pFGroup), pFileH->nFGroups, sizeof(fgroup), compFGroup);
// // TODO
// pDisk->dmeta.nfiles++;
// }
// taosHashDestroyIter(pIter);
// taosHashCleanup(pFids);
// return 0;
// _err:
// taosHashDestroyIter(pIter);
// taosHashCleanup(pFids);
// return -1;
// }
// static int tsdbRestoreFileGroup(STsdbRepo *pRepo, SDisk *pDisk, int fid, SFileGroup *pFileGroup) {
// char tsdbRootDir[TSDB_FILENAME_LEN] = "\0";
// char nheadF[TSDB_FILENAME_LEN] = "\0";
// char nlastF[TSDB_FILENAME_LEN] = "\0";
// bool newHeadExists = false;
// bool newLastExists = false;
// uint32_t version = 0;
// terrno = TSDB_CODE_SUCCESS;
// memset((void *)pFileGroup, 0, sizeof(*pFileGroup));
// pFileGroup->fileId = fid;
// for (int type = 0; type < TSDB_FILE_TYPE_MAX; type++) {
// SFile *pFile = pFileGroup->files + type;
// pFile->fd = -1;
// }
// tdGetTsdbRootDir(pDisk->dir, REPO_ID(pRepo), tsdbRootDir);
// for (int type = 0; type < TSDB_FILE_TYPE_MAX; type++) {
// SFile *pFile = pFileGroup->files + type;
// tsdbGetDataFileName(tsdbRootDir, REPO_ID(pRepo), fid, TSDB_FILE_TYPE_HEAD, TSDB_FILE_NAME(pFile));
// if (access(TSDB_FILE_NAME(pFile), F_OK) != 0) {
// memset(&(pFile->info), 0, sizeof(pFile->info));
// pFile->info.magic = TSDB_FILE_INIT_MAGIC;
// pFileGroup->state = 1;
// terrno = TSDB_CODE_TDB_FILE_CORRUPTED;
// }
// }
// tsdbGetDataFileName(tsdbRootDir, REPO_ID(pRepo), fid, TSDB_FILE_TYPE_NHEAD, nheadF);
// tsdbGetDataFileName(tsdbRootDir, REPO_ID(pRepo), fid, TSDB_FILE_TYPE_NLAST, nlastF);
// if (access(nheadF, F_OK) == 0) {
// newHeadExists = true;
// }
// if (access(nlastF, F_OK) == 0) {
// newLastExists = true;
// }
// if (newHeadExists) {
// (void)remove(nheadF);
// (void)remove(nlastF);
// } else {
// if (newLastExists) {
// (void)rename(nlastF, pFileGroup->files[TSDB_FILE_TYPE_LAST].fname);
// }
// }
// if (terrno != TSDB_CODE_SUCCESS) {
// return -1;
// }
// for (int type = 0; type < TSDB_FILE_TYPE_MAX; type++) {
// SFile *pFile = pFileGroup->files + type;
// if (tsdbOpenFile(pFile, O_RDONLY) < 0) {
// memset(&(pFile->info), 0, sizeof(pFile->info));
// pFile->info.magic = TSDB_FILE_INIT_MAGIC;
// pFileGroup->state = 1;
// terrno = TSDB_CODE_TDB_FILE_CORRUPTED;
// continue;
// }
// if (tsdbLoadFileHeader(pFile, &version) < 0) {
// memset(&(pFile->info), 0, sizeof(pFile->info));
// pFile->info.magic = TSDB_FILE_INIT_MAGIC;
// pFileGroup->state = 1;
// terrno = TSDB_CODE_TDB_FILE_CORRUPTED;
// tsdbCloseFile(pFile);
// continue;
// }
// if (version != TSDB_FILE_VERSION) {
// tsdbError("vgId:%d file %s version %u is not the same as program version %u which may cause problem",
// REPO_ID(pRepo), TSDB_FILE_NAME(pFile), version, TSDB_FILE_VERSION);
// }
// tsdbCloseFile(pFile);
// }
// if (terrno != TSDB_CODE_SUCCESS) {
// return -1;
// } else {
// return 0;
// }
// }
// static SHashObj *tsdbGetAllFids(STsdbRepo *pRepo, char *dirName) {
// DIR * dir = NULL;
// regex_t regex = {0};
// int code = 0;
// int32_t vid, fid;
// SHashObj *pHash = NULL;
// code = regcomp(®ex, "^v[0-9]+f[0-9]+\\.(head|data|last|h|d|l)$", REG_EXTENDED);
// if (code != 0) {
// terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
// goto _err;
// }
// dir = opendir(dirName);
// if (dir == NULL) {
// tsdbError("vgId:%d failed to open directory %s since %s", REPO_ID(pRepo), dirName, strerror(errno));
// terrno = TAOS_SYSTEM_ERROR(errno);
// goto _err;
// }
// pHash = taosHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
// if (pHash == NULL) {
// terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
// goto _err;
// }
// struct dirent *dp = NULL;
// while ((dp = readdir(dir)) != NULL) {
// if (strcmp(dp->d_name, ".") == 0 || strcmp(dp->d_name, "..") == 0) continue;
// code = regexec(®ex, dp->d_name, 0, NULL, 0);
// if (code == 0) {
// sscanf(dp->d_name, "v%df%d", &vid, &fid);
// if (vid != REPO_ID(pRepo)) {
// tsdbError("vgId:%d invalid file %s exists, ignore it", REPO_ID(pRepo), dp->d_name);
// continue;
// }
// taosHashPut(pHash, (void *)(&fid), sizeof(fid), (void *)(&fid), sizeof(fid));
// } else if (code == REG_NOMATCH) {
// tsdbError("vgId:%d invalid file %s exists, ignore it", REPO_ID(pRepo), dp->d_name);
// continue;
// } else {
// goto _err;
// }
// }
// closedir(dir);
// regfree(®ex);
// return pHash;
// _err:
// taosHashCleanup(pHash);
// if (dir != NULL) closedir(dir);
// regfree(®ex);
// return NULL;
// }
// static int tsdbGetFidLevel(int fid, SFidGroup *pFidGroup) {
// if (fid >= pFidGroup->maxFid) {
// return 0;
// } else if (fid >= pFidGroup->midFid && fid < pFidGroup->maxFid) {
// return 1;
// } else {
// return 2;
// }
// }
// static int tsdbCreateVnodeDataDir(char *baseDir, int vid) {
// char dirName[TSDB_FILENAME_LEN] = "\0";
// char tsdbRootDir[TSDB_FILENAME_LEN] = "\0";
// tdGetVnodeRootDir(baseDir, dirName);
// if (taosMkDir(dirName, 0755) < 0 && errno != EEXIST) {
// terrno = TAOS_SYSTEM_ERROR(errno);
// return -1;
// }
// tdGetVnodeDir(baseDir, vid, dirName);
// if (taosMkDir(dirName, 0755) < 0 && errno != EEXIST) {
// terrno = TAOS_SYSTEM_ERROR(errno);
// return -1;
// }
// tdGetTsdbRootDir(baseDir, vid, tsdbRootDir);
// if (taosMkDir(tsdbRootDir, 0755) < 0 && errno != EEXIST) {
// terrno = TAOS_SYSTEM_ERROR(errno);
// return -1;
// }
// tdGetTsdbDataDir(baseDir, vid, dirName);
// if (taosMkDir(dirName, 0755) < 0 && errno != EEXIST) {
// terrno = TAOS_SYSTEM_ERROR(errno);
// return -1;
// }
// return 0;
// }
\ No newline at end of file
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录