Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
慢慢CG
TDengine
提交
7d42764f
T
TDengine
项目概览
慢慢CG
/
TDengine
与 Fork 源项目一致
Fork自
taosdata / TDengine
通知
1
Star
0
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
7d42764f
编写于
12月 31, 2020
作者:
H
Hongze Cheng
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
refact
上级
20499831
变更
2
隐藏空白更改
内联
并排
Showing
2 changed file
with
240 addition
and
228 deletion
+240
-228
src/tsdb/inc/tsdbMain.h
src/tsdb/inc/tsdbMain.h
+215
-218
src/tsdb/src/tsdbCommit.c
src/tsdb/src/tsdbCommit.c
+25
-10
未找到文件。
src/tsdb/inc/tsdbMain.h
浏览文件 @
7d42764f
...
...
@@ -32,6 +32,9 @@
extern
"C"
{
#endif
typedef
struct
STsdbRepo
STsdbRepo
;
// ================= tsdbLog.h
extern
int32_t
tsdbDebugFlag
;
#define tsdbFatal(...) do { if (tsdbDebugFlag & DEBUG_FATAL) { taosPrintLog("TDB FATAL ", 255, __VA_ARGS__); }} while(0)
...
...
@@ -41,6 +44,7 @@ extern int32_t tsdbDebugFlag;
#define tsdbDebug(...) do { if (tsdbDebugFlag & DEBUG_DEBUG) { taosPrintLog("TDB ", tsdbDebugFlag, __VA_ARGS__); }} while(0)
#define tsdbTrace(...) do { if (tsdbDebugFlag & DEBUG_TRACE) { taosPrintLog("TDB ", tsdbDebugFlag, __VA_ARGS__); }} while(0)
// ================= OTHERS
#define TSDB_MAX_TABLE_SCHEMAS 16
#define TSDB_FILE_HEAD_SIZE 512
#define TSDB_FILE_DELIMITER 0xF00AFA0F
...
...
@@ -88,6 +92,94 @@ typedef struct {
int
maxCols
;
}
STsdbMeta
;
#define TSDB_INIT_NTABLES 1024
#define TABLE_TYPE(t) (t)->type
#define TABLE_NAME(t) (t)->name
#define TABLE_CHAR_NAME(t) TABLE_NAME(t)->data
#define TABLE_UID(t) (t)->tableId.uid
#define TABLE_TID(t) (t)->tableId.tid
#define TABLE_SUID(t) (t)->suid
#define TSDB_META_FILE_MAGIC(m) KVSTORE_MAGIC((m)->pStore)
#define TSDB_RLOCK_TABLE(t) taosRLockLatch(&((t)->latch))
#define TSDB_RUNLOCK_TABLE(t) taosRUnLockLatch(&((t)->latch))
#define TSDB_WLOCK_TABLE(t) taosWLockLatch(&((t)->latch))
#define TSDB_WUNLOCK_TABLE(t) taosWUnLockLatch(&((t)->latch))
STsdbMeta
*
tsdbNewMeta
(
STsdbCfg
*
pCfg
);
void
tsdbFreeMeta
(
STsdbMeta
*
pMeta
);
int
tsdbOpenMeta
(
STsdbRepo
*
pRepo
);
int
tsdbCloseMeta
(
STsdbRepo
*
pRepo
);
STable
*
tsdbGetTableByUid
(
STsdbMeta
*
pMeta
,
uint64_t
uid
);
STSchema
*
tsdbGetTableSchemaByVersion
(
STable
*
pTable
,
int16_t
version
);
int
tsdbWLockRepoMeta
(
STsdbRepo
*
pRepo
);
int
tsdbRLockRepoMeta
(
STsdbRepo
*
pRepo
);
int
tsdbUnlockRepoMeta
(
STsdbRepo
*
pRepo
);
void
tsdbRefTable
(
STable
*
pTable
);
void
tsdbUnRefTable
(
STable
*
pTable
);
void
tsdbUpdateTableSchema
(
STsdbRepo
*
pRepo
,
STable
*
pTable
,
STSchema
*
pSchema
,
bool
insertAct
);
static
FORCE_INLINE
int
tsdbCompareSchemaVersion
(
const
void
*
key1
,
const
void
*
key2
)
{
if
(
*
(
int16_t
*
)
key1
<
schemaVersion
(
*
(
STSchema
**
)
key2
))
{
return
-
1
;
}
else
if
(
*
(
int16_t
*
)
key1
>
schemaVersion
(
*
(
STSchema
**
)
key2
))
{
return
1
;
}
else
{
return
0
;
}
}
static
FORCE_INLINE
STSchema
*
tsdbGetTableSchemaImpl
(
STable
*
pTable
,
bool
lock
,
bool
copy
,
int16_t
version
)
{
STable
*
pDTable
=
(
TABLE_TYPE
(
pTable
)
==
TSDB_CHILD_TABLE
)
?
pTable
->
pSuper
:
pTable
;
STSchema
*
pSchema
=
NULL
;
STSchema
*
pTSchema
=
NULL
;
if
(
lock
)
TSDB_RLOCK_TABLE
(
pDTable
);
if
(
version
<
0
)
{
// get the latest version of schema
pTSchema
=
pDTable
->
schema
[
pDTable
->
numOfSchemas
-
1
];
}
else
{
// get the schema with version
void
*
ptr
=
taosbsearch
(
&
version
,
pDTable
->
schema
,
pDTable
->
numOfSchemas
,
sizeof
(
STSchema
*
),
tsdbCompareSchemaVersion
,
TD_EQ
);
if
(
ptr
==
NULL
)
{
terrno
=
TSDB_CODE_TDB_IVD_TB_SCHEMA_VERSION
;
goto
_exit
;
}
pTSchema
=
*
(
STSchema
**
)
ptr
;
}
ASSERT
(
pTSchema
!=
NULL
);
if
(
copy
)
{
if
((
pSchema
=
tdDupSchema
(
pTSchema
))
==
NULL
)
terrno
=
TSDB_CODE_TDB_OUT_OF_MEMORY
;
}
else
{
pSchema
=
pTSchema
;
}
_exit:
if
(
lock
)
TSDB_RUNLOCK_TABLE
(
pDTable
);
return
pSchema
;
}
static
FORCE_INLINE
STSchema
*
tsdbGetTableSchema
(
STable
*
pTable
)
{
return
tsdbGetTableSchemaImpl
(
pTable
,
false
,
false
,
-
1
);
}
static
FORCE_INLINE
STSchema
*
tsdbGetTableTagSchema
(
STable
*
pTable
)
{
if
(
pTable
->
type
==
TSDB_CHILD_TABLE
)
{
// check child table first
STable
*
pSuper
=
pTable
->
pSuper
;
if
(
pSuper
==
NULL
)
return
NULL
;
return
pSuper
->
tagSchema
;
}
else
if
(
pTable
->
type
==
TSDB_SUPER_TABLE
)
{
return
pTable
->
tagSchema
;
}
else
{
return
NULL
;
}
}
static
FORCE_INLINE
TSKEY
tsdbGetTableLastKeyImpl
(
STable
*
pTable
)
{
ASSERT
(
pTable
->
lastRow
==
NULL
||
pTable
->
lastKey
==
dataRowKey
(
pTable
->
lastRow
));
return
pTable
->
lastKey
;
}
// ------------------ tsdbBuffer.c
typedef
struct
{
int64_t
blockId
;
...
...
@@ -105,7 +197,25 @@ typedef struct {
SList
*
bufBlockList
;
}
STsdbBufPool
;
#define TSDB_BUFFER_RESERVE 1024 // Reseve 1K as commit threshold
STsdbBufPool
*
tsdbNewBufPool
();
void
tsdbFreeBufPool
(
STsdbBufPool
*
pBufPool
);
int
tsdbOpenBufPool
(
STsdbRepo
*
pRepo
);
void
tsdbCloseBufPool
(
STsdbRepo
*
pRepo
);
SListNode
*
tsdbAllocBufBlockFromPool
(
STsdbRepo
*
pRepo
);
// ------------------ tsdbMemTable.c
typedef
struct
{
int
rowsInserted
;
int
rowsUpdated
;
int
rowsDeleteSucceed
;
int
rowsDeleteFailed
;
int
nOperations
;
TSKEY
keyFirst
;
TSKEY
keyLast
;
}
SMergeInfo
;
typedef
struct
{
STable
*
pTable
;
SSkipListIterator
*
pIter
;
...
...
@@ -152,6 +262,39 @@ typedef struct {
char
cont
[];
}
SActCont
;
int
tsdbRefMemTable
(
STsdbRepo
*
pRepo
,
SMemTable
*
pMemTable
);
int
tsdbUnRefMemTable
(
STsdbRepo
*
pRepo
,
SMemTable
*
pMemTable
);
int
tsdbTakeMemSnapshot
(
STsdbRepo
*
pRepo
,
SMemTable
**
pMem
,
SMemTable
**
pIMem
);
void
tsdbUnTakeMemSnapShot
(
STsdbRepo
*
pRepo
,
SMemTable
*
pMem
,
SMemTable
*
pIMem
);
void
*
tsdbAllocBytes
(
STsdbRepo
*
pRepo
,
int
bytes
);
int
tsdbAsyncCommit
(
STsdbRepo
*
pRepo
);
int
tsdbLoadDataFromCache
(
STable
*
pTable
,
SSkipListIterator
*
pIter
,
TSKEY
maxKey
,
int
maxRowsToRead
,
SDataCols
*
pCols
,
TKEY
*
filterKeys
,
int
nFilterKeys
,
bool
keepDup
,
SMergeInfo
*
pMergeInfo
);
void
*
tsdbCommitData
(
STsdbRepo
*
pRepo
);
static
FORCE_INLINE
SDataRow
tsdbNextIterRow
(
SSkipListIterator
*
pIter
)
{
if
(
pIter
==
NULL
)
return
NULL
;
SSkipListNode
*
node
=
tSkipListIterGet
(
pIter
);
if
(
node
==
NULL
)
return
NULL
;
return
(
SDataRow
)
SL_GET_NODE_DATA
(
node
);
}
static
FORCE_INLINE
TSKEY
tsdbNextIterKey
(
SSkipListIterator
*
pIter
)
{
SDataRow
row
=
tsdbNextIterRow
(
pIter
);
if
(
row
==
NULL
)
return
TSDB_DATA_TIMESTAMP_NULL
;
return
dataRowKey
(
row
);
}
static
FORCE_INLINE
TKEY
tsdbNextIterTKey
(
SSkipListIterator
*
pIter
)
{
SDataRow
row
=
tsdbNextIterRow
(
pIter
);
if
(
row
==
NULL
)
return
TKEY_NULL
;
return
dataRowTKey
(
row
);
}
// ------------------ tsdbFile.c
extern
const
char
*
tsdbFileSuffix
[];
...
...
@@ -217,6 +360,37 @@ typedef struct {
}
SFileGroupIter
;
#define TSDB_FILE_NAME(pFile) ((pFile)->file.aname)
#define TSDB_KEY_FILEID(key, daysPerFile, precision) ((key) / tsMsPerDay[(precision)] / (daysPerFile))
#define TSDB_MAX_FILE(keep, daysPerFile) ((keep) / (daysPerFile) + 3)
#define TSDB_MIN_FILE_ID(fh) (fh)->pFGroup[0].fileId
#define TSDB_MAX_FILE_ID(fh) (fh)->pFGroup[(fh)->nFGroups - 1].fileId
#define TSDB_IS_FILE_OPENED(f) ((f)->fd > 0)
#define TSDB_FGROUP_ITER_FORWARD TSDB_ORDER_ASC
#define TSDB_FGROUP_ITER_BACKWARD TSDB_ORDER_DESC
STsdbFileH
*
tsdbNewFileH
(
STsdbCfg
*
pCfg
);
void
tsdbFreeFileH
(
STsdbFileH
*
pFileH
);
int
tsdbOpenFileH
(
STsdbRepo
*
pRepo
);
void
tsdbCloseFileH
(
STsdbRepo
*
pRepo
,
bool
isRestart
);
SFileGroup
*
tsdbCreateFGroup
(
STsdbRepo
*
pRepo
,
int
fid
,
int
level
);
void
tsdbInitFileGroupIter
(
STsdbFileH
*
pFileH
,
SFileGroupIter
*
pIter
,
int
direction
);
void
tsdbSeekFileGroupIter
(
SFileGroupIter
*
pIter
,
int
fid
);
SFileGroup
*
tsdbGetFileGroupNext
(
SFileGroupIter
*
pIter
);
int
tsdbOpenFile
(
SFile
*
pFile
,
int
oflag
);
void
tsdbCloseFile
(
SFile
*
pFile
);
int
tsdbCreateFile
(
SFile
*
pFile
,
STsdbRepo
*
pRepo
,
int
fid
,
int
type
);
SFileGroup
*
tsdbSearchFGroup
(
STsdbFileH
*
pFileH
,
int
fid
,
int
flags
);
int
tsdbGetFidLevel
(
int
fid
,
SFidGroup
fidg
);
void
tsdbRemoveFilesBeyondRetention
(
STsdbRepo
*
pRepo
,
SFidGroup
*
pFidGroup
);
int
tsdbUpdateFileHeader
(
SFile
*
pFile
);
int
tsdbEncodeSFileInfo
(
void
**
buf
,
const
STsdbFileInfo
*
pInfo
);
void
*
tsdbDecodeSFileInfo
(
void
*
buf
,
STsdbFileInfo
*
pInfo
);
void
tsdbRemoveFileGroup
(
STsdbRepo
*
pRepo
,
SFileGroup
*
pFGroup
);
int
tsdbLoadFileHeader
(
SFile
*
pFile
,
uint32_t
*
version
);
void
tsdbGetFileInfoImpl
(
char
*
fname
,
uint32_t
*
magic
,
int64_t
*
size
);
void
tsdbGetFidGroup
(
STsdbCfg
*
pCfg
,
SFidGroup
*
pFidGroup
);
void
tsdbGetFidKeyRange
(
int
daysPerFile
,
int8_t
precision
,
int
fileId
,
TSKEY
*
minKey
,
TSKEY
*
maxKey
);
int
tsdbApplyRetention
(
STsdbRepo
*
pRepo
,
SFidGroup
*
pFidGroup
);
// ------------------ tsdbMain.c
typedef
struct
{
...
...
@@ -231,7 +405,7 @@ typedef struct {
void
*
pMsg
;
}
SSubmitMsgIter
;
typedef
struct
{
struct
STsdbRepo
{
int8_t
state
;
char
*
rootDir
;
...
...
@@ -247,7 +421,34 @@ typedef struct {
pthread_mutex_t
mutex
;
bool
repoLocked
;
int32_t
code
;
// Commit code
}
STsdbRepo
;
};
#define REPO_ID(r) (r)->config.tsdbId
#define IS_REPO_LOCKED(r) (r)->repoLocked
#define TSDB_SUBMIT_MSG_HEAD_SIZE sizeof(SSubmitMsg)
char
*
tsdbGetMetaFileName
(
char
*
rootDir
);
void
tsdbGetDataFileName
(
char
*
rootDir
,
int
vid
,
int
fid
,
int
type
,
char
*
fname
);
int
tsdbLockRepo
(
STsdbRepo
*
pRepo
);
int
tsdbUnlockRepo
(
STsdbRepo
*
pRepo
);
char
*
tsdbGetDataDirName
(
char
*
rootDir
);
int
tsdbGetNextMaxTables
(
int
tid
);
STsdbMeta
*
tsdbGetMeta
(
TSDB_REPO_T
*
pRepo
);
STsdbFileH
*
tsdbGetFile
(
TSDB_REPO_T
*
pRepo
);
int
tsdbCheckCommit
(
STsdbRepo
*
pRepo
);
static
FORCE_INLINE
STsdbBufBlock
*
tsdbGetCurrBufBlock
(
STsdbRepo
*
pRepo
)
{
ASSERT
(
pRepo
!=
NULL
);
if
(
pRepo
->
mem
==
NULL
)
return
NULL
;
SListNode
*
pNode
=
listTail
(
pRepo
->
mem
->
bufBlockList
);
if
(
pNode
==
NULL
)
return
NULL
;
STsdbBufBlock
*
pBufBlock
=
NULL
;
tdListNodeGetData
(
pRepo
->
mem
->
bufBlockList
,
pNode
,
(
void
*
)(
&
pBufBlock
));
return
pBufBlock
;
}
// ------------------ tsdbRWHelper.c
typedef
struct
{
...
...
@@ -343,203 +544,24 @@ typedef struct {
void
*
compBuffer
;
// Buffer for temperary compress/decompress purpose
}
SRWHelper
;
typedef
struct
{
int
rowsInserted
;
int
rowsUpdated
;
int
rowsDeleteSucceed
;
int
rowsDeleteFailed
;
int
nOperations
;
TSKEY
keyFirst
;
TSKEY
keyLast
;
}
SMergeInfo
;
// ------------------ tsdbScan.c
typedef
struct
{
SFileGroup
fGroup
;
int
numOfIdx
;
SFileGroup
fGroup
;
int
numOfIdx
;
SBlockIdx
*
pCompIdx
;
SBlockInfo
*
pCompInfo
;
void
*
pBuf
;
FILE
*
tLogStream
;
void
*
pBuf
;
FILE
*
tLogStream
;
}
STsdbScanHandle
;
// Operations
// ------------------ tsdbMeta.c
#define TSDB_INIT_NTABLES 1024
#define TABLE_TYPE(t) (t)->type
#define TABLE_NAME(t) (t)->name
#define TABLE_CHAR_NAME(t) TABLE_NAME(t)->data
#define TABLE_UID(t) (t)->tableId.uid
#define TABLE_TID(t) (t)->tableId.tid
#define TABLE_SUID(t) (t)->suid
#define TSDB_META_FILE_MAGIC(m) KVSTORE_MAGIC((m)->pStore)
#define TSDB_RLOCK_TABLE(t) taosRLockLatch(&((t)->latch))
#define TSDB_RUNLOCK_TABLE(t) taosRUnLockLatch(&((t)->latch))
#define TSDB_WLOCK_TABLE(t) taosWLockLatch(&((t)->latch))
#define TSDB_WUNLOCK_TABLE(t) taosWUnLockLatch(&((t)->latch))
STsdbMeta
*
tsdbNewMeta
(
STsdbCfg
*
pCfg
);
void
tsdbFreeMeta
(
STsdbMeta
*
pMeta
);
int
tsdbOpenMeta
(
STsdbRepo
*
pRepo
);
int
tsdbCloseMeta
(
STsdbRepo
*
pRepo
);
STable
*
tsdbGetTableByUid
(
STsdbMeta
*
pMeta
,
uint64_t
uid
);
STSchema
*
tsdbGetTableSchemaByVersion
(
STable
*
pTable
,
int16_t
version
);
int
tsdbWLockRepoMeta
(
STsdbRepo
*
pRepo
);
int
tsdbRLockRepoMeta
(
STsdbRepo
*
pRepo
);
int
tsdbUnlockRepoMeta
(
STsdbRepo
*
pRepo
);
void
tsdbRefTable
(
STable
*
pTable
);
void
tsdbUnRefTable
(
STable
*
pTable
);
void
tsdbUpdateTableSchema
(
STsdbRepo
*
pRepo
,
STable
*
pTable
,
STSchema
*
pSchema
,
bool
insertAct
);
static
FORCE_INLINE
int
tsdbCompareSchemaVersion
(
const
void
*
key1
,
const
void
*
key2
)
{
if
(
*
(
int16_t
*
)
key1
<
schemaVersion
(
*
(
STSchema
**
)
key2
))
{
return
-
1
;
}
else
if
(
*
(
int16_t
*
)
key1
>
schemaVersion
(
*
(
STSchema
**
)
key2
))
{
return
1
;
}
else
{
return
0
;
}
}
static
FORCE_INLINE
STSchema
*
tsdbGetTableSchemaImpl
(
STable
*
pTable
,
bool
lock
,
bool
copy
,
int16_t
version
)
{
STable
*
pDTable
=
(
TABLE_TYPE
(
pTable
)
==
TSDB_CHILD_TABLE
)
?
pTable
->
pSuper
:
pTable
;
STSchema
*
pSchema
=
NULL
;
STSchema
*
pTSchema
=
NULL
;
if
(
lock
)
TSDB_RLOCK_TABLE
(
pDTable
);
if
(
version
<
0
)
{
// get the latest version of schema
pTSchema
=
pDTable
->
schema
[
pDTable
->
numOfSchemas
-
1
];
}
else
{
// get the schema with version
void
*
ptr
=
taosbsearch
(
&
version
,
pDTable
->
schema
,
pDTable
->
numOfSchemas
,
sizeof
(
STSchema
*
),
tsdbCompareSchemaVersion
,
TD_EQ
);
if
(
ptr
==
NULL
)
{
terrno
=
TSDB_CODE_TDB_IVD_TB_SCHEMA_VERSION
;
goto
_exit
;
}
pTSchema
=
*
(
STSchema
**
)
ptr
;
}
ASSERT
(
pTSchema
!=
NULL
);
if
(
copy
)
{
if
((
pSchema
=
tdDupSchema
(
pTSchema
))
==
NULL
)
terrno
=
TSDB_CODE_TDB_OUT_OF_MEMORY
;
}
else
{
pSchema
=
pTSchema
;
}
_exit:
if
(
lock
)
TSDB_RUNLOCK_TABLE
(
pDTable
);
return
pSchema
;
}
static
FORCE_INLINE
STSchema
*
tsdbGetTableSchema
(
STable
*
pTable
)
{
return
tsdbGetTableSchemaImpl
(
pTable
,
false
,
false
,
-
1
);
}
static
FORCE_INLINE
STSchema
*
tsdbGetTableTagSchema
(
STable
*
pTable
)
{
if
(
pTable
->
type
==
TSDB_CHILD_TABLE
)
{
// check child table first
STable
*
pSuper
=
pTable
->
pSuper
;
if
(
pSuper
==
NULL
)
return
NULL
;
return
pSuper
->
tagSchema
;
}
else
if
(
pTable
->
type
==
TSDB_SUPER_TABLE
)
{
return
pTable
->
tagSchema
;
}
else
{
return
NULL
;
}
}
static
FORCE_INLINE
TSKEY
tsdbGetTableLastKeyImpl
(
STable
*
pTable
)
{
ASSERT
(
pTable
->
lastRow
==
NULL
||
pTable
->
lastKey
==
dataRowKey
(
pTable
->
lastRow
));
return
pTable
->
lastKey
;
}
// ------------------ tsdbBuffer.c
#define TSDB_BUFFER_RESERVE 1024 // Reseve 1K as commit threshold
STsdbBufPool
*
tsdbNewBufPool
();
void
tsdbFreeBufPool
(
STsdbBufPool
*
pBufPool
);
int
tsdbOpenBufPool
(
STsdbRepo
*
pRepo
);
void
tsdbCloseBufPool
(
STsdbRepo
*
pRepo
);
SListNode
*
tsdbAllocBufBlockFromPool
(
STsdbRepo
*
pRepo
);
// ------------------ tsdbMemTable.c
int
tsdbRefMemTable
(
STsdbRepo
*
pRepo
,
SMemTable
*
pMemTable
);
int
tsdbUnRefMemTable
(
STsdbRepo
*
pRepo
,
SMemTable
*
pMemTable
);
int
tsdbTakeMemSnapshot
(
STsdbRepo
*
pRepo
,
SMemTable
**
pMem
,
SMemTable
**
pIMem
);
void
tsdbUnTakeMemSnapShot
(
STsdbRepo
*
pRepo
,
SMemTable
*
pMem
,
SMemTable
*
pIMem
);
void
*
tsdbAllocBytes
(
STsdbRepo
*
pRepo
,
int
bytes
);
int
tsdbAsyncCommit
(
STsdbRepo
*
pRepo
);
int
tsdbLoadDataFromCache
(
STable
*
pTable
,
SSkipListIterator
*
pIter
,
TSKEY
maxKey
,
int
maxRowsToRead
,
SDataCols
*
pCols
,
TKEY
*
filterKeys
,
int
nFilterKeys
,
bool
keepDup
,
SMergeInfo
*
pMergeInfo
);
void
*
tsdbCommitData
(
STsdbRepo
*
pRepo
);
static
FORCE_INLINE
SDataRow
tsdbNextIterRow
(
SSkipListIterator
*
pIter
)
{
if
(
pIter
==
NULL
)
return
NULL
;
SSkipListNode
*
node
=
tSkipListIterGet
(
pIter
);
if
(
node
==
NULL
)
return
NULL
;
return
(
SDataRow
)
SL_GET_NODE_DATA
(
node
);
}
static
FORCE_INLINE
TSKEY
tsdbNextIterKey
(
SSkipListIterator
*
pIter
)
{
SDataRow
row
=
tsdbNextIterRow
(
pIter
);
if
(
row
==
NULL
)
return
TSDB_DATA_TIMESTAMP_NULL
;
return
dataRowKey
(
row
);
}
static
FORCE_INLINE
TKEY
tsdbNextIterTKey
(
SSkipListIterator
*
pIter
)
{
SDataRow
row
=
tsdbNextIterRow
(
pIter
);
if
(
row
==
NULL
)
return
TKEY_NULL
;
return
dataRowTKey
(
row
);
}
static
FORCE_INLINE
STsdbBufBlock
*
tsdbGetCurrBufBlock
(
STsdbRepo
*
pRepo
)
{
ASSERT
(
pRepo
!=
NULL
);
if
(
pRepo
->
mem
==
NULL
)
return
NULL
;
SListNode
*
pNode
=
listTail
(
pRepo
->
mem
->
bufBlockList
);
if
(
pNode
==
NULL
)
return
NULL
;
STsdbBufBlock
*
pBufBlock
=
NULL
;
tdListNodeGetData
(
pRepo
->
mem
->
bufBlockList
,
pNode
,
(
void
*
)(
&
pBufBlock
));
return
pBufBlock
;
}
// ------------------ tsdbFile.c
#define TSDB_KEY_FILEID(key, daysPerFile, precision) ((key) / tsMsPerDay[(precision)] / (daysPerFile))
#define TSDB_MAX_FILE(keep, daysPerFile) ((keep) / (daysPerFile) + 3)
#define TSDB_MIN_FILE_ID(fh) (fh)->pFGroup[0].fileId
#define TSDB_MAX_FILE_ID(fh) (fh)->pFGroup[(fh)->nFGroups - 1].fileId
#define TSDB_IS_FILE_OPENED(f) ((f)->fd > 0)
#define TSDB_FGROUP_ITER_FORWARD TSDB_ORDER_ASC
#define TSDB_FGROUP_ITER_BACKWARD TSDB_ORDER_DESC
STsdbFileH
*
tsdbNewFileH
(
STsdbCfg
*
pCfg
);
void
tsdbFreeFileH
(
STsdbFileH
*
pFileH
);
int
tsdbOpenFileH
(
STsdbRepo
*
pRepo
);
void
tsdbCloseFileH
(
STsdbRepo
*
pRepo
,
bool
isRestart
);
SFileGroup
*
tsdbCreateFGroup
(
STsdbRepo
*
pRepo
,
int
fid
,
int
level
);
void
tsdbInitFileGroupIter
(
STsdbFileH
*
pFileH
,
SFileGroupIter
*
pIter
,
int
direction
);
void
tsdbSeekFileGroupIter
(
SFileGroupIter
*
pIter
,
int
fid
);
SFileGroup
*
tsdbGetFileGroupNext
(
SFileGroupIter
*
pIter
);
int
tsdbOpenFile
(
SFile
*
pFile
,
int
oflag
);
void
tsdbCloseFile
(
SFile
*
pFile
);
int
tsdbCreateFile
(
SFile
*
pFile
,
STsdbRepo
*
pRepo
,
int
fid
,
int
type
);
SFileGroup
*
tsdbSearchFGroup
(
STsdbFileH
*
pFileH
,
int
fid
,
int
flags
);
int
tsdbGetFidLevel
(
int
fid
,
SFidGroup
fidg
);
void
tsdbRemoveFilesBeyondRetention
(
STsdbRepo
*
pRepo
,
SFidGroup
*
pFidGroup
);
int
tsdbUpdateFileHeader
(
SFile
*
pFile
);
int
tsdbEncodeSFileInfo
(
void
**
buf
,
const
STsdbFileInfo
*
pInfo
);
void
*
tsdbDecodeSFileInfo
(
void
*
buf
,
STsdbFileInfo
*
pInfo
);
void
tsdbRemoveFileGroup
(
STsdbRepo
*
pRepo
,
SFileGroup
*
pFGroup
);
int
tsdbLoadFileHeader
(
SFile
*
pFile
,
uint32_t
*
version
);
void
tsdbGetFileInfoImpl
(
char
*
fname
,
uint32_t
*
magic
,
int64_t
*
size
);
void
tsdbGetFidGroup
(
STsdbCfg
*
pCfg
,
SFidGroup
*
pFidGroup
);
void
tsdbGetFidKeyRange
(
int
daysPerFile
,
int8_t
precision
,
int
fileId
,
TSKEY
*
minKey
,
TSKEY
*
maxKey
);
int
tsdbApplyRetention
(
STsdbRepo
*
pRepo
,
SFidGroup
*
pFidGroup
);
int
tsdbScanFGroup
(
STsdbScanHandle
*
pScanHandle
,
char
*
rootDir
,
int
fid
);
STsdbScanHandle
*
tsdbNewScanHandle
();
void
tsdbSetScanLogStream
(
STsdbScanHandle
*
pScanHandle
,
FILE
*
fLogStream
);
int
tsdbSetAndOpenScanFile
(
STsdbScanHandle
*
pScanHandle
,
char
*
rootDir
,
int
fid
);
int
tsdbScanSBlockIdx
(
STsdbScanHandle
*
pScanHandle
);
int
tsdbScanSBlock
(
STsdbScanHandle
*
pScanHandle
,
int
idx
);
int
tsdbCloseScanFile
(
STsdbScanHandle
*
pScanHandle
);
void
tsdbFreeScanHandle
(
STsdbScanHandle
*
pScanHandle
);
// ------------------ tsdbRWHelper.c
#define TSDB_HELPER_CLEAR_STATE 0x0 // Clear state
...
...
@@ -597,31 +619,6 @@ static FORCE_INLINE int compTSKEY(const void* key1, const void* key2) {
}
}
// ------------------ tsdbMain.c
#define REPO_ID(r) (r)->config.tsdbId
#define IS_REPO_LOCKED(r) (r)->repoLocked
#define TSDB_SUBMIT_MSG_HEAD_SIZE sizeof(SSubmitMsg)
char
*
tsdbGetMetaFileName
(
char
*
rootDir
);
void
tsdbGetDataFileName
(
char
*
rootDir
,
int
vid
,
int
fid
,
int
type
,
char
*
fname
);
int
tsdbLockRepo
(
STsdbRepo
*
pRepo
);
int
tsdbUnlockRepo
(
STsdbRepo
*
pRepo
);
char
*
tsdbGetDataDirName
(
char
*
rootDir
);
int
tsdbGetNextMaxTables
(
int
tid
);
STsdbMeta
*
tsdbGetMeta
(
TSDB_REPO_T
*
pRepo
);
STsdbFileH
*
tsdbGetFile
(
TSDB_REPO_T
*
pRepo
);
int
tsdbCheckCommit
(
STsdbRepo
*
pRepo
);
// ------------------ tsdbScan.c
int
tsdbScanFGroup
(
STsdbScanHandle
*
pScanHandle
,
char
*
rootDir
,
int
fid
);
STsdbScanHandle
*
tsdbNewScanHandle
();
void
tsdbSetScanLogStream
(
STsdbScanHandle
*
pScanHandle
,
FILE
*
fLogStream
);
int
tsdbSetAndOpenScanFile
(
STsdbScanHandle
*
pScanHandle
,
char
*
rootDir
,
int
fid
);
int
tsdbScanSBlockIdx
(
STsdbScanHandle
*
pScanHandle
);
int
tsdbScanSBlock
(
STsdbScanHandle
*
pScanHandle
,
int
idx
);
int
tsdbCloseScanFile
(
STsdbScanHandle
*
pScanHandle
);
void
tsdbFreeScanHandle
(
STsdbScanHandle
*
pScanHandle
);
// ------------------ tsdbCommitQueue.c
int
tsdbScheduleCommit
(
STsdbRepo
*
pRepo
);
...
...
src/tsdb/src/tsdbCommit.c
浏览文件 @
7d42764f
...
...
@@ -23,6 +23,7 @@ typedef struct {
static
int
tsdbCommitTSData
(
STsdbRepo
*
pRepo
);
static
int
tsdbCommitMeta
(
STsdbRepo
*
pRepo
);
static
int
tsdbStartCommit
(
STsdbRepo
*
pRepo
);
static
void
tsdbEndCommit
(
STsdbRepo
*
pRepo
,
int
eno
);
static
bool
tsdbHasDataToCommit
(
SCommitIter
*
iters
,
int
nIters
,
TSKEY
minKey
,
TSKEY
maxKey
);
static
int
tsdbCommitToFile
(
STsdbRepo
*
pRepo
,
int
fid
,
SCommitH
*
pch
);
...
...
@@ -33,12 +34,10 @@ static int tsdbInitCommitH(STsdbRepo *pRepo, SCommitH *pch);
static
void
tsdbDestroyCommitH
(
SCommitH
*
pch
,
int
niter
);
void
*
tsdbCommitData
(
STsdbRepo
*
pRepo
)
{
SMemTable
*
pMem
=
pRepo
->
imem
;
tsdbInfo
(
"vgId:%d start to commit! keyFirst %"
PRId64
" keyLast %"
PRId64
" numOfRows %"
PRId64
" meta rows: %d"
,
REPO_ID
(
pRepo
),
pMem
->
keyFirst
,
pMem
->
keyLast
,
pMem
->
numOfRows
,
listNEles
(
pMem
->
actList
));
pRepo
->
code
=
TSDB_CODE_SUCCESS
;
if
(
tsdbStartCommit
(
pRepo
)
<
0
)
{
tsdbError
(
"vgId:%d failed to commit data while startting to commit since %s"
,
REPO_ID
(
pRepo
),
tstrerror
(
terrno
));
goto
_err
;
}
// Commit to update meta file
if
(
tsdbCommitMeta
(
pRepo
)
<
0
)
{
...
...
@@ -52,17 +51,14 @@ void *tsdbCommitData(STsdbRepo *pRepo) {
goto
_err
;
}
tsdbInfo
(
"vgId:%d commit over, succeed"
,
REPO_ID
(
pRepo
));
tsdbEndCommit
(
pRepo
,
TSDB_CODE_SUCCESS
);
return
NULL
;
_err:
ASSERT
(
terrno
!=
TSDB_CODE_SUCCESS
);
pRepo
->
code
=
terrno
;
tsdbInfo
(
"vgId:%d commit over, failed"
,
REPO_ID
(
pRepo
));
tsdbEndCommit
(
pRepo
,
terrno
);
tsdbEndCommit
(
pRepo
,
terrno
);
return
NULL
;
}
...
...
@@ -151,19 +147,38 @@ static int tsdbCommitMeta(STsdbRepo *pRepo) {
goto
_err
;
}
// TODO
// tsdbUpdateMFile(pRepo, NULL)
return
0
;
_err:
return
-
1
;
}
static
int
tsdbStartCommit
(
STsdbRepo
*
pRepo
)
{
SMemTable
*
pMem
=
pRepo
->
imem
;
tsdbInfo
(
"vgId:%d start to commit! keyFirst %"
PRId64
" keyLast %"
PRId64
" numOfRows %"
PRId64
" meta rows: %d"
,
REPO_ID
(
pRepo
),
pMem
->
keyFirst
,
pMem
->
keyLast
,
pMem
->
numOfRows
,
listNEles
(
pMem
->
actList
));
// TODO
pRepo
->
code
=
TSDB_CODE_SUCCESS
;
return
0
;
}
static
void
tsdbEndCommit
(
STsdbRepo
*
pRepo
,
int
eno
)
{
tsdbInfo
(
"vgId:%d commit over, %s"
,
REPO_ID
(
pRepo
),
(
eno
==
TSDB_CODE_SUCCESS
)
?
"succeed"
:
"failed"
);
if
(
pRepo
->
appH
.
notifyStatus
)
pRepo
->
appH
.
notifyStatus
(
pRepo
->
appH
.
appH
,
TSDB_STATUS_COMMIT_OVER
,
eno
);
SMemTable
*
pIMem
=
pRepo
->
imem
;
tsdbLockRepo
(
pRepo
);
pRepo
->
imem
=
NULL
;
tsdbUnlockRepo
(
pRepo
);
tsdbUnRefMemTable
(
pRepo
,
pIMem
);
sem_post
(
&
(
pRepo
->
readyToCommit
));
}
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录