Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
0d11430b
T
TDengine
项目概览
taosdata
/
TDengine
大约 2 年 前同步成功
通知
1192
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
0d11430b
编写于
3月 28, 2023
作者:
dengyihao
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
tqadd backend
上级
86fe2169
变更
3
隐藏空白更改
内联
并排
Showing
3 changed file
with
37 addition
and
27 deletion
+37
-27
include/libs/stream/streamState.h
include/libs/stream/streamState.h
+4
-1
source/libs/stream/src/streamState.c
source/libs/stream/src/streamState.c
+3
-0
source/libs/stream/src/streamStateRocksdb.c
source/libs/stream/src/streamStateRocksdb.c
+30
-26
未找到文件。
include/libs/stream/streamState.h
浏览文件 @
0d11430b
...
@@ -65,7 +65,10 @@ int32_t streamStateAbort(SStreamState* pState);
...
@@ -65,7 +65,10 @@ int32_t streamStateAbort(SStreamState* pState);
void
streamStateDestroy
(
SStreamState
*
pState
);
void
streamStateDestroy
(
SStreamState
*
pState
);
typedef
struct
{
typedef
struct
{
rocksdb_iterator_t
*
iter
;
rocksdb_iterator_t
*
iter
;
rocksdb_snapshot_t
*
snapshot
;
rocksdb_readoptions_t
*
readOpt
;
rocksdb_t
*
db
;
TBC
*
pCur
;
TBC
*
pCur
;
int64_t
number
;
int64_t
number
;
...
...
source/libs/stream/src/streamState.c
浏览文件 @
0d11430b
...
@@ -669,6 +669,9 @@ void streamStateFreeCur(SStreamStateCur* pCur) {
...
@@ -669,6 +669,9 @@ void streamStateFreeCur(SStreamStateCur* pCur) {
}
}
qDebug
(
"streamStateFreeCur"
);
qDebug
(
"streamStateFreeCur"
);
rocksdb_iter_destroy
(
pCur
->
iter
);
rocksdb_iter_destroy
(
pCur
->
iter
);
rocksdb_release_snapshot
(
pCur
->
db
,
pCur
->
snapshot
);
rocksdb_readoptions_destroy
(
pCur
->
readOpt
);
tdbTbcClose
(
pCur
->
pCur
);
tdbTbcClose
(
pCur
->
pCur
);
taosMemoryFree
(
pCur
);
taosMemoryFree
(
pCur
);
}
}
...
...
source/libs/stream/src/streamStateRocksdb.c
浏览文件 @
0d11430b
...
@@ -13,7 +13,6 @@
...
@@ -13,7 +13,6 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
*/
#include "query.h"
#include "rocksdb/c.h"
#include "rocksdb/c.h"
#include "streamBackendRocksdb.h"
#include "streamBackendRocksdb.h"
#include "tcommon.h"
#include "tcommon.h"
...
@@ -302,7 +301,7 @@ int streamInitBackend(SStreamState* pState, char* path) {
...
@@ -302,7 +301,7 @@ int streamInitBackend(SStreamState* pState, char* path) {
// create the DB if it's not already present
// create the DB if it's not already present
rocksdb_options_set_create_if_missing
(
opts
,
1
);
rocksdb_options_set_create_if_missing
(
opts
,
1
);
rocksdb_options_set_create_missing_column_families
(
opts
,
1
);
rocksdb_options_set_create_missing_column_families
(
opts
,
1
);
rocksdb_options_set_write_buffer_size
(
opts
,
8
<<
20
);
rocksdb_options_set_write_buffer_size
(
opts
,
64
<<
20
);
char
*
err
=
NULL
;
char
*
err
=
NULL
;
int
cfLen
=
sizeof
(
cfName
)
/
sizeof
(
cfName
[
0
]);
int
cfLen
=
sizeof
(
cfName
)
/
sizeof
(
cfName
[
0
]);
...
@@ -314,7 +313,7 @@ int streamInitBackend(SStreamState* pState, char* path) {
...
@@ -314,7 +313,7 @@ int streamInitBackend(SStreamState* pState, char* path) {
rocksdb_cache_t
*
cache
=
rocksdb_cache_create_lru
(
128
<<
20
);
rocksdb_cache_t
*
cache
=
rocksdb_cache_create_lru
(
128
<<
20
);
rocksdb_block_based_options_set_block_cache
(
tableOpt
,
cache
);
rocksdb_block_based_options_set_block_cache
(
tableOpt
,
cache
);
rocksdb_filterpolicy_t
*
filter
=
rocksdb_filterpolicy_create_bloom_full
(
1
0
);
rocksdb_filterpolicy_t
*
filter
=
rocksdb_filterpolicy_create_bloom_full
(
1
5
);
rocksdb_block_based_options_set_filter_policy
(
tableOpt
,
filter
);
rocksdb_block_based_options_set_filter_policy
(
tableOpt
,
filter
);
rocksdb_options_set_block_based_table_factory
((
rocksdb_options_t
*
)
cfOpt
[
i
],
tableOpt
);
rocksdb_options_set_block_based_table_factory
((
rocksdb_options_t
*
)
cfOpt
[
i
],
tableOpt
);
...
@@ -408,10 +407,17 @@ bool streamStateIterSeekAndValid(rocksdb_iterator_t* iter, char* buf, size_t len
...
@@ -408,10 +407,17 @@ bool streamStateIterSeekAndValid(rocksdb_iterator_t* iter, char* buf, size_t len
valid
=
true
;
valid
=
true
;
return
valid
;
return
valid
;
}
}
rocksdb_iterator_t
*
streamStateIterCreate
(
SStreamState
*
pState
,
const
char
*
cfName
)
{
rocksdb_iterator_t
*
streamStateIterCreate
(
SStreamState
*
pState
,
const
char
*
cfName
,
rocksdb_snapshot_t
**
snapshot
,
rocksdb_readoptions_t
**
readOpt
)
{
int
idx
=
streamGetInit
(
cfName
);
int
idx
=
streamGetInit
(
cfName
);
return
rocksdb_create_iterator_cf
(
pState
->
pTdbState
->
rocksdb
,
pState
->
pTdbState
->
readOpts
,
pState
->
pTdbState
->
pHandle
[
idx
]);
*
snapshot
=
(
rocksdb_snapshot_t
*
)
rocksdb_create_snapshot
(
pState
->
pTdbState
->
rocksdb
);
*
readOpt
=
(
rocksdb_readoptions_t
*
)
rocksdb_readoptions_create
();
rocksdb_readoptions_set_snapshot
(
pState
->
pTdbState
->
readOpts
,
*
snapshot
);
rocksdb_readoptions_set_fill_cache
(
pState
->
pTdbState
->
readOpts
,
0
);
return
rocksdb_create_iterator_cf
(
pState
->
pTdbState
->
rocksdb
,
*
readOpt
,
pState
->
pTdbState
->
pHandle
[
idx
]);
}
}
#define STREAM_STATE_PUT_ROCKSDB(pState, funcname, key, value, vLen) \
#define STREAM_STATE_PUT_ROCKSDB(pState, funcname, key, value, vLen) \
...
@@ -617,8 +623,8 @@ SStreamStateCur* streamStateSessionSeekKeyCurrentPrev_rocksdb(SStreamState* pSta
...
@@ -617,8 +623,8 @@ SStreamStateCur* streamStateSessionSeekKeyCurrentPrev_rocksdb(SStreamState* pSta
return
NULL
;
return
NULL
;
}
}
pCur
->
number
=
pState
->
number
;
pCur
->
number
=
pState
->
number
;
pCur
->
db
=
pState
->
pTdbState
->
rocksdb
;
pCur
->
iter
=
streamStateIterCreate
(
pState
,
"sess"
);
pCur
->
iter
=
streamStateIterCreate
(
pState
,
"sess"
,
&
pCur
->
snapshot
,
&
pCur
->
readOpt
);
char
buf
[
128
]
=
{
0
};
char
buf
[
128
]
=
{
0
};
SStateSessionKey
sKey
=
{.
key
=
*
key
,
.
opNum
=
pState
->
number
};
SStateSessionKey
sKey
=
{.
key
=
*
key
,
.
opNum
=
pState
->
number
};
...
@@ -649,11 +655,8 @@ SStreamStateCur* streamStateSessionSeekKeyCurrentNext_rocksdb(SStreamState* pSta
...
@@ -649,11 +655,8 @@ SStreamStateCur* streamStateSessionSeekKeyCurrentNext_rocksdb(SStreamState* pSta
if
(
pCur
==
NULL
)
{
if
(
pCur
==
NULL
)
{
return
NULL
;
return
NULL
;
}
}
pCur
->
db
=
pState
->
pTdbState
->
rocksdb
;
// pCur->iter =
pCur
->
iter
=
streamStateIterCreate
(
pState
,
"sess"
,
&
pCur
->
snapshot
,
&
pCur
->
readOpt
);
// rocksdb_create_iterator_cf(pState->pTdbState->rocksdb, pState->pTdbState->readOpts,
// pState->pTdbState->pHandle[2]);
pCur
->
iter
=
streamStateIterCreate
(
pState
,
"sess"
);
pCur
->
number
=
pState
->
number
;
pCur
->
number
=
pState
->
number
;
char
buf
[
128
]
=
{
0
};
char
buf
[
128
]
=
{
0
};
...
@@ -684,7 +687,8 @@ SStreamStateCur* streamStateSessionSeekKeyNext_rocksdb(SStreamState* pState, con
...
@@ -684,7 +687,8 @@ SStreamStateCur* streamStateSessionSeekKeyNext_rocksdb(SStreamState* pState, con
if
(
pCur
==
NULL
)
{
if
(
pCur
==
NULL
)
{
return
NULL
;
return
NULL
;
}
}
pCur
->
iter
=
streamStateIterCreate
(
pState
,
"sess"
);
pCur
->
db
=
pState
->
pTdbState
->
rocksdb
;
pCur
->
iter
=
streamStateIterCreate
(
pState
,
"sess"
,
&
pCur
->
snapshot
,
&
pCur
->
readOpt
);
pCur
->
number
=
pState
->
number
;
pCur
->
number
=
pState
->
number
;
SStateSessionKey
sKey
=
{.
key
=
*
key
,
.
opNum
=
pState
->
number
};
SStateSessionKey
sKey
=
{.
key
=
*
key
,
.
opNum
=
pState
->
number
};
...
@@ -725,14 +729,13 @@ SStreamStateCur* streamStateGetCur_rocksdb(SStreamState* pState, const SWinKey*
...
@@ -725,14 +729,13 @@ SStreamStateCur* streamStateGetCur_rocksdb(SStreamState* pState, const SWinKey*
SStreamStateCur
*
pCur
=
taosMemoryCalloc
(
1
,
sizeof
(
SStreamStateCur
));
SStreamStateCur
*
pCur
=
taosMemoryCalloc
(
1
,
sizeof
(
SStreamStateCur
));
if
(
pCur
==
NULL
)
return
NULL
;
if
(
pCur
==
NULL
)
return
NULL
;
pCur
->
iter
=
streamStateIterCreate
(
pState
,
"default"
);
pCur
->
db
=
pState
->
pTdbState
->
rocksdb
;
pCur
->
iter
=
streamStateIterCreate
(
pState
,
"default"
,
&
pCur
->
snapshot
,
&
pCur
->
readOpt
);
SStateKey
sKey
=
{.
key
=
*
key
,
.
opNum
=
pState
->
number
};
SStateKey
sKey
=
{.
key
=
*
key
,
.
opNum
=
pState
->
number
};
char
buf
[
128
]
=
{
0
};
char
buf
[
128
]
=
{
0
};
int
len
=
stateKeyEncode
((
void
*
)
&
sKey
,
buf
);
int
len
=
stateKeyEncode
((
void
*
)
&
sKey
,
buf
);
// char sKeyStr[128] = {0};
// stateKeyToString(&sKey, sKeyStr);
rocksdb_iter_seek
(
pCur
->
iter
,
buf
,
len
);
rocksdb_iter_seek
(
pCur
->
iter
,
buf
,
len
);
if
(
rocksdb_iter_valid
(
pCur
->
iter
))
{
if
(
rocksdb_iter_valid
(
pCur
->
iter
))
{
SStateKey
curKey
;
SStateKey
curKey
;
...
@@ -740,10 +743,6 @@ SStreamStateCur* streamStateGetCur_rocksdb(SStreamState* pState, const SWinKey*
...
@@ -740,10 +743,6 @@ SStreamStateCur* streamStateGetCur_rocksdb(SStreamState* pState, const SWinKey*
char
*
keyStr
=
(
char
*
)
rocksdb_iter_key
(
pCur
->
iter
,
&
kLen
);
char
*
keyStr
=
(
char
*
)
rocksdb_iter_key
(
pCur
->
iter
,
&
kLen
);
stateKeyDecode
((
void
*
)
&
curKey
,
keyStr
);
stateKeyDecode
((
void
*
)
&
curKey
,
keyStr
);
// char tKeyStr[128] = {0};
// stateKeyToString(&curKey, tKeyStr);
// qWarn("streamStateGetCur_rocksdb-->src:%s, dst:%s", sKeyStr, tKeyStr);
if
(
stateKeyCmpr
(
&
sKey
,
sizeof
(
sKey
),
&
curKey
,
sizeof
(
curKey
))
==
0
)
{
if
(
stateKeyCmpr
(
&
sKey
,
sizeof
(
sKey
),
&
curKey
,
sizeof
(
curKey
))
==
0
)
{
pCur
->
number
=
pState
->
number
;
pCur
->
number
=
pState
->
number
;
return
pCur
;
return
pCur
;
...
@@ -787,7 +786,8 @@ SStreamStateCur* streamStateFillGetCur_rocksdb(SStreamState* pState, const SWinK
...
@@ -787,7 +786,8 @@ SStreamStateCur* streamStateFillGetCur_rocksdb(SStreamState* pState, const SWinK
if
(
pCur
==
NULL
)
return
NULL
;
if
(
pCur
==
NULL
)
return
NULL
;
pCur
->
iter
=
streamStateIterCreate
(
pState
,
"fill"
);
pCur
->
db
=
pState
->
pTdbState
->
rocksdb
;
pCur
->
iter
=
streamStateIterCreate
(
pState
,
"fill"
,
&
pCur
->
snapshot
,
&
pCur
->
readOpt
);
char
buf
[
128
]
=
{
0
};
char
buf
[
128
]
=
{
0
};
int
len
=
winKeyEncode
((
void
*
)
key
,
buf
);
int
len
=
winKeyEncode
((
void
*
)
key
,
buf
);
...
@@ -898,7 +898,8 @@ SStreamStateCur* streamStateSeekKeyNext_rocksdb(SStreamState* pState, const SWin
...
@@ -898,7 +898,8 @@ SStreamStateCur* streamStateSeekKeyNext_rocksdb(SStreamState* pState, const SWin
return
NULL
;
return
NULL
;
}
}
pCur
->
number
=
pState
->
number
;
pCur
->
number
=
pState
->
number
;
pCur
->
iter
=
streamStateIterCreate
(
pState
,
"default"
);
pCur
->
db
=
pState
->
pTdbState
->
rocksdb
;
pCur
->
iter
=
streamStateIterCreate
(
pState
,
"default"
,
&
pCur
->
snapshot
,
&
pCur
->
readOpt
);
SStateKey
sKey
=
{.
key
=
*
key
,
.
opNum
=
pState
->
number
};
SStateKey
sKey
=
{.
key
=
*
key
,
.
opNum
=
pState
->
number
};
char
buf
[
128
]
=
{
0
};
char
buf
[
128
]
=
{
0
};
...
@@ -929,7 +930,8 @@ SStreamStateCur* streamStateFillSeekKeyNext_rocksdb(SStreamState* pState, const
...
@@ -929,7 +930,8 @@ SStreamStateCur* streamStateFillSeekKeyNext_rocksdb(SStreamState* pState, const
return
NULL
;
return
NULL
;
}
}
pCur
->
iter
=
streamStateIterCreate
(
pState
,
"fill"
);
pCur
->
db
=
pState
->
pTdbState
->
rocksdb
;
pCur
->
iter
=
streamStateIterCreate
(
pState
,
"fill"
,
&
pCur
->
snapshot
,
&
pCur
->
readOpt
);
char
buf
[
128
]
=
{
0
};
char
buf
[
128
]
=
{
0
};
int
len
=
winKeyEncode
((
void
*
)
key
,
buf
);
int
len
=
winKeyEncode
((
void
*
)
key
,
buf
);
...
@@ -959,7 +961,8 @@ SStreamStateCur* streamStateFillSeekKeyPrev_rocksdb(SStreamState* pState, const
...
@@ -959,7 +961,8 @@ SStreamStateCur* streamStateFillSeekKeyPrev_rocksdb(SStreamState* pState, const
return
NULL
;
return
NULL
;
}
}
pCur
->
iter
=
streamStateIterCreate
(
pState
,
"fill"
);
pCur
->
db
=
pState
->
pTdbState
->
rocksdb
;
pCur
->
iter
=
streamStateIterCreate
(
pState
,
"fill"
,
&
pCur
->
snapshot
,
&
pCur
->
readOpt
);
char
buf
[
128
]
=
{
0
};
char
buf
[
128
]
=
{
0
};
int
len
=
winKeyEncode
((
void
*
)
key
,
buf
);
int
len
=
winKeyEncode
((
void
*
)
key
,
buf
);
...
@@ -1003,7 +1006,8 @@ int32_t streamStateSessionGetKeyByRange_rocksdb(SStreamState* pState, const SSes
...
@@ -1003,7 +1006,8 @@ int32_t streamStateSessionGetKeyByRange_rocksdb(SStreamState* pState, const SSes
return
-
1
;
return
-
1
;
}
}
pCur
->
number
=
pState
->
number
;
pCur
->
number
=
pState
->
number
;
pCur
->
iter
=
streamStateIterCreate
(
pState
,
"sess"
);
pCur
->
db
=
pState
->
pTdbState
->
rocksdb
;
pCur
->
iter
=
streamStateIterCreate
(
pState
,
"sess"
,
&
pCur
->
snapshot
,
&
pCur
->
readOpt
);
SStateSessionKey
sKey
=
{.
key
=
*
key
,
.
opNum
=
pState
->
number
};
SStateSessionKey
sKey
=
{.
key
=
*
key
,
.
opNum
=
pState
->
number
};
int32_t
c
=
0
;
int32_t
c
=
0
;
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录