Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
3e944d22
TDengine
项目概览
taosdata
/
TDengine
大约 2 年 前同步成功
通知
1192
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
3e944d22
编写于
8月 21, 2023
作者:
Y
yihaoDeng
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
refactor checkpoint
上级
873c22ab
变更
1
显示空白变更内容
内联
并排
Showing
1 changed file
with
27 addition
and
18 deletion
+27
-18
source/libs/stream/src/streamBackendRocksdb.c
source/libs/stream/src/streamBackendRocksdb.c
+27
-18
未找到文件。
source/libs/stream/src/streamBackendRocksdb.c
浏览文件 @
3e944d22
...
...
@@ -466,16 +466,16 @@ void* streamBackendInit(const char* streamPath, int64_t chkpId) {
taosThreadMutexInit
(
&
pHandle
->
cfMutex
,
NULL
);
pHandle
->
cfInst
=
taosHashInit
(
64
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BINARY
),
false
,
HASH_NO_LOCK
);
//
rocksdb_env_t* env = rocksdb_create_default_env(); // rocksdb_envoptions_create();
rocksdb_env_t
*
env
=
rocksdb_create_default_env
();
// rocksdb_envoptions_create();
//
int32_t nBGThread = tsNumOfSnodeStreamThreads <= 2 ? 1 : tsNumOfSnodeStreamThreads / 2;
//
rocksdb_env_set_low_priority_background_threads(env, nBGThread);
//
rocksdb_env_set_high_priority_background_threads(env, nBGThread);
int32_t
nBGThread
=
tsNumOfSnodeStreamThreads
<=
2
?
1
:
tsNumOfSnodeStreamThreads
/
2
;
rocksdb_env_set_low_priority_background_threads
(
env
,
nBGThread
);
rocksdb_env_set_high_priority_background_threads
(
env
,
nBGThread
);
rocksdb_cache_t
*
cache
=
rocksdb_cache_create_lru
(
dbMemLimit
/
2
);
rocksdb_options_t
*
opts
=
rocksdb_options_create
();
//
rocksdb_options_set_env(opts, env);
rocksdb_options_set_env
(
opts
,
env
);
rocksdb_options_set_create_if_missing
(
opts
,
1
);
rocksdb_options_set_create_missing_column_families
(
opts
,
1
);
rocksdb_options_set_max_total_wal_size
(
opts
,
dbMemLimit
);
...
...
@@ -484,8 +484,9 @@ void* streamBackendInit(const char* streamPath, int64_t chkpId) {
rocksdb_options_set_info_log_level
(
opts
,
1
);
rocksdb_options_set_db_write_buffer_size
(
opts
,
dbMemLimit
);
rocksdb_options_set_write_buffer_size
(
opts
,
dbMemLimit
/
2
);
rocksdb_options_set_atomic_flush
(
opts
,
1
);
//
pHandle->env = env;
pHandle
->
env
=
env
;
pHandle
->
dbOpt
=
opts
;
pHandle
->
cache
=
cache
;
pHandle
->
filterFactory
=
rocksdb_compactionfilterfactory_create
(
...
...
@@ -520,7 +521,7 @@ void* streamBackendInit(const char* streamPath, int64_t chkpId) {
_EXIT:
rocksdb_options_destroy
(
opts
);
rocksdb_cache_destroy
(
cache
);
//
rocksdb_env_destroy(env);
rocksdb_env_destroy
(
env
);
taosThreadMutexDestroy
(
&
pHandle
->
mutex
);
taosThreadMutexDestroy
(
&
pHandle
->
cfMutex
);
taosHashCleanup
(
pHandle
->
cfInst
);
...
...
@@ -556,7 +557,7 @@ void streamBackendCleanup(void* arg) {
rocksdb_close
(
pHandle
->
db
);
}
rocksdb_options_destroy
(
pHandle
->
dbOpt
);
//
rocksdb_env_destroy(pHandle->env);
rocksdb_env_destroy
(
pHandle
->
env
);
rocksdb_cache_destroy
(
pHandle
->
cache
);
SListNode
*
head
=
tdListPopHead
(
pHandle
->
list
);
...
...
@@ -789,7 +790,9 @@ int32_t chkpGetAllDbCfHandle(SStreamMeta* pMeta, rocksdb_column_family_handle_t*
rocksdb_column_family_handle_t
*
p
=
wrapper
->
pHandle
[
i
];
size_t
len
=
0
;
char
*
name
=
rocksdb_column_family_handle_get_name
(
p
,
&
len
);
qError
(
"column name: name: %d"
,
(
int
)
len
);
char
buf
[
64
]
=
{
0
};
memcpy
(
buf
,
name
,
len
);
qError
(
"column name: name: %s, len: %d"
,
buf
,
(
int
)
len
);
taosMemoryFree
(
name
);
taosArrayPush
(
pHandle
,
&
p
);
...
...
@@ -834,8 +837,9 @@ _ERROR:
return
code
;
}
int32_t
chkpPreFlushDb
(
rocksdb_t
*
db
,
rocksdb_column_family_handle_t
**
cf
,
int32_t
nCf
)
{
int
code
=
-
1
;
int
code
=
0
;
char
*
err
=
NULL
;
rocksdb_flushoptions_t
*
flushOpt
=
rocksdb_flushoptions_create
();
rocksdb_flushoptions_set_wait
(
flushOpt
,
1
);
...
...
@@ -843,8 +847,8 @@ int32_t chkpPreFlushDb(rocksdb_t* db, rocksdb_column_family_handle_t** cf, int32
if
(
err
!=
NULL
)
{
qError
(
"failed to flush db before streamBackend clean up, reason:%s"
,
err
);
taosMemoryFree
(
err
);
code
=
-
1
;
}
code
=
0
;
rocksdb_flushoptions_destroy
(
flushOpt
);
return
code
;
}
...
...
@@ -896,11 +900,10 @@ int32_t streamBackendDoCheckpoint(void* arg, uint64_t checkpointId) {
}
// Get all cf and acquire cfWrappter
int32_t
nCf
=
0
;
//
chkpGetAllDbCfHandle(pMeta, &ppCf, refs);
int32_t
nCf
=
chkpGetAllDbCfHandle
(
pMeta
,
&
ppCf
,
refs
);
qDebug
(
"stream backend:%p start to do checkpoint at:%s, cf num: %d "
,
pHandle
,
pChkpIdDir
,
0
);
// code = chkpPreFlushDb(pHandle->db, ppCf, nCf);
code
=
0
;
code
=
chkpPreFlushDb
(
pHandle
->
db
,
ppCf
,
nCf
);
if
(
code
==
0
)
{
code
=
chkpDoDbCheckpoint
(
pHandle
->
db
,
pChkpIdDir
);
if
(
code
!=
0
)
{
...
...
@@ -964,7 +967,13 @@ static rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const cha
rocksdb_snapshot_t
**
snapshot
,
rocksdb_readoptions_t
**
readOpt
);
int
defaultKeyComp
(
void
*
state
,
const
char
*
aBuf
,
size_t
aLen
,
const
char
*
bBuf
,
size_t
bLen
)
{
int
ret
=
memcmp
(
aBuf
,
bBuf
,
aLen
);
int
ret
=
0
;
// qError("alen: %d, blen:%d", (int)aLen, (int)bLen);
if
(
aLen
<
bLen
)
{
ret
=
memcmp
(
aBuf
,
bBuf
,
aLen
);
}
else
{
ret
=
memcmp
(
aBuf
,
bBuf
,
bLen
);
}
if
(
ret
==
0
)
{
if
(
aLen
<
bLen
)
return
-
1
;
...
...
@@ -1474,7 +1483,7 @@ int32_t streamStateOpenBackendCf(void* backend, char* name, char** cfs, int32_t
inst
->
pCompares
=
taosMemoryCalloc
(
cfLen
,
sizeof
(
rocksdb_comparator_t
*
));
inst
->
dbOpt
=
handle
->
dbOpt
;
//
rocksdb_writeoptions_disable_WAL(inst->wOpt, 1);
rocksdb_writeoptions_disable_WAL
(
inst
->
wOpt
,
1
);
taosHashPut
(
handle
->
cfInst
,
idstr
,
strlen
(
idstr
)
+
1
,
&
inst
,
sizeof
(
void
*
));
}
else
{
inst
=
*
pInst
;
...
...
@@ -1595,7 +1604,7 @@ int streamStateOpenBackend(void* backend, SStreamState* pState) {
taosThreadRwlockInit
(
&
pBackendCfWrapper
->
rwLock
,
NULL
);
SCfComparator
compare
=
{.
comp
=
pCompare
,
.
numOfComp
=
cfLen
};
pBackendCfWrapper
->
pComparNode
=
streamBackendAddCompare
(
handle
,
&
compare
);
//
rocksdb_writeoptions_disable_WAL(pBackendCfWrapper->writeOpts, 1);
rocksdb_writeoptions_disable_WAL
(
pBackendCfWrapper
->
writeOpts
,
1
);
memcpy
(
pBackendCfWrapper
->
idstr
,
pState
->
pTdbState
->
idstr
,
sizeof
(
pState
->
pTdbState
->
idstr
));
int64_t
id
=
taosAddRef
(
streamBackendCfWrapperId
,
pBackendCfWrapper
);
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录