Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
34d21bc0
TDengine
项目概览
taosdata
/
TDengine
大约 2 年 前同步成功
通知
1192
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
34d21bc0
编写于
5月 26, 2023
作者:
Y
yihaoDeng
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
change parameter
上级
52c384c7
变更
3
显示空白变更内容
内联
并排
Showing
3 changed file
with
128 addition
and
135 deletion
+128
-135
cmake/cmake.define
cmake/cmake.define
+1
-1
include/libs/stream/streamState.h
include/libs/stream/streamState.h
+2
-1
source/libs/stream/src/streamBackendRocksdb.c
source/libs/stream/src/streamBackendRocksdb.c
+125
-133
未找到文件。
cmake/cmake.define
浏览文件 @
34d21bc0
cmake_minimum_required(VERSION 3.0)
set(CMAKE_VERBOSE_MAKEFILE O
N
)
set(CMAKE_VERBOSE_MAKEFILE O
FF
)
set(TD_BUILD_TAOSA_INTERNAL FALSE)
#set output directory
...
...
include/libs/stream/streamState.h
浏览文件 @
34d21bc0
...
...
@@ -44,9 +44,10 @@ typedef struct STdbState {
void
*
param
;
void
*
env
;
SListNode
*
pComparNode
;
void
*
pBackend
Handle
;
void
*
pBackend
;
char
idstr
[
64
];
void
*
compactFactory
;
TdThreadRwlock
rwLock
;
TDB
*
db
;
TTB
*
pStateDb
;
...
...
source/libs/stream/src/streamBackendRocksdb.c
浏览文件 @
34d21bc0
...
...
@@ -22,6 +22,9 @@ typedef struct SCompactFilteFactory {
void
*
status
;
}
SCompactFilteFactory
;
typedef
struct
{
void
*
tableOpt
;
}
RocksdbCfParam
;
typedef
struct
{
rocksdb_t
*
db
;
rocksdb_column_family_handle_t
**
pHandle
;
...
...
@@ -29,12 +32,13 @@ typedef struct {
rocksdb_readoptions_t
*
rOpt
;
rocksdb_options_t
**
cfOpt
;
rocksdb_options_t
*
dbOpt
;
void
*
param
;
void
*
pBackend
Handle
;
RocksdbCfParam
*
param
;
void
*
pBackend
;
SListNode
*
pCompareNode
;
rocksdb_comparator_t
**
pCompares
;
}
RocksdbCfInst
;
int32_t
streamStateOpenBackendCf
(
void
*
backend
,
char
*
name
,
SHashObj
*
ids
);
int32_t
streamStateOpenBackendCf
(
void
*
backend
,
char
*
name
,
char
**
cfs
,
int32_t
nCf
);
void
destroyRocksdbCfInst
(
RocksdbCfInst
*
inst
);
...
...
@@ -46,9 +50,6 @@ unsigned char compactFilte(void* arg, int level, const char* key, size_t klen, c
char
**
newval
,
size_t
*
newvlen
,
unsigned
char
*
value_changed
);
rocksdb_compactionfilter_t
*
compactFilteFactoryCreateFilter
(
void
*
arg
,
rocksdb_compactionfiltercontext_t
*
ctx
);
typedef
struct
{
void
*
tableOpt
;
}
RocksdbCfParam
;
const
char
*
cfName
[]
=
{
"default"
,
"state"
,
"fill"
,
"sess"
,
"func"
,
"parname"
,
"partag"
};
typedef
int
(
*
EncodeFunc
)(
void
*
key
,
char
*
buf
);
...
...
@@ -114,25 +115,7 @@ void* streamBackendInit(const char* path) {
/*
list all cf and get prefix
*/
int64_t
streamId
;
int32_t
taskId
,
dummpy
=
0
;
SHashObj
*
tbl
=
taosHashInit
(
32
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BINARY
),
false
,
HASH_NO_LOCK
);
for
(
size_t
i
=
0
;
i
<
nCf
;
i
++
)
{
char
*
cf
=
cfs
[
i
];
char
suffix
[
64
]
=
{
0
};
if
(
3
==
sscanf
(
cf
,
"0x%"
PRIx64
"-%d_%s"
,
&
streamId
,
&
taskId
,
suffix
))
{
char
idstr
[
128
]
=
{
0
};
sprintf
(
idstr
,
"0x%"
PRIx64
"-%d"
,
streamId
,
taskId
);
// qError("make cf name %s", idstr);
if
(
taosHashGet
(
tbl
,
idstr
,
strlen
(
idstr
)
+
1
)
==
NULL
)
{
taosHashPut
(
tbl
,
idstr
,
strlen
(
idstr
)
+
1
,
&
dummpy
,
sizeof
(
dummpy
));
}
}
else
{
continue
;
}
}
streamStateOpenBackendCf
(
pHandle
,
(
char
*
)
path
,
tbl
);
taosHashCleanup
(
tbl
);
streamStateOpenBackendCf
(
pHandle
,
(
char
*
)
path
,
cfs
,
nCf
);
}
rocksdb_list_column_families_destroy
(
cfs
,
nCf
);
...
...
@@ -209,7 +192,7 @@ void streamBackendDelCompare(void* backend, void* arg) {
}
void
streamStateDestroy_rocksdb
(
SStreamState
*
pState
,
bool
remove
)
{
streamStateCloseBackend
(
pState
,
remove
);
}
static
bool
streamStateIterSeekAndValid
(
rocksdb_iterator_t
*
iter
,
char
*
buf
,
size_t
len
);
int
streamGetInit
(
const
char
*
funcName
);
int
streamGetInit
(
SStreamState
*
pState
,
const
char
*
funcName
);
// |key|-----value------|
// |key|ttl|len|userData|
...
...
@@ -679,37 +662,25 @@ void destroyRocksdbCfInst(RocksdbCfInst* inst) {
taosMemoryFree
(
inst
);
}
int32_t
streamStateOpenBackendCf
(
void
*
backend
,
char
*
name
,
SHashObj
*
ids
)
{
int32_t
streamStateOpenBackendCf
(
void
*
backend
,
char
*
name
,
char
**
cfs
,
int32_t
nCf
)
{
SBackendHandle
*
handle
=
backend
;
char
*
err
=
NULL
;
size_t
nSize
=
taosHashGetSize
(
ids
);
int
cfLen
=
sizeof
(
ginitDict
)
/
sizeof
(
ginitDict
[
0
]);
int64_t
streamId
;
int32_t
taskId
,
dummy
=
0
;
char
suffix
[
64
]
=
{
0
};
SHashObj
*
instTbl
=
taosHashInit
(
32
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BINARY
),
false
,
HASH_NO_LOCK
);
char
**
cfNames
=
taosMemoryCalloc
(
nSize
*
cfLen
+
1
,
sizeof
(
char
*
));
void
*
pIter
=
taosHashIterate
(
ids
,
NULL
);
size_t
keyLen
=
0
;
char
*
idstr
=
taosHashGetKey
(
pIter
,
&
keyLen
);
for
(
int
i
=
0
;
i
<
nSize
*
cfLen
+
1
;
i
++
)
{
cfNames
[
i
]
=
(
char
*
)
taosMemoryCalloc
(
1
,
128
);
if
(
i
==
0
)
{
memcpy
(
cfNames
[
0
],
"default"
,
strlen
(
"default"
));
continue
;
}
rocksdb_options_t
**
cfOpts
=
taosMemoryCalloc
(
nCf
,
sizeof
(
rocksdb_options_t
*
));
RocksdbCfParam
*
params
=
taosMemoryCalloc
(
nCf
,
sizeof
(
RocksdbCfParam
*
));
rocksdb_comparator_t
**
pCompare
=
taosMemoryCalloc
(
nCf
,
sizeof
(
rocksdb_comparator_t
**
));
rocksdb_column_family_handle_t
**
cfHandle
=
taosMemoryCalloc
(
nCf
,
sizeof
(
rocksdb_column_family_handle_t
*
));
GEN_COLUMN_FAMILY_NAME
(
cfNames
[
i
],
idstr
,
ginitDict
[(
i
-
1
)
%
(
cfLen
)].
key
);
if
(
i
%
cfLen
==
0
)
{
pIter
=
taosHashIterate
(
ids
,
pIter
);
if
(
pIter
!=
NULL
)
idstr
=
taosHashGetKey
(
pIter
,
&
keyLen
);
}
}
rocksdb_options_t
**
cfOpts
=
taosMemoryCalloc
(
nSize
*
cfLen
+
1
,
sizeof
(
rocksdb_options_t
*
));
RocksdbCfParam
*
params
=
taosMemoryCalloc
(
nSize
*
cfLen
+
1
,
sizeof
(
RocksdbCfParam
*
));
for
(
int
i
=
0
;
i
<
nSize
*
cfLen
+
1
;
i
++
)
{
for
(
int
i
=
0
;
i
<
nCf
;
i
++
)
{
char
*
cf
=
cfs
[
i
];
char
funcname
[
64
]
=
{
0
};
cfOpts
[
i
]
=
rocksdb_options_create_copy
(
handle
->
dbOpt
);
if
(
i
==
0
)
{
continue
;
}
// refactor later
if
(
i
==
0
)
continue
;
if
(
3
==
sscanf
(
cf
,
"0x%"
PRIx64
"-%d_%s"
,
&
streamId
,
&
taskId
,
funcname
))
{
rocksdb_block_based_table_options_t
*
tableOpt
=
rocksdb_block_based_options_create
();
rocksdb_block_based_options_set_block_cache
(
tableOpt
,
handle
->
cache
);
...
...
@@ -718,22 +689,17 @@ int32_t streamStateOpenBackendCf(void* backend, char* name, SHashObj* ids) {
rocksdb_options_set_block_based_table_factory
((
rocksdb_options_t
*
)
cfOpts
[
i
],
tableOpt
);
params
[
i
].
tableOpt
=
tableOpt
;
};
rocksdb_comparator_t
**
pCompare
=
taosMemoryCalloc
(
nSize
*
cfLen
+
1
,
sizeof
(
rocksdb_comparator_t
**
));
for
(
int
i
=
0
;
i
<
nSize
*
cfLen
+
1
;
i
++
)
{
if
(
i
==
0
)
{
continue
;
}
SCfInit
*
cf
=
&
ginitDict
[(
i
-
1
)
%
cfLen
];
int
idx
=
streamGetInit
(
NULL
,
funcname
);
SCfInit
*
cfPara
=
&
ginitDict
[
idx
];
rocksdb_comparator_t
*
compare
=
rocksdb_comparator_create
(
NULL
,
cf
->
detroyFunc
,
cf
->
cmpFunc
,
cf
->
cmpName
);
rocksdb_comparator_t
*
compare
=
rocksdb_comparator_create
(
NULL
,
cfPara
->
detroyFunc
,
cfPara
->
cmpFunc
,
cfPara
->
cmpName
);
rocksdb_options_set_comparator
((
rocksdb_options_t
*
)
cfOpts
[
i
],
compare
);
pCompare
[
i
]
=
compare
;
}
rocksdb_column_family_handle_t
**
cfHandle
=
taosMemoryCalloc
(
nSize
*
cfLen
+
1
,
sizeof
(
rocksdb_column_family_handle_t
*
));
rocksdb_t
*
db
=
rocksdb_open_column_families
(
handle
->
dbOpt
,
name
,
nSize
*
cfLen
+
1
,
(
const
char
*
const
*
)
cfNames
,
}
rocksdb_t
*
db
=
rocksdb_open_column_families
(
handle
->
dbOpt
,
name
,
nCf
,
(
const
char
*
const
*
)
cfs
,
(
const
rocksdb_options_t
*
const
*
)
cfOpts
,
cfHandle
,
&
err
);
if
(
err
!=
NULL
)
{
qError
(
"failed to open rocksdb cf, reason:%s"
,
err
);
...
...
@@ -742,50 +708,53 @@ int32_t streamStateOpenBackendCf(void* backend, char* name, SHashObj* ids) {
qDebug
(
"succ to open rocksdb cf, reason"
);
}
pIter
=
taosHashIterate
(
ids
,
NULL
);
idstr
=
taosHashGetKey
(
pIter
,
&
keyLen
);
for
(
int
i
=
0
;
i
<
nSize
;
i
++
)
{
RocksdbCfInst
*
inst
=
taosMemoryCalloc
(
1
,
sizeof
(
RocksdbCfInst
));
rocksdb_column_family_handle_t
**
subCf
=
taosMemoryCalloc
(
cfLen
,
sizeof
(
rocksdb_column_family_handle_t
*
));
rocksdb_comparator_t
**
subCompare
=
taosMemoryCalloc
(
cfLen
,
sizeof
(
rocksdb_comparator_t
*
));
RocksdbCfParam
*
subParam
=
taosMemoryCalloc
(
cfLen
,
sizeof
(
RocksdbCfParam
));
rocksdb_options_t
**
subOpt
=
taosMemoryCalloc
(
cfLen
,
sizeof
(
rocksdb_options_t
*
));
for
(
int
j
=
0
;
j
<
cfLen
;
j
++
)
{
subCf
[
j
]
=
cfHandle
[
i
*
cfLen
+
j
+
1
];
subCompare
[
j
]
=
pCompare
[
i
*
cfLen
+
j
+
1
];
subParam
[
j
]
=
params
[
i
*
cfLen
+
j
+
1
];
subOpt
[
j
]
=
cfOpts
[
i
*
cfLen
+
j
+
1
];
}
inst
->
db
=
db
;
inst
->
pHandle
=
subCf
;
static
int32_t
cfLen
=
sizeof
(
ginitDict
)
/
sizeof
(
ginitDict
[
0
]);
for
(
int
i
=
0
;
i
<
nCf
;
i
++
)
{
char
*
cf
=
cfs
[
i
];
if
(
i
==
0
)
continue
;
char
funcname
[
64
]
=
{
0
};
if
(
3
==
sscanf
(
cf
,
"0x%"
PRIx64
"-%d_%s"
,
&
streamId
,
&
taskId
,
funcname
))
{
char
idstr
[
128
]
=
{
0
};
sprintf
(
idstr
,
"0x%"
PRIx64
"-%d"
,
streamId
,
taskId
);
int
idx
=
streamGetInit
(
NULL
,
funcname
);
RocksdbCfInst
*
inst
=
NULL
;
RocksdbCfInst
**
pInst
=
taosHashGet
(
instTbl
,
idstr
,
strlen
(
idstr
)
+
1
);
if
(
pInst
==
NULL
||
*
pInst
==
NULL
)
{
inst
=
taosMemoryCalloc
(
1
,
sizeof
(
RocksdbCfInst
));
inst
->
pHandle
=
taosMemoryCalloc
(
cfLen
,
sizeof
(
rocksdb_column_family_handle_t
*
));
inst
->
cfOpt
=
taosMemoryCalloc
(
cfLen
,
sizeof
(
rocksdb_options_t
*
));
inst
->
wOpt
=
rocksdb_writeoptions_create
();
inst
->
rOpt
=
rocksdb_readoptions_create
();
inst
->
cfOpt
=
(
rocksdb_options_t
**
)
subOpt
;
inst
->
param
=
taosMemoryCalloc
(
cfLen
,
sizeof
(
RocksdbCfParam
));
inst
->
pBackend
=
handle
;
inst
->
db
=
db
;
inst
->
pCompares
=
taosMemoryCalloc
(
cfLen
,
sizeof
(
rocksdb_comparator_t
*
));
inst
->
dbOpt
=
handle
->
dbOpt
;
inst
->
param
=
subParam
;
inst
->
pBackendHandle
=
handle
;
handle
->
db
=
db
;
SCfComparator
compare
=
{.
comp
=
subCompare
,
.
numOfComp
=
cfLen
};
inst
->
pCompareNode
=
streamBackendAddCompare
(
handle
,
&
compare
);
rocksdb_writeoptions_disable_WAL
(
inst
->
wOpt
,
1
);
taosHashPut
(
handle
->
cfInst
,
idstr
,
keyLen
,
&
inst
,
sizeof
(
void
*
));
pIter
=
taosHashIterate
(
ids
,
pIter
);
if
(
pIter
!=
NULL
)
idstr
=
taosHashGetKey
(
pIter
,
&
keyLen
);
taosHashPut
(
handle
->
cfInst
,
idstr
,
strlen
(
idstr
)
+
1
,
&
inst
,
sizeof
(
void
*
));
}
else
{
inst
=
*
pInst
;
}
rocksdb_column_family_handle_destroy
(
cfHandle
[
0
]);
rocksdb_options_destroy
(
cfOpts
[
0
]);
for
(
int
i
=
0
;
i
<
nSize
*
cfLen
+
1
;
i
++
)
{
taosMemoryFree
(
cfNames
[
i
]);
inst
->
cfOpt
[
idx
]
=
cfOpts
[
i
];
inst
->
pCompares
[
idx
]
=
pCompare
[
i
];
memcpy
(
&
(
inst
->
param
[
idx
]),
&
(
params
[
i
]),
sizeof
(
RocksdbCfParam
));
inst
->
pHandle
[
idx
]
=
cfHandle
[
i
];
}
}
void
**
pIter
=
taosHashIterate
(
handle
->
cfInst
,
NULL
);
while
(
*
pIter
)
{
RocksdbCfInst
*
inst
=
*
pIter
;
SCfComparator
compare
=
{.
comp
=
inst
->
pCompares
,
.
numOfComp
=
cfLen
};
inst
->
pCompareNode
=
streamBackendAddCompare
(
handle
,
&
compare
);
}
taosMemoryFree
(
cfNames
);
taosMemoryFree
(
cfHandle
);
taosMemoryFree
(
pCompare
);
taosMemoryFree
(
params
);
taosMemoryFree
(
cfOpts
);
return
0
;
}
int
streamStateOpenBackend
(
void
*
backend
,
SStreamState
*
pState
)
{
...
...
@@ -804,12 +773,13 @@ int streamStateOpenBackend(void* backend, SStreamState* pState) {
pState
->
pTdbState
->
cfOpts
=
inst
->
cfOpt
;
pState
->
pTdbState
->
dbOpt
=
handle
->
dbOpt
;
pState
->
pTdbState
->
param
=
inst
->
param
;
pState
->
pTdbState
->
pBackend
Handle
=
handle
;
pState
->
pTdbState
->
pBackend
=
handle
;
pState
->
pTdbState
->
pComparNode
=
inst
->
pCompareNode
;
taosThreadMutexUnlock
(
&
handle
->
cfMutex
);
return
0
;
}
taosThreadMutexUnlock
(
&
handle
->
cfMutex
);
return
0
;
char
*
err
=
NULL
;
int
cfLen
=
sizeof
(
ginitDict
)
/
sizeof
(
ginitDict
[
0
]);
...
...
@@ -839,15 +809,6 @@ int streamStateOpenBackend(void* backend, SStreamState* pState) {
pCompare
[
i
]
=
compare
;
}
rocksdb_column_family_handle_t
**
cfHandle
=
taosMemoryMalloc
(
cfLen
*
sizeof
(
rocksdb_column_family_handle_t
*
));
for
(
int
i
=
0
;
i
<
cfLen
;
i
++
)
{
char
buf
[
128
]
=
{
0
};
GEN_COLUMN_FAMILY_NAME
(
buf
,
pState
->
pTdbState
->
idstr
,
ginitDict
[
i
].
key
);
cfHandle
[
i
]
=
rocksdb_create_column_family
(
handle
->
db
,
cfOpt
[
i
],
buf
,
&
err
);
if
(
err
!=
NULL
)
{
qError
(
"failed to create cf:%s_%s, reason:%s"
,
pState
->
pTdbState
->
idstr
,
ginitDict
[
i
].
key
,
err
);
taosMemoryFreeClear
(
err
);
}
}
pState
->
pTdbState
->
rocksdb
=
handle
->
db
;
pState
->
pTdbState
->
pHandle
=
cfHandle
;
pState
->
pTdbState
->
writeOpts
=
rocksdb_writeoptions_create
();
...
...
@@ -855,8 +816,9 @@ int streamStateOpenBackend(void* backend, SStreamState* pState) {
pState
->
pTdbState
->
cfOpts
=
(
rocksdb_options_t
**
)
cfOpt
;
pState
->
pTdbState
->
dbOpt
=
handle
->
dbOpt
;
pState
->
pTdbState
->
param
=
param
;
pState
->
pTdbState
->
pBackend
Handle
=
handle
;
pState
->
pTdbState
->
pBackend
=
handle
;
taosThreadRwlockInit
(
&
pState
->
pTdbState
->
rwLock
,
NULL
);
SCfComparator
compare
=
{.
comp
=
pCompare
,
.
numOfComp
=
cfLen
};
pState
->
pTdbState
->
pComparNode
=
streamBackendAddCompare
(
handle
,
&
compare
);
// rocksdb_writeoptions_disable_WAL(pState->pTdbState->writeOpts, 1);
...
...
@@ -865,7 +827,7 @@ int streamStateOpenBackend(void* backend, SStreamState* pState) {
}
void
streamStateCloseBackend
(
SStreamState
*
pState
,
bool
remove
)
{
SBackendHandle
*
pHandle
=
pState
->
pTdbState
->
pBackend
Handle
;
SBackendHandle
*
pHandle
=
pState
->
pTdbState
->
pBackend
;
taosThreadMutexLock
(
&
pHandle
->
cfMutex
);
RocksdbCfInst
**
ppInst
=
taosHashGet
(
pHandle
->
cfInst
,
pState
->
pTdbState
->
idstr
,
strlen
(
pState
->
pTdbState
->
idstr
)
+
1
);
if
(
ppInst
!=
NULL
&&
*
ppInst
!=
NULL
)
{
...
...
@@ -887,6 +849,7 @@ void streamStateCloseBackend(SStreamState* pState, bool remove) {
char
*
err
=
NULL
;
if
(
remove
)
{
for
(
int
i
=
0
;
i
<
cfLen
;
i
++
)
{
if
(
pState
->
pTdbState
->
pHandle
[
i
]
!=
NULL
)
rocksdb_drop_column_family
(
pState
->
pTdbState
->
rocksdb
,
pState
->
pTdbState
->
pHandle
[
i
],
&
err
);
if
(
err
!=
NULL
)
{
qError
(
"failed to create cf:%s_%s, reason:%s"
,
pState
->
pTdbState
->
idstr
,
ginitDict
[
i
].
key
,
err
);
...
...
@@ -896,6 +859,7 @@ void streamStateCloseBackend(SStreamState* pState, bool remove) {
}
else
{
rocksdb_flushoptions_t
*
flushOpt
=
rocksdb_flushoptions_create
();
for
(
int
i
=
0
;
i
<
cfLen
;
i
++
)
{
if
(
pState
->
pTdbState
->
pHandle
[
i
]
!=
NULL
)
rocksdb_flush_cf
(
pState
->
pTdbState
->
rocksdb
,
flushOpt
,
pState
->
pTdbState
->
pHandle
[
i
],
&
err
);
if
(
err
!=
NULL
)
{
qError
(
"failed to create cf:%s_%s, reason:%s"
,
pState
->
pTdbState
->
idstr
,
ginitDict
[
i
].
key
,
err
);
...
...
@@ -906,8 +870,10 @@ void streamStateCloseBackend(SStreamState* pState, bool remove) {
}
for
(
int
i
=
0
;
i
<
cfLen
;
i
++
)
{
if
(
pState
->
pTdbState
->
pHandle
[
i
]
!=
NULL
)
{
rocksdb_column_family_handle_destroy
(
pState
->
pTdbState
->
pHandle
[
i
]);
}
}
taosMemoryFreeClear
(
pState
->
pTdbState
->
pHandle
);
for
(
int
i
=
0
;
i
<
cfLen
;
i
++
)
{
rocksdb_options_destroy
(
pState
->
pTdbState
->
cfOpts
[
i
]);
...
...
@@ -915,7 +881,7 @@ void streamStateCloseBackend(SStreamState* pState, bool remove) {
}
if
(
remove
)
{
streamBackendDelCompare
(
pState
->
pTdbState
->
pBackend
Handle
,
pState
->
pTdbState
->
pComparNode
);
streamBackendDelCompare
(
pState
->
pTdbState
->
pBackend
,
pState
->
pTdbState
->
pComparNode
);
}
rocksdb_writeoptions_destroy
(
pState
->
pTdbState
->
writeOpts
);
pState
->
pTdbState
->
writeOpts
=
NULL
;
...
...
@@ -924,6 +890,8 @@ void streamStateCloseBackend(SStreamState* pState, bool remove) {
pState
->
pTdbState
->
readOpts
=
NULL
;
taosMemoryFreeClear
(
pState
->
pTdbState
->
cfOpts
);
taosMemoryFreeClear
(
pState
->
pTdbState
->
param
);
taosThreadRwlockDestroy
(
&
pState
->
pTdbState
->
rwLock
);
pState
->
pTdbState
->
rocksdb
=
NULL
;
}
void
streamStateDestroyCompar
(
void
*
arg
)
{
...
...
@@ -934,14 +902,38 @@ void streamStateDestroyCompar(void* arg) {
taosMemoryFree
(
comp
->
comp
);
}
int
streamGetInit
(
const
char
*
funcName
)
{
int
streamGetInit
(
SStreamState
*
pState
,
const
char
*
funcName
)
{
int
idx
=
-
1
;
size_t
len
=
strlen
(
funcName
);
for
(
int
i
=
0
;
i
<
sizeof
(
ginitDict
)
/
sizeof
(
ginitDict
[
0
]);
i
++
)
{
if
(
len
==
ginitDict
[
i
].
len
&&
strncmp
(
funcName
,
ginitDict
[
i
].
key
,
strlen
(
funcName
))
==
0
)
{
return
i
;
idx
=
i
;
break
;
}
}
return
-
1
;
if
(
idx
!=
-
1
)
{
rocksdb_column_family_handle_t
*
cf
=
NULL
;
taosThreadRwlockRdlock
(
&
pState
->
pTdbState
->
rwLock
);
cf
=
pState
->
pTdbState
->
pHandle
[
idx
];
taosThreadRwlockUnlock
(
&
pState
->
pTdbState
->
rwLock
);
if
(
cf
==
NULL
)
{
char
buf
[
128
]
=
{
0
};
GEN_COLUMN_FAMILY_NAME
(
buf
,
pState
->
pTdbState
->
idstr
,
ginitDict
[
idx
].
key
);
char
*
err
=
NULL
;
cf
=
rocksdb_create_column_family
(
pState
->
pTdbState
->
rocksdb
,
pState
->
pTdbState
->
cfOpts
[
idx
],
buf
,
&
err
);
if
(
err
!=
NULL
)
{
idx
=
-
1
;
qError
(
"failed to to open cf, %p 0x%"
PRIx64
"-%d_%s, reason:%s"
,
pState
,
pState
->
streamId
,
pState
->
taskId
,
funcName
,
err
);
taosMemoryFree
(
err
);
}
taosThreadRwlockWrlock
(
&
pState
->
pTdbState
->
rwLock
);
pState
->
pTdbState
->
pHandle
[
idx
]
=
cf
;
taosThreadRwlockUnlock
(
&
pState
->
pTdbState
->
rwLock
);
}
}
return
idx
;
}
bool
streamStateIterSeekAndValid
(
rocksdb_iterator_t
*
iter
,
char
*
buf
,
size_t
len
)
{
rocksdb_iter_seek
(
iter
,
buf
,
len
);
...
...
@@ -955,7 +947,7 @@ bool streamStateIterSeekAndValid(rocksdb_iterator_t* iter, char* buf, size_t len
}
rocksdb_iterator_t
*
streamStateIterCreate
(
SStreamState
*
pState
,
const
char
*
cfName
,
rocksdb_snapshot_t
**
snapshot
,
rocksdb_readoptions_t
**
readOpt
)
{
int
idx
=
streamGetInit
(
cfName
);
int
idx
=
streamGetInit
(
pState
,
cfName
);
if
(
snapshot
!=
NULL
)
{
*
snapshot
=
(
rocksdb_snapshot_t
*
)
rocksdb_create_snapshot
(
pState
->
pTdbState
->
rocksdb
);
...
...
@@ -974,7 +966,7 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfNa
code = 0; \
char buf[128] = {0}; \
char* err = NULL; \
int i = streamGetInit(
funcname);
\
int i = streamGetInit(
pState, funcname);
\
if (i < 0) { \
qWarn("streamState failed to get cf name: %s", funcname); \
code = -1; \
...
...
@@ -1004,7 +996,7 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfNa
code = 0; \
char buf[128] = {0}; \
char* err = NULL; \
int i = streamGetInit(
funcname);
\
int i = streamGetInit(
pState, funcname);
\
if (i < 0) { \
qWarn("streamState failed to get cf name: %s", funcname); \
code = -1; \
...
...
@@ -1051,7 +1043,7 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfNa
code = 0; \
char buf[128] = {0}; \
char* err = NULL; \
int i = streamGetInit(
funcname);
\
int i = streamGetInit(
pState, funcname);
\
if (i < 0) { \
qWarn("streamState failed to get cf name: %s_%s", pState->pTdbState->idstr, funcname); \
code = -1; \
...
...
@@ -1979,7 +1971,7 @@ void streamStateClearBatch(void* pBatch) { rocksdb_writebatch_clear((rocksdb_
void
streamStateDestroyBatch
(
void
*
pBatch
)
{
rocksdb_writebatch_destroy
((
rocksdb_writebatch_t
*
)
pBatch
);
}
int32_t
streamStatePutBatch
(
SStreamState
*
pState
,
const
char
*
cfName
,
rocksdb_writebatch_t
*
pBatch
,
void
*
key
,
void
*
val
,
int32_t
vlen
,
int64_t
ttl
)
{
int
i
=
streamGetInit
(
cfName
);
int
i
=
streamGetInit
(
pState
,
cfName
);
if
(
i
<
0
)
{
qError
(
"streamState failed to put to cf name:%s"
,
cfName
);
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录