Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
36eef2fd
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
36eef2fd
编写于
4月 13, 2023
作者:
dengyihao
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
refactor code
上级
259815fd
变更
2
隐藏空白更改
内联
并排
Showing
2 changed file
with
86 addition
and
29 deletion
+86
-29
source/libs/stream/inc/streamBackendRocksdb.h
source/libs/stream/inc/streamBackendRocksdb.h
+9
-1
source/libs/stream/src/streamStateRocksdb.c
source/libs/stream/src/streamStateRocksdb.c
+77
-28
未找到文件。
source/libs/stream/inc/streamBackendRocksdb.h
浏览文件 @
36eef2fd
...
...
@@ -89,5 +89,13 @@ int32_t streamStatePutBatch_rocksdb(SStreamState* pState, void* pBatch);
int32_t
streamDefaultPut_rocksdb
(
SStreamState
*
pState
,
const
void
*
key
,
void
*
pVal
,
int32_t
pVLen
);
int32_t
streamDefaultGet_rocksdb
(
SStreamState
*
pState
,
const
void
*
key
,
void
**
pVal
,
int32_t
*
pVLen
);
int32_t
streamDefaultDel_rocksdb
(
SStreamState
*
pState
,
const
void
*
key
);
int32_t
streamDefaultIter_rocksdb
(
SStreamState
*
pState
,
const
void
*
start
,
const
void
*
end
,
SArray
*
result
);
void
*
streamDefaultIterCreate_rocksdb
(
SStreamState
*
pState
);
int32_t
streamDefaultIterValid_rocksdb
(
void
*
iter
);
void
*
streamDefaultIterSeek_rocksdb
(
void
*
iter
,
const
char
*
key
);
int32_t
streamDefaultIter_rocksdb
(
void
*
iter
);
char
**
streamDefaultIterKey_rocksdb
(
void
*
iter
);
char
*
streamDefaultIterVal_rocksdb
(
void
*
iter
);
// int32_t streamDefaultIter_rocksdb(SStreamState* pState, const void* start, const void* end, SArray* result);
#endif
\ No newline at end of file
source/libs/stream/src/streamStateRocksdb.c
浏览文件 @
36eef2fd
...
...
@@ -646,37 +646,86 @@ int32_t streamDefaultDel_rocksdb(SStreamState* pState, const void* key) {
return
code
;
}
void
*
streamDefaultIterCreate_rocksdb
(
SStreamState
*
pState
)
{}
int32_t
streamDefaultIterValid_rocksdb
(
void
*
iter
);
void
*
streamDefaultIterSeek_rocksdb
(
void
*
iter
,
const
char
*
key
);
int32_t
streamDefaultIter_rocksdb
(
void
*
iter
);
char
**
streamDefaultIterKey_rocksdb
(
void
*
iter
);
char
*
streamDefaultIterVal_rocksdb
(
void
*
iter
);
// typedef struct {
// char* start;
// char* end;
// void* result;
// } StreamFilterArg;
// typedef int (*streamfilterFunc)(StreamFilterArg* arg);
// int32_t streamDefaultIterFilter_rocksdb(SStreamState* pState, streamfilterFunc filterFunc, StreamFilterArg* arg) {
// int code = 0;
// char* err = NULL;
// rocksdb_snapshot_t* snapshot = NULL;
// rocksdb_readoptions_t* readopts = NULL;
// rocksdb_iterator_t* pIter = streamStateIterCreate(pState, "default", &snapshot, &readopts);
// if (pIter == NULL) {
// return -1;
// }
// char* start = arg->start;
// char* end = arg->end;
// SArray* result = arg->result;
// rocksdb_iter_seek(pIter, start, strlen(start));
// while (rocksdb_iter_valid(pIter)) {
// const char* key = rocksdb_iter_key(pIter, NULL);
// if (end != NULL && strcmp(key, end) > 0) {
// break;
// }
// if (strncmp(key, start, strlen(start)) == 0 && strlen(key) >= strlen(start) + 1) {
// int64_t checkPoint = 0;
// // if (sscanf(key + strlen(key), ":%" PRId64 "", &checkPoint) == 1) {
// // taosArrayPush(result, &checkPoint);
// // }
// } else {
// break;
// }
// rocksdb_iter_next(pIter);
// }
// rocksdb_release_snapshot(pState->pTdbState->rocksdb, snapshot);
// rocksdb_readoptions_destroy(readopts);
// rocksdb_iter_destroy(pIter);
// return code;
// }
int32_t
streamDefaultIter_rocksdb
(
SStreamState
*
pState
,
const
void
*
start
,
const
void
*
end
,
SArray
*
result
)
{
int
code
=
0
;
char
*
err
=
NULL
;
//
int code = 0;
//
char* err = NULL;
rocksdb_snapshot_t
*
snapshot
=
NULL
;
rocksdb_readoptions_t
*
readopts
=
NULL
;
rocksdb_iterator_t
*
pIter
=
streamStateIterCreate
(
pState
,
"default"
,
&
snapshot
,
&
readopts
);
if
(
pIter
==
NULL
)
{
return
-
1
;
}
//
rocksdb_snapshot_t* snapshot = NULL;
//
rocksdb_readoptions_t* readopts = NULL;
//
rocksdb_iterator_t* pIter = streamStateIterCreate(pState, "default", &snapshot, &readopts);
//
if (pIter == NULL) {
//
return -1;
//
}
rocksdb_iter_seek
(
pIter
,
start
,
strlen
(
start
));
while
(
rocksdb_iter_valid
(
pIter
))
{
const
char
*
key
=
rocksdb_iter_key
(
pIter
,
NULL
);
if
(
end
!=
NULL
&&
strcmp
(
key
,
end
)
>
0
)
{
break
;
}
if
(
strncmp
(
key
,
start
,
strlen
(
start
))
==
0
&&
strlen
(
key
)
>=
strlen
(
start
)
+
1
)
{
int64_t
checkPoint
=
0
;
if
(
sscanf
(
key
+
strlen
(
key
),
":%"
PRId64
""
,
&
checkPoint
)
==
1
)
{
taosArrayPush
(
result
,
&
checkPoint
);
}
}
else
{
break
;
}
rocksdb_iter_next
(
pIter
);
}
rocksdb_release_snapshot
(
pState
->
pTdbState
->
rocksdb
,
snapshot
);
rocksdb_readoptions_destroy
(
readopts
);
rocksdb_iter_destroy
(
pIter
);
return
code
;
//
rocksdb_iter_seek(pIter, start, strlen(start));
//
while (rocksdb_iter_valid(pIter)) {
//
const char* key = rocksdb_iter_key(pIter, NULL);
//
if (end != NULL && strcmp(key, end) > 0) {
//
break;
//
}
//
if (strncmp(key, start, strlen(start)) == 0 && strlen(key) >= strlen(start) + 1) {
//
int64_t checkPoint = 0;
//
if (sscanf(key + strlen(key), ":%" PRId64 "", &checkPoint) == 1) {
//
taosArrayPush(result, &checkPoint);
//
}
//
} else {
//
break;
//
}
//
rocksdb_iter_next(pIter);
//
}
//
rocksdb_release_snapshot(pState->pTdbState->rocksdb, snapshot);
//
rocksdb_readoptions_destroy(readopts);
//
rocksdb_iter_destroy(pIter);
//
return code;
}
int32_t
streamStateGet_rocksdb
(
SStreamState
*
pState
,
const
SWinKey
*
key
,
void
**
pVal
,
int32_t
*
pVLen
)
{
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录