Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
387b4d36
T
TDengine
项目概览
taosdata
/
TDengine
大约 2 年 前同步成功
通知
1193
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
387b4d36
编写于
8月 21, 2023
作者:
Y
yihaoDeng
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
refactor checkpoint
上级
689e880e
变更
3
隐藏空白更改
内联
并排
Showing
3 changed file
with
57 addition
and
33 deletion
+57
-33
source/dnode/vnode/src/tq/tq.c
source/dnode/vnode/src/tq/tq.c
+7
-3
source/libs/stream/src/streamBackendRocksdb.c
source/libs/stream/src/streamBackendRocksdb.c
+32
-22
source/libs/stream/src/streamMeta.c
source/libs/stream/src/streamMeta.c
+18
-8
未找到文件。
source/dnode/vnode/src/tq/tq.c
浏览文件 @
387b4d36
...
...
@@ -1691,7 +1691,7 @@ int32_t tqProcessTaskRetrieveRsp(STQ* pTq, SRpcMsg* pMsg) {
// todo refactor.
int32_t
vnodeEnqueueStreamMsg
(
SVnode
*
pVnode
,
SRpcMsg
*
pMsg
)
{
STQ
*
pTq
=
pVnode
->
pTq
;
STQ
*
pTq
=
pVnode
->
pTq
;
int32_t
vgId
=
pVnode
->
config
.
vgId
;
SMsgHead
*
msgStr
=
pMsg
->
pCont
;
...
...
@@ -1710,7 +1710,7 @@ int32_t vnodeEnqueueStreamMsg(SVnode* pVnode, SRpcMsg* pMsg) {
tDecoderClear
(
&
decoder
);
int32_t
taskId
=
req
.
taskId
;
tqDebug
(
"vgId:%d receive dispatch msg to s-task:0x%"
PRIx64
"-0x%x"
,
vgId
,
req
.
streamId
,
taskId
);
tqDebug
(
"vgId:%d receive dispatch msg to s-task:0x%"
PRIx64
"-0x%x"
,
vgId
,
req
.
streamId
,
taskId
);
SStreamTask
*
pTask
=
streamMetaAcquireTask
(
pTq
->
pStreamMeta
,
req
.
streamId
,
taskId
);
if
(
pTask
!=
NULL
)
{
...
...
@@ -1883,7 +1883,11 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) {
tqDebug
(
"s-task:%s receive task nodeEp update msg from mnode"
,
pTask
->
id
.
idStr
);
streamTaskUpdateEpsetInfo
(
pTask
,
req
.
pNodeList
);
streamMetaSaveTask
(
pMeta
,
pTask
);
{
taosWLockLatch
(
&
pMeta
->
lock
);
streamMetaSaveTask
(
pMeta
,
pTask
);
taosWUnLockLatch
(
&
pMeta
->
lock
);
}
streamTaskStop
(
pTask
);
taosWLockLatch
(
&
pMeta
->
lock
);
...
...
source/libs/stream/src/streamBackendRocksdb.c
浏览文件 @
387b4d36
...
...
@@ -466,16 +466,16 @@ void* streamBackendInit(const char* streamPath, int64_t chkpId) {
taosThreadMutexInit
(
&
pHandle
->
cfMutex
,
NULL
);
pHandle
->
cfInst
=
taosHashInit
(
64
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BINARY
),
false
,
HASH_NO_LOCK
);
rocksdb_env_t
*
env
=
rocksdb_create_default_env
();
// rocksdb_envoptions_create();
//
rocksdb_env_t* env = rocksdb_create_default_env(); // rocksdb_envoptions_create();
int32_t
nBGThread
=
tsNumOfSnodeStreamThreads
<=
2
?
1
:
tsNumOfSnodeStreamThreads
/
2
;
rocksdb_env_set_low_priority_background_threads
(
env
,
nBGThread
);
rocksdb_env_set_high_priority_background_threads
(
env
,
nBGThread
);
//
int32_t nBGThread = tsNumOfSnodeStreamThreads <= 2 ? 1 : tsNumOfSnodeStreamThreads / 2;
//
rocksdb_env_set_low_priority_background_threads(env, nBGThread);
//
rocksdb_env_set_high_priority_background_threads(env, nBGThread);
rocksdb_cache_t
*
cache
=
rocksdb_cache_create_lru
(
dbMemLimit
/
2
);
rocksdb_options_t
*
opts
=
rocksdb_options_create
();
rocksdb_options_set_env
(
opts
,
env
);
//
rocksdb_options_set_env(opts, env);
rocksdb_options_set_create_if_missing
(
opts
,
1
);
rocksdb_options_set_create_missing_column_families
(
opts
,
1
);
rocksdb_options_set_max_total_wal_size
(
opts
,
dbMemLimit
);
...
...
@@ -485,7 +485,7 @@ void* streamBackendInit(const char* streamPath, int64_t chkpId) {
rocksdb_options_set_db_write_buffer_size
(
opts
,
dbMemLimit
);
rocksdb_options_set_write_buffer_size
(
opts
,
dbMemLimit
/
2
);
pHandle
->
env
=
env
;
//
pHandle->env = env;
pHandle
->
dbOpt
=
opts
;
pHandle
->
cache
=
cache
;
pHandle
->
filterFactory
=
rocksdb_compactionfilterfactory_create
(
...
...
@@ -520,7 +520,7 @@ void* streamBackendInit(const char* streamPath, int64_t chkpId) {
_EXIT:
rocksdb_options_destroy
(
opts
);
rocksdb_cache_destroy
(
cache
);
rocksdb_env_destroy
(
env
);
//
rocksdb_env_destroy(env);
taosThreadMutexDestroy
(
&
pHandle
->
mutex
);
taosThreadMutexDestroy
(
&
pHandle
->
cfMutex
);
taosHashCleanup
(
pHandle
->
cfInst
);
...
...
@@ -556,7 +556,7 @@ void streamBackendCleanup(void* arg) {
rocksdb_close
(
pHandle
->
db
);
}
rocksdb_options_destroy
(
pHandle
->
dbOpt
);
rocksdb_env_destroy
(
pHandle
->
env
);
//
rocksdb_env_destroy(pHandle->env);
rocksdb_cache_destroy
(
pHandle
->
cache
);
SListNode
*
head
=
tdListPopHead
(
pHandle
->
list
);
...
...
@@ -570,15 +570,18 @@ void streamBackendCleanup(void* arg) {
taosThreadMutexDestroy
(
&
pHandle
->
mutex
);
taosThreadMutexDestroy
(
&
pHandle
->
cfMutex
);
qDebug
(
"destroy stream backend
backend
:%p"
,
pHandle
);
qDebug
(
"destroy stream backend :%p"
,
pHandle
);
taosMemoryFree
(
pHandle
);
return
;
}
void
streamBackendHandleCleanup
(
void
*
arg
)
{
SBackendCfWrapper
*
wrapper
=
arg
;
bool
remove
=
wrapper
->
remove
;
taosThreadRwlockWrlock
(
&
wrapper
->
rwLock
);
qDebug
(
"start to do-close backendwrapper %p, %s"
,
wrapper
,
wrapper
->
idstr
);
if
(
wrapper
->
rocksdb
==
NULL
)
{
taosThreadRwlockUnlock
(
&
wrapper
->
rwLock
);
return
;
}
...
...
@@ -589,7 +592,7 @@ void streamBackendHandleCleanup(void* arg) {
for
(
int
i
=
0
;
i
<
cfLen
;
i
++
)
{
if
(
wrapper
->
pHandle
[
i
]
!=
NULL
)
rocksdb_drop_column_family
(
wrapper
->
rocksdb
,
wrapper
->
pHandle
[
i
],
&
err
);
if
(
err
!=
NULL
)
{
// qError("failed to create
cf:%s_%s, reason:%s", wrapper->idstr, ginitDict[i].key, err);
qError
(
"failed to drop
cf:%s_%s, reason:%s"
,
wrapper
->
idstr
,
ginitDict
[
i
].
key
,
err
);
taosMemoryFreeClear
(
err
);
}
}
...
...
@@ -600,7 +603,7 @@ void streamBackendHandleCleanup(void* arg) {
for
(
int
i
=
0
;
i
<
cfLen
;
i
++
)
{
if
(
wrapper
->
pHandle
[
i
]
!=
NULL
)
rocksdb_flush_cf
(
wrapper
->
rocksdb
,
flushOpt
,
wrapper
->
pHandle
[
i
],
&
err
);
if
(
err
!=
NULL
)
{
qError
(
"failed to
create
cf:%s_%s, reason:%s"
,
wrapper
->
idstr
,
ginitDict
[
i
].
key
,
err
);
qError
(
"failed to
flush
cf:%s_%s, reason:%s"
,
wrapper
->
idstr
,
ginitDict
[
i
].
key
,
err
);
taosMemoryFreeClear
(
err
);
}
}
...
...
@@ -628,6 +631,7 @@ void streamBackendHandleCleanup(void* arg) {
wrapper
->
readOpts
=
NULL
;
taosMemoryFreeClear
(
wrapper
->
cfOpts
);
taosMemoryFreeClear
(
wrapper
->
param
);
taosThreadRwlockUnlock
(
&
wrapper
->
rwLock
);
taosThreadRwlockDestroy
(
&
wrapper
->
rwLock
);
wrapper
->
rocksdb
=
NULL
;
...
...
@@ -783,6 +787,11 @@ int32_t chkpGetAllDbCfHandle(SStreamMeta* pMeta, rocksdb_column_family_handle_t*
for
(
int
i
=
0
;
i
<
sizeof
(
ginitDict
)
/
sizeof
(
ginitDict
[
0
]);
i
++
)
{
if
(
wrapper
->
pHandle
[
i
])
{
rocksdb_column_family_handle_t
*
p
=
wrapper
->
pHandle
[
i
];
size_t
len
=
0
;
char
*
name
=
rocksdb_column_family_handle_get_name
(
p
,
&
len
);
qError
(
"column name: name: %d"
,
(
int
)
len
);
taosMemoryFree
(
name
);
taosArrayPush
(
pHandle
,
&
p
);
}
}
...
...
@@ -887,10 +896,11 @@ int32_t streamBackendDoCheckpoint(void* arg, uint64_t checkpointId) {
}
// Get all cf and acquire cfWrappter
int32_t
nCf
=
chkpGetAllDbCfHandle
(
pMeta
,
&
ppCf
,
refs
);
qDebug
(
"stream backend:%p start to do checkpoint at:%s, cf num: %d "
,
pHandle
,
pChkpIdDir
,
nCf
);
int32_t
nCf
=
0
;
//
chkpGetAllDbCfHandle(pMeta, &ppCf, refs);
qDebug
(
"stream backend:%p start to do checkpoint at:%s, cf num: %d "
,
pHandle
,
pChkpIdDir
,
0
);
code
=
chkpPreFlushDb
(
pHandle
->
db
,
ppCf
,
nCf
);
// code = chkpPreFlushDb(pHandle->db, ppCf, nCf);
code
=
0
;
if
(
code
==
0
)
{
code
=
chkpDoDbCheckpoint
(
pHandle
->
db
,
pChkpIdDir
);
if
(
code
!=
0
)
{
...
...
@@ -903,10 +913,10 @@ int32_t streamBackendDoCheckpoint(void* arg, uint64_t checkpointId) {
qError
(
"stream backend:%p failed to flush db at:%s"
,
pHandle
,
pChkpIdDir
);
}
// release all ref to cfWrapper;
for
(
int
i
=
0
;
i
<
taosArrayGetSize
(
refs
);
i
++
)
{
int64_t
id
=
*
(
int64_t
*
)
taosArrayGet
(
refs
,
i
);
taosReleaseRef
(
streamBackendCfWrapperId
,
id
);
}
//
for (int i = 0; i < taosArrayGetSize(refs); i++) {
//
int64_t id = *(int64_t*)taosArrayGet(refs, i);
//
taosReleaseRef(streamBackendCfWrapperId, id);
//
}
if
(
code
==
0
)
{
taosWLockLatch
(
&
pMeta
->
chkpDirLock
);
taosArrayPush
(
pMeta
->
chkpSaved
,
&
checkpointId
);
...
...
@@ -1431,8 +1441,8 @@ int32_t streamStateOpenBackendCf(void* backend, char* name, char** cfs, int32_t
}
// close default cf
if
(((
rocksdb_column_family_handle_t
**
)
cfHandle
)[
0
]
!=
0
)
{
rocksdb_column_family_handle_destroy
(
cfHandle
[
0
]);
cfHandle
[
0
]
=
NULL
;
//
rocksdb_column_family_handle_destroy(cfHandle[0]);
//
cfHandle[0] = NULL;
}
rocksdb_options_destroy
(
cfOpts
[
0
]);
...
...
@@ -1464,7 +1474,7 @@ int32_t streamStateOpenBackendCf(void* backend, char* name, char** cfs, int32_t
inst
->
pCompares
=
taosMemoryCalloc
(
cfLen
,
sizeof
(
rocksdb_comparator_t
*
));
inst
->
dbOpt
=
handle
->
dbOpt
;
rocksdb_writeoptions_disable_WAL
(
inst
->
wOpt
,
1
);
//
rocksdb_writeoptions_disable_WAL(inst->wOpt, 1);
taosHashPut
(
handle
->
cfInst
,
idstr
,
strlen
(
idstr
)
+
1
,
&
inst
,
sizeof
(
void
*
));
}
else
{
inst
=
*
pInst
;
...
...
@@ -1585,7 +1595,7 @@ int streamStateOpenBackend(void* backend, SStreamState* pState) {
taosThreadRwlockInit
(
&
pBackendCfWrapper
->
rwLock
,
NULL
);
SCfComparator
compare
=
{.
comp
=
pCompare
,
.
numOfComp
=
cfLen
};
pBackendCfWrapper
->
pComparNode
=
streamBackendAddCompare
(
handle
,
&
compare
);
rocksdb_writeoptions_disable_WAL
(
pBackendCfWrapper
->
writeOpts
,
1
);
//
rocksdb_writeoptions_disable_WAL(pBackendCfWrapper->writeOpts, 1);
memcpy
(
pBackendCfWrapper
->
idstr
,
pState
->
pTdbState
->
idstr
,
sizeof
(
pState
->
pTdbState
->
idstr
));
int64_t
id
=
taosAddRef
(
streamBackendCfWrapperId
,
pBackendCfWrapper
);
...
...
source/libs/stream/src/streamMeta.c
浏览文件 @
387b4d36
...
...
@@ -25,8 +25,8 @@ int32_t streamBackendId = 0;
int32_t
streamBackendCfWrapperId
=
0
;
static
int64_t
streamGetLatestCheckpointId
(
SStreamMeta
*
pMeta
);
static
void
metaHbToMnode
(
void
*
param
,
void
*
tmrId
);
static
void
streamMetaClear
(
SStreamMeta
*
pMeta
);
static
void
metaHbToMnode
(
void
*
param
,
void
*
tmrId
);
static
void
streamMetaClear
(
SStreamMeta
*
pMeta
);
static
void
streamMetaEnvInit
()
{
streamBackendId
=
taosOpenRef
(
64
,
streamBackendCleanup
);
...
...
@@ -100,9 +100,16 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF
pMeta
->
chkpId
=
chkpId
;
pMeta
->
streamBackend
=
streamBackendInit
(
pMeta
->
path
,
chkpId
);
if
(
pMeta
->
streamBackend
==
NULL
)
{
goto
_err
;
while
(
pMeta
->
streamBackend
==
NULL
)
{
taosMsleep
(
2
*
1000
);
pMeta
->
streamBackend
=
streamBackendInit
(
pMeta
->
path
,
pMeta
->
chkpId
);
if
(
pMeta
->
streamBackend
==
NULL
)
{
qError
(
"vgId:%d failed to init stream backend"
,
pMeta
->
vgId
);
}
}
// if (pMeta->streamBackend == NULL) {
// goto _err;
// }
pMeta
->
streamBackendRid
=
taosAddRef
(
streamBackendId
,
pMeta
->
streamBackend
);
code
=
streamBackendLoadCheckpointInfo
(
pMeta
);
...
...
@@ -158,9 +165,13 @@ int32_t streamMetaReopen(SStreamMeta* pMeta, int64_t chkpId) {
}
pMeta
->
streamBackend
=
streamBackendInit
(
pMeta
->
path
,
pMeta
->
chkpId
);
if
(
pMeta
->
streamBackend
==
NULL
)
{
qError
(
"vgId:%d failed to init stream backend"
,
pMeta
->
vgId
);
return
-
1
;
while
(
pMeta
->
streamBackend
==
NULL
)
{
taosMsleep
(
2
*
1000
);
pMeta
->
streamBackend
=
streamBackendInit
(
pMeta
->
path
,
pMeta
->
chkpId
);
if
(
pMeta
->
streamBackend
==
NULL
)
{
qError
(
"vgId:%d failed to init stream backend"
,
pMeta
->
vgId
);
// return -1;
}
}
pMeta
->
streamBackendRid
=
taosAddRef
(
streamBackendId
,
pMeta
->
streamBackend
);
...
...
@@ -523,7 +534,6 @@ int32_t streamLoadTasks(SStreamMeta* pMeta) {
}
else
{
tdbFree
(
pKey
);
tdbFree
(
pVal
);
tdbTbcClose
(
pCur
);
taosMemoryFree
(
pTask
);
continue
;
}
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录