Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
af7f78ad
T
TDengine
项目概览
taosdata
/
TDengine
大约 2 年 前同步成功
通知
1192
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
af7f78ad
编写于
7月 13, 2023
作者:
H
Haojun Liao
浏览文件
操作
浏览文件
下载
差异文件
Merge remote-tracking branch 'origin/enh/triggerCheckPoint2' into enh/triggerCheckPoint2
上级
2f90a535
2ec67bcc
变更
1
显示空白变更内容
内联
并排
Showing
1 changed file
with
13 addition
and
4 deletion
+13
-4
source/libs/stream/src/streamBackendRocksdb.c
source/libs/stream/src/streamBackendRocksdb.c
+13
-4
未找到文件。
source/libs/stream/src/streamBackendRocksdb.c
浏览文件 @
af7f78ad
...
@@ -1053,7 +1053,7 @@ int32_t streamStateOpenBackendCf(void* backend, char* name, char** cfs, int32_t
...
@@ -1053,7 +1053,7 @@ int32_t streamStateOpenBackendCf(void* backend, char* name, char** cfs, int32_t
inst
->
pCompares
=
taosMemoryCalloc
(
cfLen
,
sizeof
(
rocksdb_comparator_t
*
));
inst
->
pCompares
=
taosMemoryCalloc
(
cfLen
,
sizeof
(
rocksdb_comparator_t
*
));
inst
->
dbOpt
=
handle
->
dbOpt
;
inst
->
dbOpt
=
handle
->
dbOpt
;
//rocksdb_writeoptions_disable_WAL(inst->wOpt, 1);
//
rocksdb_writeoptions_disable_WAL(inst->wOpt, 1);
taosHashPut
(
handle
->
cfInst
,
idstr
,
strlen
(
idstr
)
+
1
,
&
inst
,
sizeof
(
void
*
));
taosHashPut
(
handle
->
cfInst
,
idstr
,
strlen
(
idstr
)
+
1
,
&
inst
,
sizeof
(
void
*
));
}
else
{
}
else
{
inst
=
*
pInst
;
inst
=
*
pInst
;
...
@@ -1174,7 +1174,7 @@ int streamStateOpenBackend(void* backend, SStreamState* pState) {
...
@@ -1174,7 +1174,7 @@ int streamStateOpenBackend(void* backend, SStreamState* pState) {
taosThreadRwlockInit
(
&
pBackendCfWrapper
->
rwLock
,
NULL
);
taosThreadRwlockInit
(
&
pBackendCfWrapper
->
rwLock
,
NULL
);
SCfComparator
compare
=
{.
comp
=
pCompare
,
.
numOfComp
=
cfLen
};
SCfComparator
compare
=
{.
comp
=
pCompare
,
.
numOfComp
=
cfLen
};
pBackendCfWrapper
->
pComparNode
=
streamBackendAddCompare
(
handle
,
&
compare
);
pBackendCfWrapper
->
pComparNode
=
streamBackendAddCompare
(
handle
,
&
compare
);
//rocksdb_writeoptions_disable_WAL(pBackendCfWrapper->writeOpts, 1);
//
rocksdb_writeoptions_disable_WAL(pBackendCfWrapper->writeOpts, 1);
memcpy
(
pBackendCfWrapper
->
idstr
,
pState
->
pTdbState
->
idstr
,
sizeof
(
pState
->
pTdbState
->
idstr
));
memcpy
(
pBackendCfWrapper
->
idstr
,
pState
->
pTdbState
->
idstr
,
sizeof
(
pState
->
pTdbState
->
idstr
));
int64_t
id
=
taosAddRef
(
streamBackendCfWrapperId
,
pBackendCfWrapper
);
int64_t
id
=
taosAddRef
(
streamBackendCfWrapperId
,
pBackendCfWrapper
);
...
@@ -1438,6 +1438,7 @@ int32_t streamStateGetFirst_rocksdb(SStreamState* pState, SWinKey* key) {
...
@@ -1438,6 +1438,7 @@ int32_t streamStateGetFirst_rocksdb(SStreamState* pState, SWinKey* key) {
SWinKey
tmp
=
{.
ts
=
0
,
.
groupId
=
0
};
SWinKey
tmp
=
{.
ts
=
0
,
.
groupId
=
0
};
streamStatePut_rocksdb
(
pState
,
&
tmp
,
NULL
,
0
);
streamStatePut_rocksdb
(
pState
,
&
tmp
,
NULL
,
0
);
SStreamStateCur
*
pCur
=
streamStateSeekKeyNext_rocksdb
(
pState
,
&
tmp
);
SStreamStateCur
*
pCur
=
streamStateSeekKeyNext_rocksdb
(
pState
,
&
tmp
);
int32_t
code
=
streamStateGetKVByCur_rocksdb
(
pCur
,
key
,
NULL
,
0
);
int32_t
code
=
streamStateGetKVByCur_rocksdb
(
pCur
,
key
,
NULL
,
0
);
streamStateFreeCur
(
pCur
);
streamStateFreeCur
(
pCur
);
streamStateDel_rocksdb
(
pState
,
&
tmp
);
streamStateDel_rocksdb
(
pState
,
&
tmp
);
...
@@ -1553,6 +1554,7 @@ SStreamStateCur* streamStateSeekToLast_rocksdb(SStreamState* pState, const SWinK
...
@@ -1553,6 +1554,7 @@ SStreamStateCur* streamStateSeekToLast_rocksdb(SStreamState* pState, const SWinK
const
SStateKey
maxStateKey
=
{.
key
=
{.
groupId
=
UINT64_MAX
,
.
ts
=
INT64_MAX
},
.
opNum
=
INT64_MAX
};
const
SStateKey
maxStateKey
=
{.
key
=
{.
groupId
=
UINT64_MAX
,
.
ts
=
INT64_MAX
},
.
opNum
=
INT64_MAX
};
STREAM_STATE_PUT_ROCKSDB
(
pState
,
"state"
,
&
maxStateKey
,
""
,
0
);
STREAM_STATE_PUT_ROCKSDB
(
pState
,
"state"
,
&
maxStateKey
,
""
,
0
);
char
buf
[
128
]
=
{
0
};
char
buf
[
128
]
=
{
0
};
int32_t
klen
=
stateKeyEncode
((
void
*
)
&
maxStateKey
,
buf
);
int32_t
klen
=
stateKeyEncode
((
void
*
)
&
maxStateKey
,
buf
);
SStreamStateCur
*
pCur
=
taosMemoryCalloc
(
1
,
sizeof
(
SStreamStateCur
));
SStreamStateCur
*
pCur
=
taosMemoryCalloc
(
1
,
sizeof
(
SStreamStateCur
));
...
@@ -1562,6 +1564,8 @@ SStreamStateCur* streamStateSeekToLast_rocksdb(SStreamState* pState, const SWinK
...
@@ -1562,6 +1564,8 @@ SStreamStateCur* streamStateSeekToLast_rocksdb(SStreamState* pState, const SWinK
pCur
->
db
=
wrapper
->
rocksdb
;
pCur
->
db
=
wrapper
->
rocksdb
;
pCur
->
iter
=
streamStateIterCreate
(
pState
,
"state"
,
(
rocksdb_snapshot_t
**
)
&
pCur
->
snapshot
,
pCur
->
iter
=
streamStateIterCreate
(
pState
,
"state"
,
(
rocksdb_snapshot_t
**
)
&
pCur
->
snapshot
,
(
rocksdb_readoptions_t
**
)
&
pCur
->
readOpt
);
(
rocksdb_readoptions_t
**
)
&
pCur
->
readOpt
);
pCur
->
number
=
pState
->
number
;
rocksdb_iter_seek
(
pCur
->
iter
,
buf
,
(
size_t
)
klen
);
rocksdb_iter_seek
(
pCur
->
iter
,
buf
,
(
size_t
)
klen
);
rocksdb_iter_prev
(
pCur
->
iter
);
rocksdb_iter_prev
(
pCur
->
iter
);
...
@@ -1587,6 +1591,7 @@ SStreamStateCur* streamStateGetCur_rocksdb(SStreamState* pState, const SWinKey*
...
@@ -1587,6 +1591,7 @@ SStreamStateCur* streamStateGetCur_rocksdb(SStreamState* pState, const SWinKey*
pCur
->
db
=
wrapper
->
rocksdb
;
pCur
->
db
=
wrapper
->
rocksdb
;
pCur
->
iter
=
streamStateIterCreate
(
pState
,
"state"
,
(
rocksdb_snapshot_t
**
)
&
pCur
->
snapshot
,
pCur
->
iter
=
streamStateIterCreate
(
pState
,
"state"
,
(
rocksdb_snapshot_t
**
)
&
pCur
->
snapshot
,
(
rocksdb_readoptions_t
**
)
&
pCur
->
readOpt
);
(
rocksdb_readoptions_t
**
)
&
pCur
->
readOpt
);
pCur
->
number
=
pState
->
number
;
SStateKey
sKey
=
{.
key
=
*
key
,
.
opNum
=
pState
->
number
};
SStateKey
sKey
=
{.
key
=
*
key
,
.
opNum
=
pState
->
number
};
char
buf
[
128
]
=
{
0
};
char
buf
[
128
]
=
{
0
};
...
@@ -1855,6 +1860,7 @@ SStreamStateCur* streamStateFillGetCur_rocksdb(SStreamState* pState, const SWinK
...
@@ -1855,6 +1860,7 @@ SStreamStateCur* streamStateFillGetCur_rocksdb(SStreamState* pState, const SWinK
pCur
->
db
=
wrapper
->
rocksdb
;
pCur
->
db
=
wrapper
->
rocksdb
;
pCur
->
iter
=
streamStateIterCreate
(
pState
,
"fill"
,
(
rocksdb_snapshot_t
**
)
&
pCur
->
snapshot
,
pCur
->
iter
=
streamStateIterCreate
(
pState
,
"fill"
,
(
rocksdb_snapshot_t
**
)
&
pCur
->
snapshot
,
(
rocksdb_readoptions_t
**
)
&
pCur
->
readOpt
);
(
rocksdb_readoptions_t
**
)
&
pCur
->
readOpt
);
pCur
->
number
=
pState
->
number
;
char
buf
[
128
]
=
{
0
};
char
buf
[
128
]
=
{
0
};
int
len
=
winKeyEncode
((
void
*
)
key
,
buf
);
int
len
=
winKeyEncode
((
void
*
)
key
,
buf
);
...
@@ -1916,6 +1922,7 @@ SStreamStateCur* streamStateFillSeekKeyNext_rocksdb(SStreamState* pState, const
...
@@ -1916,6 +1922,7 @@ SStreamStateCur* streamStateFillSeekKeyNext_rocksdb(SStreamState* pState, const
pCur
->
db
=
wrapper
->
rocksdb
;
pCur
->
db
=
wrapper
->
rocksdb
;
pCur
->
iter
=
streamStateIterCreate
(
pState
,
"fill"
,
(
rocksdb_snapshot_t
**
)
&
pCur
->
snapshot
,
pCur
->
iter
=
streamStateIterCreate
(
pState
,
"fill"
,
(
rocksdb_snapshot_t
**
)
&
pCur
->
snapshot
,
(
rocksdb_readoptions_t
**
)
&
pCur
->
readOpt
);
(
rocksdb_readoptions_t
**
)
&
pCur
->
readOpt
);
pCur
->
number
=
pState
->
number
;
char
buf
[
128
]
=
{
0
};
char
buf
[
128
]
=
{
0
};
int
len
=
winKeyEncode
((
void
*
)
key
,
buf
);
int
len
=
winKeyEncode
((
void
*
)
key
,
buf
);
...
@@ -1953,6 +1960,7 @@ SStreamStateCur* streamStateFillSeekKeyPrev_rocksdb(SStreamState* pState, const
...
@@ -1953,6 +1960,7 @@ SStreamStateCur* streamStateFillSeekKeyPrev_rocksdb(SStreamState* pState, const
pCur
->
db
=
wrapper
->
rocksdb
;
pCur
->
db
=
wrapper
->
rocksdb
;
pCur
->
iter
=
streamStateIterCreate
(
pState
,
"fill"
,
(
rocksdb_snapshot_t
**
)
&
pCur
->
snapshot
,
pCur
->
iter
=
streamStateIterCreate
(
pState
,
"fill"
,
(
rocksdb_snapshot_t
**
)
&
pCur
->
snapshot
,
(
rocksdb_readoptions_t
**
)
&
pCur
->
readOpt
);
(
rocksdb_readoptions_t
**
)
&
pCur
->
readOpt
);
pCur
->
number
=
pState
->
number
;
char
buf
[
128
]
=
{
0
};
char
buf
[
128
]
=
{
0
};
int
len
=
winKeyEncode
((
void
*
)
key
,
buf
);
int
len
=
winKeyEncode
((
void
*
)
key
,
buf
);
...
@@ -1986,10 +1994,10 @@ int32_t streamStateSessionGetKeyByRange_rocksdb(SStreamState* pState, const SSes
...
@@ -1986,10 +1994,10 @@ int32_t streamStateSessionGetKeyByRange_rocksdb(SStreamState* pState, const SSes
if
(
pCur
==
NULL
)
{
if
(
pCur
==
NULL
)
{
return
-
1
;
return
-
1
;
}
}
pCur
->
number
=
pState
->
number
;
pCur
->
db
=
wrapper
->
rocksdb
;
pCur
->
db
=
wrapper
->
rocksdb
;
pCur
->
iter
=
streamStateIterCreate
(
pState
,
"sess"
,
(
rocksdb_snapshot_t
**
)
&
pCur
->
snapshot
,
pCur
->
iter
=
streamStateIterCreate
(
pState
,
"sess"
,
(
rocksdb_snapshot_t
**
)
&
pCur
->
snapshot
,
(
rocksdb_readoptions_t
**
)
&
pCur
->
readOpt
);
(
rocksdb_readoptions_t
**
)
&
pCur
->
readOpt
);
pCur
->
number
=
pState
->
number
;
SStateSessionKey
sKey
=
{.
key
=
*
key
,
.
opNum
=
pState
->
number
};
SStateSessionKey
sKey
=
{.
key
=
*
key
,
.
opNum
=
pState
->
number
};
int32_t
c
=
0
;
int32_t
c
=
0
;
...
@@ -2263,6 +2271,7 @@ void* streamDefaultIterCreate_rocksdb(SStreamState* pState) {
...
@@ -2263,6 +2271,7 @@ void* streamDefaultIterCreate_rocksdb(SStreamState* pState) {
pCur
->
db
=
wrapper
->
rocksdb
;
pCur
->
db
=
wrapper
->
rocksdb
;
pCur
->
iter
=
streamStateIterCreate
(
pState
,
"default"
,
(
rocksdb_snapshot_t
**
)
&
pCur
->
snapshot
,
pCur
->
iter
=
streamStateIterCreate
(
pState
,
"default"
,
(
rocksdb_snapshot_t
**
)
&
pCur
->
snapshot
,
(
rocksdb_readoptions_t
**
)
&
pCur
->
readOpt
);
(
rocksdb_readoptions_t
**
)
&
pCur
->
readOpt
);
pCur
->
number
=
pState
->
number
;
return
pCur
;
return
pCur
;
}
}
int32_t
streamDefaultIterValid_rocksdb
(
void
*
iter
)
{
int32_t
streamDefaultIterValid_rocksdb
(
void
*
iter
)
{
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录