Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
036f37c9
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
未验证
提交
036f37c9
编写于
5月 14, 2021
作者:
S
Shengliang Guan
提交者:
GitHub
5月 14, 2021
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #5969 from lichuang/feature/TD-3963
[TD-3963]tsdbRepo config hot change
上级
dec7e951
d17e4e56
变更
7
显示空白变更内容
内联
并排
Showing
7 changed file
with
204 addition
and
28 deletion
+204
-28
src/tsdb/inc/tsdbBuffer.h
src/tsdb/inc/tsdbBuffer.h
+4
-1
src/tsdb/inc/tsdbint.h
src/tsdb/inc/tsdbint.h
+5
-0
src/tsdb/src/tsdbBuffer.c
src/tsdb/src/tsdbBuffer.c
+44
-1
src/tsdb/src/tsdbCommitQueue.c
src/tsdb/src/tsdbCommitQueue.c
+33
-0
src/tsdb/src/tsdbMain.c
src/tsdb/src/tsdbMain.c
+72
-0
src/tsdb/src/tsdbMemTable.c
src/tsdb/src/tsdbMemTable.c
+26
-8
src/vnode/src/vnodeMain.c
src/vnode/src/vnodeMain.c
+20
-18
未找到文件。
src/tsdb/inc/tsdbBuffer.h
浏览文件 @
036f37c9
...
...
@@ -28,6 +28,7 @@ typedef struct {
int
bufBlockSize
;
int
tBufBlocks
;
int
nBufBlocks
;
int
nRecycleBlocks
;
int64_t
index
;
SList
*
bufBlockList
;
}
STsdbBufPool
;
...
...
@@ -39,5 +40,7 @@ void tsdbFreeBufPool(STsdbBufPool* pBufPool);
int
tsdbOpenBufPool
(
STsdbRepo
*
pRepo
);
void
tsdbCloseBufPool
(
STsdbRepo
*
pRepo
);
SListNode
*
tsdbAllocBufBlockFromPool
(
STsdbRepo
*
pRepo
);
int
tsdbExpendPool
(
STsdbRepo
*
pRepo
,
int32_t
oldTotalBlocks
);
void
tsdbRecycleBufferBlock
(
STsdbBufPool
*
pPool
,
SListNode
*
pNode
);
#endif
/* _TD_TSDB_BUFFER_H_ */
\ No newline at end of file
src/tsdb/inc/tsdbint.h
浏览文件 @
036f37c9
...
...
@@ -71,6 +71,11 @@ struct STsdbRepo {
uint8_t
state
;
STsdbCfg
config
;
STsdbCfg
save_config
;
// save apply config
bool
config_changed
;
// config changed flag
pthread_mutex_t
save_mutex
;
// protect save config
STsdbAppH
appH
;
STsdbStat
stat
;
STsdbMeta
*
tsdbMeta
;
...
...
src/tsdb/src/tsdbBuffer.c
浏览文件 @
036f37c9
...
...
@@ -70,6 +70,7 @@ int tsdbOpenBufPool(STsdbRepo *pRepo) {
pPool
->
tBufBlocks
=
pCfg
->
totalBlocks
;
pPool
->
nBufBlocks
=
0
;
pPool
->
index
=
0
;
pPool
->
nRecycleBlocks
=
0
;
for
(
int
i
=
0
;
i
<
pCfg
->
totalBlocks
;
i
++
)
{
STsdbBufBlock
*
pBufBlock
=
tsdbNewBufBlock
(
pPool
->
bufBlockSize
);
...
...
@@ -157,3 +158,45 @@ _err:
}
static
void
tsdbFreeBufBlock
(
STsdbBufBlock
*
pBufBlock
)
{
tfree
(
pBufBlock
);
}
int
tsdbExpendPool
(
STsdbRepo
*
pRepo
,
int32_t
oldTotalBlocks
)
{
if
(
oldTotalBlocks
==
pRepo
->
config
.
totalBlocks
)
{
return
TSDB_CODE_SUCCESS
;
}
int
err
=
TSDB_CODE_SUCCESS
;
if
(
tsdbLockRepo
(
pRepo
)
<
0
)
return
terrno
;
STsdbBufPool
*
pPool
=
pRepo
->
pPool
;
if
(
pRepo
->
config
.
totalBlocks
>
oldTotalBlocks
)
{
for
(
int
i
=
0
;
i
<
pRepo
->
config
.
totalBlocks
-
oldTotalBlocks
;
i
++
)
{
STsdbBufBlock
*
pBufBlock
=
tsdbNewBufBlock
(
pPool
->
bufBlockSize
);
if
(
pBufBlock
==
NULL
)
goto
err
;
if
(
tdListAppend
(
pPool
->
bufBlockList
,
(
void
*
)(
&
pBufBlock
))
<
0
)
{
tsdbFreeBufBlock
(
pBufBlock
);
terrno
=
TSDB_CODE_TDB_OUT_OF_MEMORY
;
err
=
TSDB_CODE_TDB_OUT_OF_MEMORY
;
goto
err
;
}
pPool
->
nBufBlocks
++
;
}
pthread_cond_signal
(
&
pPool
->
poolNotEmpty
);
}
else
{
pPool
->
nRecycleBlocks
=
oldTotalBlocks
-
pRepo
->
config
.
totalBlocks
;
}
err:
tsdbUnlockRepo
(
pRepo
);
return
err
;
}
void
tsdbRecycleBufferBlock
(
STsdbBufPool
*
pPool
,
SListNode
*
pNode
)
{
STsdbBufBlock
*
pBufBlock
=
NULL
;
tdListNodeGetData
(
pPool
->
bufBlockList
,
pNode
,
(
void
*
)(
&
pBufBlock
));
tsdbFreeBufBlock
(
pBufBlock
);
free
(
pNode
);
pPool
->
nBufBlocks
--
;
}
\ No newline at end of file
src/tsdb/src/tsdbCommitQueue.c
浏览文件 @
036f37c9
...
...
@@ -112,6 +112,32 @@ int tsdbScheduleCommit(STsdbRepo *pRepo) {
return
0
;
}
static
void
tsdbApplyRepoConfig
(
STsdbRepo
*
pRepo
)
{
pRepo
->
config_changed
=
false
;
STsdbCfg
*
pSaveCfg
=
&
pRepo
->
save_config
;
int32_t
oldTotalBlocks
=
pRepo
->
config
.
totalBlocks
;
pRepo
->
config
.
compression
=
pRepo
->
save_config
.
compression
;
pRepo
->
config
.
keep
=
pRepo
->
save_config
.
keep
;
pRepo
->
config
.
keep1
=
pRepo
->
save_config
.
keep1
;
pRepo
->
config
.
keep2
=
pRepo
->
save_config
.
keep2
;
pRepo
->
config
.
cacheLastRow
=
pRepo
->
save_config
.
cacheLastRow
;
pRepo
->
config
.
totalBlocks
=
pRepo
->
save_config
.
totalBlocks
;
tsdbInfo
(
"vgId:%d apply new config: compression(%d), keep(%d,%d,%d), totalBlocks(%d), cacheLastRow(%d),totalBlocks(%d)"
,
REPO_ID
(
pRepo
),
pSaveCfg
->
compression
,
pSaveCfg
->
keep
,
pSaveCfg
->
keep1
,
pSaveCfg
->
keep2
,
pSaveCfg
->
totalBlocks
,
pSaveCfg
->
cacheLastRow
,
pSaveCfg
->
totalBlocks
);
int
err
=
tsdbExpendPool
(
pRepo
,
oldTotalBlocks
);
if
(
!
TAOS_SUCCEEDED
(
err
))
{
tsdbError
(
"vgId:%d expand pool from %d to %d fail,reason:%s"
,
REPO_ID
(
pRepo
),
oldTotalBlocks
,
pSaveCfg
->
totalBlocks
,
tstrerror
(
err
));
}
}
static
void
*
tsdbLoopCommit
(
void
*
arg
)
{
SCommitQueue
*
pQueue
=
&
tsCommitQueue
;
SListNode
*
pNode
=
NULL
;
...
...
@@ -138,6 +164,13 @@ static void *tsdbLoopCommit(void *arg) {
pRepo
=
((
SCommitReq
*
)
pNode
->
data
)
->
pRepo
;
// check if need to apply new config
if
(
pRepo
->
config_changed
)
{
pthread_mutex_lock
(
&
pRepo
->
save_mutex
);
tsdbApplyRepoConfig
(
pRepo
);
pthread_mutex_unlock
(
&
pRepo
->
save_mutex
);
}
tsdbCommitData
(
pRepo
);
listNodeFree
(
pNode
);
}
...
...
src/tsdb/src/tsdbMain.c
浏览文件 @
036f37c9
...
...
@@ -203,6 +203,70 @@ void tsdbReportStat(void *repo, int64_t *totalPoints, int64_t *totalStorage, int
int32_t
tsdbConfigRepo
(
STsdbRepo
*
repo
,
STsdbCfg
*
pCfg
)
{
// TODO: think about multithread cases
if
(
tsdbCheckAndSetDefaultCfg
(
pCfg
)
<
0
)
return
-
1
;
STsdbCfg
*
pRCfg
=
&
repo
->
config
;
ASSERT
(
pRCfg
->
tsdbId
==
pCfg
->
tsdbId
);
ASSERT
(
pRCfg
->
cacheBlockSize
==
pCfg
->
cacheBlockSize
);
ASSERT
(
pRCfg
->
daysPerFile
==
pCfg
->
daysPerFile
);
ASSERT
(
pRCfg
->
minRowsPerFileBlock
==
pCfg
->
minRowsPerFileBlock
);
ASSERT
(
pRCfg
->
maxRowsPerFileBlock
==
pCfg
->
maxRowsPerFileBlock
);
ASSERT
(
pRCfg
->
precision
==
pCfg
->
precision
);
bool
configChanged
=
false
;
if
(
pRCfg
->
compression
!=
pCfg
->
compression
)
{
configChanged
=
true
;
}
if
(
pRCfg
->
keep
!=
pCfg
->
keep
)
{
configChanged
=
true
;
}
if
(
pRCfg
->
keep1
!=
pCfg
->
keep1
)
{
configChanged
=
true
;
}
if
(
pRCfg
->
keep2
!=
pCfg
->
keep2
)
{
configChanged
=
true
;
}
if
(
pRCfg
->
cacheLastRow
!=
pCfg
->
cacheLastRow
)
{
configChanged
=
true
;
}
if
(
pRCfg
->
totalBlocks
!=
pCfg
->
totalBlocks
)
{
configChanged
=
true
;
}
if
(
!
configChanged
)
{
tsdbError
(
"vgId:%d no config changed"
,
REPO_ID
(
repo
));
}
int
code
=
pthread_mutex_lock
(
&
repo
->
save_mutex
);
if
(
code
!=
0
)
{
tsdbError
(
"vgId:%d failed to lock tsdb save config mutex since %s"
,
REPO_ID
(
repo
),
strerror
(
errno
));
terrno
=
TAOS_SYSTEM_ERROR
(
code
);
return
-
1
;
}
STsdbCfg
*
pSaveCfg
=
&
repo
->
save_config
;
*
pSaveCfg
=
repo
->
config
;
pSaveCfg
->
compression
=
pCfg
->
compression
;
pSaveCfg
->
keep
=
pCfg
->
keep
;
pSaveCfg
->
keep1
=
pCfg
->
keep1
;
pSaveCfg
->
keep2
=
pCfg
->
keep2
;
pSaveCfg
->
cacheLastRow
=
pCfg
->
cacheLastRow
;
pSaveCfg
->
totalBlocks
=
pCfg
->
totalBlocks
;
tsdbInfo
(
"vgId:%d old config: compression(%d), keep(%d,%d,%d), cacheLastRow(%d),totalBlocks(%d)"
,
REPO_ID
(
repo
),
pRCfg
->
compression
,
pRCfg
->
keep
,
pRCfg
->
keep1
,
pRCfg
->
keep2
,
pRCfg
->
cacheLastRow
,
pRCfg
->
totalBlocks
);
tsdbInfo
(
"vgId:%d new config: compression(%d), keep(%d,%d,%d), cacheLastRow(%d),totalBlocks(%d)"
,
REPO_ID
(
repo
),
pSaveCfg
->
compression
,
pSaveCfg
->
keep
,
pSaveCfg
->
keep1
,
pSaveCfg
->
keep2
,
pSaveCfg
->
cacheLastRow
,
pSaveCfg
->
totalBlocks
);
repo
->
config_changed
=
true
;
pthread_mutex_unlock
(
&
repo
->
save_mutex
);
return
0
;
#if 0
STsdbRepo *pRepo = (STsdbRepo *)repo;
...
...
@@ -474,6 +538,14 @@ static STsdbRepo *tsdbNewRepo(STsdbCfg *pCfg, STsdbAppH *pAppH) {
return
NULL
;
}
code
=
pthread_mutex_init
(
&
(
pRepo
->
save_mutex
),
NULL
);
if
(
code
!=
0
)
{
terrno
=
TAOS_SYSTEM_ERROR
(
code
);
tsdbFreeRepo
(
pRepo
);
return
NULL
;
}
pRepo
->
config_changed
=
false
;
code
=
tsem_init
(
&
(
pRepo
->
readyToCommit
),
0
,
1
);
if
(
code
!=
0
)
{
code
=
errno
;
...
...
src/tsdb/src/tsdbMemTable.c
浏览文件 @
036f37c9
...
...
@@ -98,10 +98,17 @@ int tsdbUnRefMemTable(STsdbRepo *pRepo, SMemTable *pMemTable) {
STsdbBufPool
*
pBufPool
=
pRepo
->
pPool
;
SListNode
*
pNode
=
NULL
;
bool
recycleBlocks
=
pBufPool
->
nRecycleBlocks
>
0
;
if
(
tsdbLockRepo
(
pRepo
)
<
0
)
return
-
1
;
while
((
pNode
=
tdListPopHead
(
pMemTable
->
bufBlockList
))
!=
NULL
)
{
if
(
pBufPool
->
nRecycleBlocks
>
0
)
{
tsdbRecycleBufferBlock
(
pBufPool
,
pNode
);
pBufPool
->
nRecycleBlocks
-=
1
;
}
else
{
tdListAppendNode
(
pBufPool
->
bufBlockList
,
pNode
);
}
}
if
(
!
recycleBlocks
)
{
int
code
=
pthread_cond_signal
(
&
pBufPool
->
poolNotEmpty
);
if
(
code
!=
0
)
{
if
(
tsdbUnlockRepo
(
pRepo
)
<
0
)
return
-
1
;
...
...
@@ -109,6 +116,8 @@ int tsdbUnRefMemTable(STsdbRepo *pRepo, SMemTable *pMemTable) {
terrno
=
TAOS_SYSTEM_ERROR
(
code
);
return
-
1
;
}
}
if
(
tsdbUnlockRepo
(
pRepo
)
<
0
)
return
-
1
;
for
(
int
i
=
0
;
i
<
pMemTable
->
maxTables
;
i
++
)
{
...
...
@@ -958,6 +967,15 @@ static void tsdbFreeRows(STsdbRepo *pRepo, void **rows, int rowCounter) {
static
int
tsdbUpdateTableLatestInfo
(
STsdbRepo
*
pRepo
,
STable
*
pTable
,
SDataRow
row
)
{
STsdbCfg
*
pCfg
=
&
pRepo
->
config
;
// if cacheLastRow config has been reset, free the lastRow
if
(
!
pCfg
->
cacheLastRow
&&
pTable
->
lastRow
!=
NULL
)
{
taosTZfree
(
pTable
->
lastRow
);
TSDB_WLOCK_TABLE
(
pTable
);
pTable
->
lastRow
=
NULL
;
pTable
->
lastKey
=
TSKEY_INITIAL_VAL
;
TSDB_WUNLOCK_TABLE
(
pTable
);
}
if
(
tsdbGetTableLastKeyImpl
(
pTable
)
<
dataRowKey
(
row
))
{
if
(
pCfg
->
cacheLastRow
||
pTable
->
lastRow
!=
NULL
)
{
SDataRow
nrow
=
pTable
->
lastRow
;
...
...
src/vnode/src/vnodeMain.c
浏览文件 @
036f37c9
...
...
@@ -170,9 +170,10 @@ static int32_t vnodeAlterImp(SVnodeObj *pVnode, SCreateVnodeMsg *pVnodeCfg) {
vDebug
(
"vgId:%d, tsdbchanged:%d syncchanged:%d while alter vnode"
,
pVnode
->
vgId
,
tsdbCfgChanged
,
syncCfgChanged
);
if
(
/*tsdbCfgChanged || */
syncCfgChanged
)
{
if
(
tsdbCfgChanged
||
syncCfgChanged
)
{
// vnode in non-ready state and still needs to return success instead of TSDB_CODE_VND_INVALID_STATUS
// dbCfgVersion can be corrected by status msg
if
(
syncCfgChanged
)
{
if
(
!
vnodeSetUpdatingStatus
(
pVnode
))
{
vDebug
(
"vgId:%d, vnode is not ready, do alter operation later"
,
pVnode
->
vgId
);
pVnode
->
dbCfgVersion
=
dbCfgVersion
;
...
...
@@ -191,8 +192,9 @@ static int32_t vnodeAlterImp(SVnodeObj *pVnode, SCreateVnodeMsg *pVnodeCfg) {
vnodeSetReadyStatus
(
pVnode
);
return
code
;
}
}
if
(
pVnode
->
tsdb
)
{
if
(
tsdbCfgChanged
&&
pVnode
->
tsdb
)
{
code
=
tsdbConfigRepo
(
pVnode
->
tsdb
,
&
pVnode
->
tsdbCfg
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
pVnode
->
dbCfgVersion
=
dbCfgVersion
;
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录