Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
8be29dc9
TDengine
项目概览
taosdata
/
TDengine
大约 1 年 前同步成功
通知
1184
Star
22015
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
体验新版 GitCode,发现更多精彩内容 >>
提交
8be29dc9
编写于
5月 06, 2023
作者:
dengyihao
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
failed to open rocksdb cf
上级
d7e86aca
变更
3
隐藏空白更改
内联
并排
Showing
3 changed file
with
126 addition
and
55 deletion
+126
-55
source/libs/stream/inc/streamBackendRocksdb.h
source/libs/stream/inc/streamBackendRocksdb.h
+1
-0
source/libs/stream/src/streamBackendRocksdb.c
source/libs/stream/src/streamBackendRocksdb.c
+124
-55
source/libs/stream/src/streamMeta.c
source/libs/stream/src/streamMeta.c
+1
-0
未找到文件。
source/libs/stream/inc/streamBackendRocksdb.h
浏览文件 @
8be29dc9
...
...
@@ -44,6 +44,7 @@ typedef struct {
SList
*
list
;
TdThreadMutex
cfMutex
;
SHashObj
*
cfInst
;
int64_t
defaultCfInit
;
}
SBackendHandle
;
void
*
streamBackendInit
(
const
char
*
path
);
...
...
source/libs/stream/src/streamBackendRocksdb.c
浏览文件 @
8be29dc9
...
...
@@ -33,7 +33,9 @@ typedef struct {
SListNode
*
pCompareNode
;
}
RocksdbCfInst
;
RocksdbCfInst
*
streamStateOpenBackendCf
(
void
*
backend
,
char
*
name
,
char
*
idstr
);
int32_t
streamStateOpenBackendCf
(
void
*
backend
,
char
*
name
,
SHashObj
*
ids
);
void
destroyRocksdbCfInst
(
RocksdbCfInst
*
inst
);
void
destroyCompactFilteFactory
(
void
*
arg
);
void
destroyCompactFilte
(
void
*
arg
);
...
...
@@ -100,7 +102,7 @@ void* streamBackendInit(const char* path) {
size_t
nCf
=
0
;
char
**
cfs
=
rocksdb_list_column_families
(
opts
,
path
,
&
nCf
,
&
err
);
if
(
nCf
==
0
||
err
!=
NULL
)
{
if
(
nCf
==
0
||
nCf
==
1
||
err
!=
NULL
)
{
taosMemoryFreeClear
(
err
);
pHandle
->
db
=
rocksdb_open
(
opts
,
path
,
&
err
);
if
(
err
!=
NULL
)
{
...
...
@@ -117,24 +119,18 @@ void* streamBackendInit(const char* path) {
if
(
3
==
sscanf
(
cf
,
"0x%"
PRIx64
"-%d_%s"
,
&
streamId
,
&
taskId
,
suffix
))
{
char
idstr
[
128
]
=
{
0
};
sprintf
(
idstr
,
"0x%"
PRIx64
"-%d"
,
streamId
,
taskId
);
if
(
taosHashGet
(
tbl
,
idstr
,
strlen
(
idstr
))
!=
NULL
)
{
taosHashPut
(
tbl
,
idstr
,
strlen
(
idstr
),
&
dummpy
,
sizeof
(
dummpy
));
// qError("make cf name %s", idstr);
if
(
taosHashGet
(
tbl
,
idstr
,
strlen
(
idstr
)
+
1
)
==
NULL
)
{
taosHashPut
(
tbl
,
idstr
,
strlen
(
idstr
)
+
1
,
&
dummpy
,
sizeof
(
dummpy
));
}
}
else
{
continue
;
}
}
void
*
pIter
=
taosHashIterate
(
tbl
,
NULL
);
while
(
pIter
!=
NULL
)
{
size_t
keyLen
=
0
;
char
*
key
=
taosHashGetKey
(
pIter
,
&
keyLen
);
RocksdbCfInst
*
inst
=
streamStateOpenBackendCf
(
pHandle
,
(
char
*
)
path
,
key
);
taosHashPut
(
pHandle
->
cfInst
,
key
,
keyLen
,
&
inst
,
sizeof
(
void
*
));
taosHashIterate
(
tbl
,
pIter
);
}
streamStateOpenBackendCf
(
pHandle
,
(
char
*
)
path
,
tbl
);
taosHashCleanup
(
tbl
);
}
rocksdb_list_column_families_destroy
(
cfs
,
nCf
);
return
(
void
*
)
pHandle
;
_EXIT:
...
...
@@ -151,6 +147,13 @@ _EXIT:
}
void
streamBackendCleanup
(
void
*
arg
)
{
SBackendHandle
*
pHandle
=
(
SBackendHandle
*
)
arg
;
RocksdbCfInst
**
pIter
=
(
RocksdbCfInst
**
)
taosHashIterate
(
pHandle
->
cfInst
,
NULL
);
while
(
pIter
!=
NULL
)
{
RocksdbCfInst
*
inst
=
*
pIter
;
destroyRocksdbCfInst
(
inst
);
taosHashIterate
(
pHandle
->
cfInst
,
pIter
);
}
taosHashCleanup
(
pHandle
->
cfInst
);
rocksdb_flushoptions_t
*
flushOpt
=
rocksdb_flushoptions_create
();
char
*
err
=
NULL
;
...
...
@@ -176,7 +179,6 @@ void streamBackendCleanup(void* arg) {
// rocksdb_compactionfilterfactory_destroy(pHandle->filterFactory);
tdListFree
(
pHandle
->
list
);
taosThreadMutexDestroy
(
&
pHandle
->
cfMutex
);
taosHashCleanup
(
pHandle
->
cfInst
);
taosMemoryFree
(
pHandle
);
...
...
@@ -669,22 +671,56 @@ rocksdb_compactionfilter_t* compactFilteFactoryCreateFilter(void* arg, rocksdb_c
return
filter
;
}
RocksdbCfInst
*
streamStateOpenBackendCf
(
void
*
backend
,
char
*
name
,
char
*
idstr
)
{
// qInfo("start to open backend cf, %p 0x%" PRIx64 "-%d", pState, pState->streamId, pState->taskId);
SBackendHandle
*
handle
=
backend
;
char
*
err
=
NULL
;
int
cfLen
=
sizeof
(
ginitDict
)
/
sizeof
(
ginitDict
[
0
]);
char
**
cfNames
=
taosMemoryCalloc
(
cfLen
,
sizeof
(
char
*
));
void
destroyRocksdbCfInst
(
RocksdbCfInst
*
inst
)
{
int
cfLen
=
sizeof
(
ginitDict
)
/
sizeof
(
ginitDict
[
0
]);
for
(
int
i
=
0
;
i
<
cfLen
;
i
++
)
{
cfNames
[
i
]
=
taosMemoryCalloc
(
1
,
128
);
GEN_COLUMN_FAMILY_NAME
(
cfNames
[
i
],
idstr
,
ginitDict
[
i
].
key
);
rocksdb_column_family_handle_destroy
(
inst
->
pHandle
[
i
]);
}
RocksdbCfParam
*
param
=
taosMemoryCalloc
(
cfLen
,
sizeof
(
RocksdbCfParam
));
const
rocksdb_options_t
**
cfOpt
=
taosMemoryCalloc
(
cfLen
,
sizeof
(
rocksdb_options_t
*
));
for
(
int
i
=
0
;
i
<
cfLen
;
i
++
)
{
cfOpt
[
i
]
=
rocksdb_options_create_copy
(
handle
->
dbOpt
);
rocksdb_writeoptions_destroy
(
inst
->
wOpt
);
inst
->
wOpt
=
NULL
;
rocksdb_readoptions_destroy
(
inst
->
rOpt
);
taosMemoryFree
(
inst
->
cfOpt
);
taosMemoryFree
(
inst
->
param
);
taosMemoryFreeClear
(
inst
->
param
);
taosMemoryFree
(
inst
);
}
int32_t
streamStateOpenBackendCf
(
void
*
backend
,
char
*
name
,
SHashObj
*
ids
)
{
SBackendHandle
*
handle
=
backend
;
char
*
err
=
NULL
;
size_t
nSize
=
taosHashGetSize
(
ids
);
int
cfLen
=
sizeof
(
ginitDict
)
/
sizeof
(
ginitDict
[
0
]);
char
**
cfNames
=
taosMemoryCalloc
(
nSize
*
cfLen
+
1
,
sizeof
(
char
*
));
void
*
pIter
=
taosHashIterate
(
ids
,
NULL
);
size_t
keyLen
=
0
;
char
*
idstr
=
taosHashGetKey
(
pIter
,
&
keyLen
);
for
(
int
i
=
0
;
i
<
nSize
*
cfLen
+
1
;
i
++
)
{
cfNames
[
i
]
=
(
char
*
)
taosMemoryCalloc
(
1
,
128
);
if
(
i
==
0
)
{
memcpy
(
cfNames
[
0
],
"default"
,
strlen
(
"default"
));
continue
;
}
qError
(
"cf name %s"
,
idstr
);
GEN_COLUMN_FAMILY_NAME
(
cfNames
[
i
],
idstr
,
ginitDict
[(
i
-
1
)
%
(
cfLen
)].
key
);
if
(
i
%
cfLen
==
0
)
{
pIter
=
taosHashIterate
(
ids
,
pIter
);
if
(
pIter
!=
NULL
)
idstr
=
taosHashGetKey
(
pIter
,
&
keyLen
);
}
}
for
(
int
i
=
0
;
i
<
nSize
*
cfLen
+
1
;
i
++
)
{
qError
(
"cf name %s"
,
cfNames
[
i
]);
}
rocksdb_options_t
**
cfOpts
=
taosMemoryCalloc
(
nSize
*
cfLen
+
1
,
sizeof
(
rocksdb_options_t
*
));
RocksdbCfParam
*
params
=
taosMemoryCalloc
(
nSize
*
cfLen
+
1
,
sizeof
(
RocksdbCfParam
*
));
for
(
int
i
=
0
;
i
<
nSize
*
cfLen
+
1
;
i
++
)
{
cfOpts
[
i
]
=
rocksdb_options_create_copy
(
handle
->
dbOpt
);
if
(
i
==
0
)
{
continue
;
}
// refactor later
rocksdb_block_based_table_options_t
*
tableOpt
=
rocksdb_block_based_options_create
();
rocksdb_block_based_options_set_block_cache
(
tableOpt
,
handle
->
cache
);
...
...
@@ -692,45 +728,77 @@ RocksdbCfInst* streamStateOpenBackendCf(void* backend, char* name, char* idstr)
rocksdb_filterpolicy_t
*
filter
=
rocksdb_filterpolicy_create_bloom
(
15
);
rocksdb_block_based_options_set_filter_policy
(
tableOpt
,
filter
);
rocksdb_options_set_block_based_table_factory
((
rocksdb_options_t
*
)
cfOpt
[
i
],
tableOpt
);
param
[
i
].
tableOpt
=
tableOpt
;
rocksdb_options_set_block_based_table_factory
((
rocksdb_options_t
*
)
cfOpts
[
i
],
tableOpt
);
params
[
i
].
tableOpt
=
tableOpt
;
};
rocksdb_comparator_t
**
pCompare
=
taosMemoryCalloc
(
cfLen
,
sizeof
(
rocksdb_comparator_t
**
));
for
(
int
i
=
0
;
i
<
cfLen
;
i
++
)
{
SCfInit
*
cf
=
&
ginitDict
[
i
];
rocksdb_comparator_t
**
pCompare
=
taosMemoryCalloc
(
nSize
*
cfLen
+
1
,
sizeof
(
rocksdb_comparator_t
**
));
for
(
int
i
=
0
;
i
<
nSize
*
cfLen
+
1
;
i
++
)
{
if
(
i
==
0
)
{
continue
;
}
SCfInit
*
cf
=
&
ginitDict
[(
i
-
1
)
%
cfLen
];
rocksdb_comparator_t
*
compare
=
rocksdb_comparator_create
(
NULL
,
cf
->
detroyFunc
,
cf
->
cmpFunc
,
cf
->
cmpName
);
rocksdb_options_set_comparator
((
rocksdb_options_t
*
)
cfOpt
[
i
],
compare
);
rocksdb_options_set_comparator
((
rocksdb_options_t
*
)
cfOpt
s
[
i
],
compare
);
pCompare
[
i
]
=
compare
;
}
rocksdb_column_family_handle_t
**
cfHandle
=
taosMemoryCalloc
(
cfLen
,
sizeof
(
rocksdb_column_family_handle_t
*
));
rocksdb_t
*
db
=
rocksdb_open_column_families
(
handle
->
dbOpt
,
name
,
cfLen
,
(
const
char
*
const
*
)
cfNames
,
(
const
rocksdb_options_t
*
const
*
)
cfOpt
,
cfHandle
,
&
err
);
rocksdb_column_family_handle_t
**
cfHandle
=
taosMemoryCalloc
(
nSize
*
cfLen
+
1
,
sizeof
(
rocksdb_column_family_handle_t
*
));
rocksdb_t
*
db
=
rocksdb_open_column_families
(
handle
->
dbOpt
,
name
,
nSize
*
cfLen
+
1
,
(
const
char
*
const
*
)
cfNames
,
(
const
rocksdb_options_t
*
const
*
)
cfOpts
,
cfHandle
,
&
err
);
if
(
err
!=
NULL
)
{
qError
(
"failed to open rocksdb cf, reason:%s"
,
err
);
taosMemoryFree
(
err
);
}
else
{
qDebug
(
"succ to open rocksdb cf, reason:%s"
,
err
);
}
RocksdbCfInst
*
inst
=
taosMemoryCalloc
(
1
,
sizeof
(
RocksdbCfInst
));
inst
->
db
=
db
;
inst
->
pHandle
=
cfHandle
;
inst
->
wOpt
=
rocksdb_writeoptions_create
();
inst
->
rOpt
=
rocksdb_readoptions_create
();
inst
->
cfOpt
=
(
rocksdb_options_t
**
)
cfOpt
;
inst
->
dbOpt
=
handle
->
dbOpt
;
inst
->
param
=
param
;
inst
->
pBackendHandle
=
handle
;
handle
->
db
=
db
;
pIter
=
taosHashIterate
(
ids
,
NULL
);
idstr
=
taosHashGetKey
(
pIter
,
&
keyLen
);
for
(
int
i
=
0
;
i
<
nSize
;
i
++
)
{
RocksdbCfInst
*
inst
=
taosMemoryCalloc
(
1
,
sizeof
(
RocksdbCfInst
));
rocksdb_column_family_handle_t
**
subCf
=
taosMemoryCalloc
(
cfLen
,
sizeof
(
rocksdb_column_family_handle_t
*
));
rocksdb_comparator_t
**
subCompare
=
taosMemoryCalloc
(
cfLen
,
sizeof
(
rocksdb_comparator_t
*
));
RocksdbCfParam
*
subParam
=
taosMemoryCalloc
(
cfLen
,
sizeof
(
RocksdbCfParam
));
rocksdb_options_t
**
subOpt
=
taosMemoryCalloc
(
cfLen
,
sizeof
(
rocksdb_options_t
*
));
for
(
int
j
=
0
;
j
<
cfLen
;
j
++
)
{
subCf
[
j
]
=
cfHandle
[
i
*
cfLen
+
j
+
1
];
subCompare
[
j
]
=
pCompare
[
i
*
cfLen
+
j
+
1
];
subParam
[
j
]
=
params
[
i
*
cfLen
+
j
+
1
];
subOpt
[
j
]
=
cfOpts
[
i
*
cfLen
+
j
+
1
];
}
inst
->
db
=
db
;
inst
->
pHandle
=
subCf
;
inst
->
wOpt
=
rocksdb_writeoptions_create
();
inst
->
rOpt
=
rocksdb_readoptions_create
();
inst
->
cfOpt
=
(
rocksdb_options_t
**
)
subOpt
;
inst
->
dbOpt
=
handle
->
dbOpt
;
inst
->
param
=
subParam
;
inst
->
pBackendHandle
=
handle
;
handle
->
db
=
db
;
SCfComparator
compare
=
{.
comp
=
subCompare
,
.
numOfComp
=
cfLen
};
inst
->
pCompareNode
=
streamBackendAddCompare
(
handle
,
&
compare
);
rocksdb_writeoptions_disable_WAL
(
inst
->
wOpt
,
1
);
taosHashPut
(
handle
->
cfInst
,
idstr
,
keyLen
,
&
inst
,
sizeof
(
void
*
));
pIter
=
taosHashIterate
(
ids
,
pIter
);
if
(
pIter
!=
NULL
)
idstr
=
taosHashGetKey
(
pIter
,
&
keyLen
);
}
rocksdb_column_family_handle_destroy
(
cfHandle
[
0
]);
rocksdb_options_destroy
(
cfOpts
[
0
]);
for
(
int
i
=
0
;
i
<
nSize
*
cfLen
+
1
;
i
++
)
{
taosMemoryFree
(
cfNames
[
i
]);
}
taosMemoryFree
(
cfNames
);
taosMemoryFree
(
cfHandle
);
taosMemoryFree
(
pCompare
);
taosMemoryFree
(
params
);
taosMemoryFree
(
cfOpts
);
SCfComparator
compare
=
{.
comp
=
pCompare
,
.
numOfComp
=
cfLen
};
inst
->
pCompareNode
=
streamBackendAddCompare
(
handle
,
&
compare
);
rocksdb_writeoptions_disable_WAL
(
inst
->
wOpt
,
1
);
return
inst
;
return
0
;
}
int
streamStateOpenBackend
(
void
*
backend
,
SStreamState
*
pState
)
{
qInfo
(
"start to open backend, %p 0x%"
PRIx64
"-%d"
,
pState
,
pState
->
streamId
,
pState
->
taskId
);
...
...
@@ -738,7 +806,7 @@ int streamStateOpenBackend(void* backend, SStreamState* pState) {
sprintf
(
pState
->
pTdbState
->
idstr
,
"0x%"
PRIx64
"-%d"
,
pState
->
streamId
,
pState
->
taskId
);
taosThreadMutexLock
(
&
handle
->
cfMutex
);
RocksdbCfInst
**
ppInst
=
taosHashGet
(
handle
->
cfInst
,
pState
->
pTdbState
->
idstr
,
strlen
(
pState
->
pTdbState
->
idstr
));
RocksdbCfInst
**
ppInst
=
taosHashGet
(
handle
->
cfInst
,
pState
->
pTdbState
->
idstr
,
strlen
(
pState
->
pTdbState
->
idstr
)
+
1
);
if
(
ppInst
!=
NULL
&&
*
ppInst
!=
NULL
)
{
RocksdbCfInst
*
inst
=
*
ppInst
;
pState
->
pTdbState
->
rocksdb
=
inst
->
db
;
...
...
@@ -812,13 +880,14 @@ int streamStateOpenBackend(void* backend, SStreamState* pState) {
void
streamStateCloseBackend
(
SStreamState
*
pState
,
bool
remove
)
{
SBackendHandle
*
pHandle
=
pState
->
pTdbState
->
pBackendHandle
;
taosThreadMutexLock
(
&
pHandle
->
cfMutex
);
RocksdbCfInst
**
ppInst
=
taosHashGet
(
pHandle
->
cfInst
,
pState
->
pTdbState
->
idstr
,
strlen
(
pState
->
pTdbState
->
idstr
));
RocksdbCfInst
**
ppInst
=
taosHashGet
(
pHandle
->
cfInst
,
pState
->
pTdbState
->
idstr
,
strlen
(
pState
->
pTdbState
->
idstr
)
+
1
);
if
(
ppInst
!=
NULL
&&
*
ppInst
!=
NULL
)
{
RocksdbCfInst
*
inst
=
*
ppInst
;
taosMemoryFree
(
inst
);
taosHashRemove
(
pHandle
->
cfInst
,
pState
->
pTdbState
->
idstr
,
strlen
(
pState
->
pTdbState
->
idstr
));
taosHashRemove
(
pHandle
->
cfInst
,
pState
->
pTdbState
->
idstr
,
strlen
(
pState
->
pTdbState
->
idstr
)
+
1
);
}
taosThreadMutexUnlock
(
&
pHandle
->
cfMutex
);
char
*
status
[]
=
{
"close"
,
"drop"
};
qInfo
(
"start to %s backend, %p, 0x%"
PRIx64
"-%d"
,
status
[
remove
==
false
?
0
:
1
],
pState
,
pState
->
streamId
,
pState
->
taskId
);
...
...
source/libs/stream/src/streamMeta.c
浏览文件 @
8be29dc9
...
...
@@ -94,6 +94,7 @@ _err:
if
(
pMeta
->
db
)
tdbClose
(
pMeta
->
db
);
// if (pMeta->streamBackend) streamBackendCleanup(pMeta->streamBackend);
taosMemoryFree
(
pMeta
);
qError
(
"failed to open stream meta"
);
return
NULL
;
}
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录